Class: Raptor::Cluster
- Inherits: Object
  - Object
  - Raptor::Cluster
- Defined in: lib/raptor/cluster.rb, sig/generated/raptor/cluster.rbs
Overview
Multi-process web server cluster with advanced concurrency architecture.
Cluster manages multiple worker processes, each running a complete server stack including a reactor thread, server thread, ractor pool for HTTP parsing, and thread pool for application processing. It handles process forking, signal management, graceful shutdown, and automatic worker restart when a worker process unexpectedly exits.
The architecture provides horizontal scaling through processes while maintaining efficient I/O and CPU utilization within each process through the combination of NIO reactors, ractor-based parsing, and thread pools.
Flow per worker process:
- Server continuously accepts connections but skips acceptance when backlog is high
- Reactor manages I/O multiplexing and provides backlog metrics for load control
- Ractor pool handles CPU-intensive HTTP parsing in parallel
- Thread pool processes Rack applications and handles response writing
- Natural load balancing occurs through backpressure-based acceptance control
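The backpressure-based acceptance described above can be sketched roughly as follows. This is a minimal illustration, not Raptor's actual server code: the `accept_if_capacity` helper, the `MAX_BACKLOG` threshold, and the plain integer `backlog` argument are all assumptions made for the example. Only `TCPServer#accept_nonblock` is a real Ruby API.

```ruby
require "socket"

# Hypothetical threshold; the real limit used by Raptor is not stated here.
MAX_BACKLOG = 64

# Accepts one pending connection unless the reported backlog is too high.
# `backlog` stands in for the metric the reactor provides.
# Returns the accepted socket, or nil when acceptance was skipped or no
# connection was waiting.
def accept_if_capacity(listener, backlog, max_backlog: MAX_BACKLOG)
  # Skipping leaves the connection in the kernel queue, where another
  # worker process sharing the listener can pick it up.
  return nil if backlog >= max_backlog

  conn = listener.accept_nonblock(exception: false)
  conn == :wait_readable ? nil : conn
end
```

Because a busy worker simply declines to accept, idle workers end up taking more of the incoming connections, which is the load-balancing effect the list above refers to.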
Class Method Summary
- .run(options) ⇒ void
  Convenience method to create and run a cluster with the given options.
Instance Method Summary
- #exit_description(status) ⇒ String
  Returns a human-readable description of how a process exited.
- #initialize(options) ⇒ Cluster (constructor)
  Creates a new Cluster with the specified configuration.
- #log_initialization ⇒ void
  Logs cluster initialization details including architecture and bind addresses.
- #log_stats ⇒ void
  Logs current stats for all workers to stdout.
- #run ⇒ void
  Starts the multi-process cluster and manages worker processes.
- #run_worker(index) ⇒ void
  Runs the full server stack inside a worker process.
- #shutdown ⇒ void
  Initiates graceful shutdown of the cluster.
- #spawn_worker(index) ⇒ void
  Forks a new worker process and registers it at the given index.
- #stats ⇒ Array<Hash>
  Returns stats for all worker processes.
- #write_stats_file_loop ⇒ void
  Writes the stats file on a 1-second interval until shutdown.
Constructor Details
#initialize(options) ⇒ Cluster
Creates a new Cluster with the specified configuration.
Initializes the cluster with thread, ractor, and worker counts, sets up network binding, loads the Rack application, and prepares for multi-process operation.
    # File 'lib/raptor/cluster.rb', line 86

    def initialize(options)
      @thread_count = options[:threads]
      @ractor_count = options[:ractors]
      @worker_count = options[:workers]
      @client_options = options[:client]

      @binder = Binder.new(options[:binds])
      @server_port = @binder.server_port

      @app = options[:app] || Rack::Builder.parse_file(options[:rackup])

      log_initialization

      @shutdown = false
      @workers = {}
      @stats = Stats.new(@worker_count)
      @stats_file = options[:stats_file]
    end
Class Method Details
.run(options) ⇒ void
This method returns an undefined value.
Convenience method to create and run a cluster with the given options.
    # File 'lib/raptor/cluster.rb', line 53

    def self.run(options)
      new(options).run
    end
Instance Method Details
#exit_description(status) ⇒ String
Returns a human-readable description of how a process exited.
    # File 'lib/raptor/cluster.rb', line 282

    def exit_description(status)
      if status.exited?
        "exited with code #{status.exitstatus}"
      elsif status.signaled?
        "killed by SIG#{Signal.signame(status.termsig)}"
      else
        "exited"
      end
    end
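To illustrate the two main branches, the sketch below copies the method body into a standalone function and feeds it `Process::Status` objects from two short-lived child processes. It assumes a platform where `fork` is available; the child processes are demo-only and not part of Raptor.

```ruby
# Standalone copy of the method body shown above, for demonstration only.
def exit_description(status)
  if status.exited?
    "exited with code #{status.exitstatus}"
  elsif status.signaled?
    "killed by SIG#{Signal.signame(status.termsig)}"
  else
    "exited"
  end
end

# Child that exits normally with code 3.
pid = fork { exit!(3) }
_, status = Process.wait2(pid)
puts exit_description(status) # prints "exited with code 3"

# Child that is killed by SIGKILL while sleeping.
pid = fork { sleep }
Process.kill("KILL", pid)
_, status = Process.wait2(pid)
puts exit_description(status) # prints "killed by SIGKILL"
```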
#log_initialization ⇒ void
This method returns an undefined value.
Logs cluster initialization details including architecture and bind addresses.
Outputs a hierarchical view of the cluster configuration showing the master process, worker processes, and per-process thread/ractor allocation along with listening addresses.
    # File 'lib/raptor/cluster.rb', line 312

    def log_initialization
      puts "Raptor Cluster initializing:"
      puts "├─ Version: #{VERSION}"
      puts "├─ Ruby Version: #{RUBY_DESCRIPTION}"
      puts "├─ Master PID: #{Process.pid}"
      puts "│ └─ #{@worker_count} worker process#{"es" if @worker_count > 1}"
      puts "│ ├─ 1 server thread"
      puts "│ ├─ 1 reactor thread"
      puts "│ ├─ #{@ractor_count} pipeline ractor#{"s" if @ractor_count > 1}"
      puts "│ ├─ 1 pipeline collector thread"
      puts "│ ├─ #{@thread_count} worker thread#{"s" if @thread_count > 1}"
      puts "│ └─ 1 stats thread"
      puts "└─ Listening on #{@binder.addresses.join(", ")}"
    end
#log_stats ⇒ void
This method returns an undefined value.
Logs current stats for all workers to stdout.
Triggered by SIGUSR1 in the master process.
    # File 'lib/raptor/cluster.rb', line 334

    def log_stats
      @stats.all.each_with_index do |stat, index|
        status = stat[:booted] ? "booted" : "starting"
        puts "Worker #{index}: pid=#{stat[:pid]}, requests=#{stat[:requests]}, " \
          "backlog=#{stat[:backlog]}, #{status}, " \
          "last_checkin=#{Time.at(stat[:last_checkin]).strftime("%H:%M:%S")}"
      end
    end
#run ⇒ void
This method returns an undefined value.
Starts the multi-process cluster and manages worker processes.
Forks the configured number of worker processes and monitors them, automatically restarting any that exit unexpectedly. Handles graceful shutdown via INT or TERM signals, and stats logging via USR1.
Each worker process includes:
- 1 server thread (continuously accepts connections with backpressure control)
- 1 reactor thread (I/O multiplexing, timeout handling, backlog monitoring)
- N ractor workers (parallel HTTP parsing)
- 1 ractor collector thread (coordinates parsing results)
- M worker threads (Rack application processing and response writing)
- 1 stats thread (writes per-worker metrics to shared memory every second)
    # File 'lib/raptor/cluster.rb', line 120

    def run
      trap("INT") { shutdown }
      trap("TERM") { shutdown }
      trap("USR1") { log_stats }

      @worker_count.times { |index| spawn_worker(index) }

      stats_file_thread = if @stats_file
        Thread.new do
          Thread.current.name = "Raptor Stats File"
          write_stats_file_loop
        end
      end

      until @shutdown
        begin
          pid, status = Process.wait2(-1, Process::WNOHANG)
        rescue Errno::ECHILD
          break
        end

        if pid
          index = @workers.key(pid)
          @workers.delete(index)

          unless @shutdown
            warn "[#{Process.pid}] Restarting worker #{index} (#{pid}), #{exit_description(status)}"
            spawn_worker(index)
          end
        else
          sleep 0.1
        end
      end

      @workers.values.each { |pid| Process.kill("TERM", pid) rescue nil }
      @workers.values.each { |pid| Process.wait(pid) rescue nil }

      stats_file_thread&.join
      File.delete(@stats_file) rescue nil if @stats_file
      @stats.unmap
    end
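The monitor-and-restart loop can be tried in isolation with a stripped-down supervisor like the one below. This is a sketch under stated assumptions rather than Raptor's code: the `supervise` helper and its `max_restarts` bound are hypothetical and exist only so the demo terminates, whereas the real loop runs until the shutdown flag is set. The `Process.wait2(-1, Process::WNOHANG)` polling and the index-to-pid hash mirror the listing above.

```ruby
# Forks `count` children that each run the given block, and replaces any
# child that exits. Stops after `max_restarts` replacements (a bound added
# for the demo). Returns the number of restarts performed.
def supervise(count:, max_restarts:, &work)
  workers = {} # index => pid, like @workers in the listing above
  start = ->(index) { workers[index] = fork { work.call(index) } }
  count.times { |index| start.call(index) }
  restarts = 0

  while restarts < max_restarts
    begin
      # Non-blocking reap of any child; pid is nil when none has exited yet.
      pid, _status = Process.wait2(-1, Process::WNOHANG)
    rescue Errno::ECHILD
      break # no children left at all
    end

    if pid
      index = workers.key(pid)
      workers.delete(index)
      start.call(index) # reuse the same slot for the replacement
      restarts += 1
    else
      sleep 0.05
    end
  end

  # Stop and reap whatever is still running.
  workers.each_value { |pid| Process.kill("TERM", pid) rescue nil }
  workers.each_value { |pid| Process.wait(pid) rescue nil }
  restarts
end
```

For example, `supervise(count: 2, max_restarts: 3) { |i| exit!(0) }` forks two children that exit immediately and replaces them three times before cleaning up. Keeping the worker index stable across restarts is what lets a replacement write to the same stats slot as its predecessor.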
#run_worker(index) ⇒ void
This method returns an undefined value.
Runs the full server stack inside a worker process.
Sets up and coordinates the reactor, server, ractor pool, thread pool, and stats thread, running until a shutdown signal is received or a critical component fails.
    # File 'lib/raptor/cluster.rb', line 195

    def run_worker(index)
      shutdown_requested = false
      trap("INT") { shutdown_requested = true }
      trap("TERM") { shutdown_requested = true }

      started_at = Process.clock_gettime(Process::CLOCK_REALTIME)
      request_count = 0

      @stats.write(
        index,
        pid: Process.pid,
        requests: 0,
        backlog: 0,
        started_at:,
        last_checkin: started_at,
        booted: false
      )

      reactor = nil

      counting_app = ->(env) {
        request_count += 1
        @app.call(env)
      }

      thread_pool = AtomicThreadPool.new(name: "Raptor Workers", size: @thread_count)
      request = Request.new(counting_app, @server_port)
      http2 = Http2.new(counting_app, @server_port)

      ractor_pool = RactorPool.new(
        name: "Raptor Pipeline Workers",
        size: @ractor_count,
        worker: request.http_parser_worker
      ) do |parsed_result|
        if parsed_result[:protocol] == :http2
          http2.handle_parsed_request(parsed_result, reactor, thread_pool)
        else
          request.handle_parsed_request(parsed_result, reactor, thread_pool)
        end
      end

      reactor = Reactor.new(thread_pool, ractor_pool, client_options: @client_options)
      reactor_thread = reactor.run

      server = Server.new(@binder, reactor, thread_pool, request)
      server_thread = server.run

      puts "[#{Process.pid}] Worker #{index} booted"

      stats_thread = Thread.new do
        Thread.current.name = "Raptor Stats"
        loop do
          @stats.write(
            index,
            pid: Process.pid,
            requests: request_count,
            backlog: reactor.backlog,
            started_at:,
            last_checkin: Process.clock_gettime(Process::CLOCK_REALTIME),
            booted: true
          )
          break if shutdown_requested
          sleep 1
        end
      end

      until shutdown_requested
        break unless server_thread.alive? && reactor_thread.alive?
        sleep 0.5
      end

      server.shutdown
      server_thread.join

      reactor.shutdown
      reactor_thread.join

      ractor_pool.shutdown
      thread_pool.shutdown
      stats_thread.join
    end
#shutdown ⇒ void
This method returns an undefined value.
Initiates graceful shutdown of the cluster.
    # File 'lib/raptor/cluster.rb', line 297

    def shutdown
      return if @shutdown
      @shutdown = true
    end
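The method only sets a flag, and the main loop notices it on its next poll. The general pattern can be sketched as below; `wait_for_signal` is a hypothetical helper written for this example and is not part of Raptor. Keeping the handler this small is a common choice in Ruby, since trap handlers run in a restricted context where operations such as locking a `Mutex` raise an error.

```ruby
# Installs a handler that only sets a flag, runs the given block, then
# polls until the flag is set or the timeout expires.
# Returns true if the signal was observed.
def wait_for_signal(signal, timeout: 2.0)
  received = false
  previous = trap(signal) { received = true } # handler does nothing else
  yield
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until received || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
    sleep 0.01 # the polling side does the real work, outside trap context
  end
  received
ensure
  trap(signal, previous) if previous # restore the earlier handler
end
```

Calling `wait_for_signal("USR2") { Process.kill("USR2", Process.pid) }` sends the signal to the current process and waits for the flag to flip.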
#spawn_worker(index) ⇒ void
This method returns an undefined value.
Forks a new worker process and registers it at the given index.
    # File 'lib/raptor/cluster.rb', line 180

    def spawn_worker(index)
      pid = fork { run_worker(index) }
      @workers[index] = pid
    end
#stats ⇒ Array<Hash>
Returns stats for all worker processes.
    # File 'lib/raptor/cluster.rb', line 168

    def stats
      @stats.all
    end
#write_stats_file_loop ⇒ void
This method returns an undefined value.
Writes the stats file on a 1-second interval until shutdown.
    # File 'lib/raptor/cluster.rb', line 348

    def write_stats_file_loop
      loop do
        File.write(@stats_file, JSON.generate({ master_pid: Process.pid, workers: @stats.all }))
        break if @shutdown
        sleep 1
      end
    rescue SystemCallError
    end
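An external tool could consume the file along these lines. This is a sketch, assuming each worker entry carries the `pid`, `requests`, and `booted` keys that `log_stats` reads; `summarize_stats` is a hypothetical helper, not part of Raptor. Since the file is rewritten every second with a plain `File.write`, a reader may catch it missing or partially written, so the sketch returns nil in those cases.

```ruby
require "json"

# Reads a stats file written in the { master_pid:, workers: [...] } shape
# and returns a small summary hash, or nil if the file is missing or is
# not valid JSON at the moment it is read.
def summarize_stats(path)
  data = JSON.parse(File.read(path), symbolize_names: true)
  workers = data.fetch(:workers)
  {
    master_pid: data.fetch(:master_pid),
    total_requests: workers.sum { |w| w[:requests] },
    starting_pids: workers.reject { |w| w[:booted] }.map { |w| w[:pid] }
  }
rescue Errno::ENOENT, JSON::ParserError
  nil
end
```

A monitoring script might call this once per second and retry on nil.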