Class: Raptor::Reactor

Inherits:
Object
Defined in:
lib/raptor/reactor.rb,
sig/generated/raptor/reactor.rbs

Overview

High-performance I/O reactor for managing client connections and timeouts.

Reactor uses NIO selectors for efficient I/O multiplexing and tracks client timeouts in a red-black tree, so adding or removing a timeout costs O(log n). It coordinates between a thread pool for blocking operations and a ractor pool for CPU-intensive HTTP parsing, and exposes a backlog metric that the server uses for backpressure control to prevent overload.

Examples:

reactor = Reactor.new(thread_pool, ractor_pool, client_options: {
  first_data_timeout: 30,
  chunk_data_timeout: 10
})
reactor.run
reactor.add(id: client.object_id, socket: client)
# ... later
reactor.shutdown
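
The timeout bookkeeping described in the overview can be illustrated with a small stand-in. This is only a sketch: `TimeoutIndex` and `Entry` are hypothetical names, and a sorted Array replaces the red-black tree, so inserts here are O(n) rather than O(log n). What it shares with the reactor is the ordering by deadline, which makes the earliest timeout cheap to find.

```ruby
# Hypothetical stand-in for the reactor's timeout tree. A sorted Array is used
# instead of a red-black tree, so this sketch does not have O(log n) inserts.
class TimeoutIndex
  Entry = Struct.new(:id, :timeout_at)

  def initialize
    @entries = []
  end

  # Insert while keeping entries ordered by deadline.
  def add(id, timeout_at)
    entry = Entry.new(id, timeout_at)
    index = @entries.bsearch_index { |e| e.timeout_at > timeout_at } || @entries.size
    @entries.insert(index, entry)
    entry
  end

  # Seconds until the earliest deadline (never negative), or nil when empty.
  def next_timeout(now)
    first = @entries.first
    first && [first.timeout_at - now, 0].max
  end

  # Remove and return the ids of every entry whose deadline has passed.
  def expire(now)
    count = @entries.take_while { |e| e.timeout_at <= now }.size
    @entries.shift(count).map(&:id)
  end
end

index = TimeoutIndex.new
index.add(:slow, 30.0)
index.add(:fast, 10.0)
index.next_timeout(4.0) # => 6.0, the wait before :fast is due
index.expire(12.0)      # => [:fast]
```

The value returned by `next_timeout` plays the role of the selector timeout in the event loop: the loop never sleeps past the earliest deadline.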

Defined Under Namespace

Classes: TimeoutClient

Constant Summary

CHUNK_SIZE =
64 * 1024

Returns:

  • (Object)

TIMEOUT_RESPONSE =
"HTTP/1.1 408 Request Timeout\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"

Returns:

  • (::String)


Constructor Details

#initialize(thread_pool, ractor_pool, client_options:) ⇒ Reactor

Creates a new Reactor instance.

Parameters:

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • ractor_pool (RactorPool)

    ractor pool for HTTP parsing

  • client_options: (Hash[Symbol, Integer])

    timeout configuration options

Options Hash (client_options:):

  • :first_data_timeout (Integer)

    timeout for initial data

  • :chunk_data_timeout (Integer)

    timeout for subsequent chunks

  • :persistent_data_timeout (Integer)

    timeout for keep-alive connections



# File 'lib/raptor/reactor.rb', line 91

def initialize(thread_pool, ractor_pool, client_options:)
  @thread_pool = thread_pool
  @ractor_pool = ractor_pool
  @client_options = client_options

  @selector = NIO::Selector.new
  @queue = Queue.new
  @timeouts = RedBlackTree.new

  @id_to_socket = {}
  @socket_to_state = {}
  @id_to_timeout = {}
  @id_to_writer = {}
end
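
A caller might assemble the `client_options:` hash along these lines. This is a sketch, not part of Raptor: `DEFAULT_CLIENT_OPTIONS` and `build_client_options` are hypothetical, the values 30 and 10 are taken from the class example, and the persistent timeout value is an assumption made for illustration.

```ruby
# Hypothetical defaults. 30 and 10 come from the class example; the
# persistent value is an assumption for illustration only.
DEFAULT_CLIENT_OPTIONS = {
  first_data_timeout: 30,
  chunk_data_timeout: 10,
  persistent_data_timeout: 20
}.freeze

# Merge caller overrides over the defaults, rejecting unknown keys and
# non-Integer values so a typo fails early instead of at registration time.
def build_client_options(overrides = {})
  unknown = overrides.keys - DEFAULT_CLIENT_OPTIONS.keys
  raise ArgumentError, "unknown timeout option(s): #{unknown.inspect}" unless unknown.empty?

  options = DEFAULT_CLIENT_OPTIONS.merge(overrides)
  invalid = options.reject { |_key, seconds| seconds.is_a?(Integer) }
  raise ArgumentError, "timeouts must be Integers: #{invalid.inspect}" unless invalid.empty?

  options
end

client_options = build_client_options(chunk_data_timeout: 5)
client_options[:chunk_data_timeout] # => 5
```

Validating up front matters because the constructor stores the hash as given; a missing key would only surface later, when a socket is registered and its timeout is looked up.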

Instance Method Details

#add(state) ⇒ void

This method returns an undefined value.

Adds a new client connection to the reactor.

Parameters:

  • state (Hash)

    client connection state including socket and ID

Options Hash (state):

  • :socket (TCPSocket)

    the client socket

  • :id (Integer)

    unique identifier for the client



# File 'lib/raptor/reactor.rb', line 162

def add(state)
  socket = state[:socket]
  state.delete(:socket)
  writer = state.delete(:writer)
  @id_to_socket[state[:id]] = socket
  @socket_to_state[socket] = state
  @id_to_writer[state[:id]] = writer if writer

  read_and_queue_for_parse(socket, state)
end
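
The bookkeeping in `#add` can be tried in isolation. The sketch below reproduces only the table updates, with no I/O; `ConnectionTable` is a hypothetical name and a plain `Object` stands in for the socket.

```ruby
# Hypothetical miniature of the reactor's lookup tables. Only the bookkeeping
# from #add is reproduced; nothing is read from the socket.
class ConnectionTable
  attr_reader :id_to_socket, :socket_to_state, :id_to_writer

  def initialize
    @id_to_socket = {}
    @socket_to_state = {}
    @id_to_writer = {}
  end

  # Splits the socket and the optional writer out of the state hash.
  # The caller's hash is mutated: both keys are deleted from it.
  def add(state)
    socket = state.delete(:socket)
    writer = state.delete(:writer)
    @id_to_socket[state[:id]] = socket
    @socket_to_state[socket] = state
    @id_to_writer[state[:id]] = writer if writer
    state
  end
end

table = ConnectionTable.new
socket = Object.new # stand-in for a TCPSocket
state = { id: 7, socket: socket }
table.add(state)
state.key?(:socket)           # => false, the socket is now held by the tables only
table.socket_to_state[socket] # => { id: 7 }
```

One consequence worth noting: because the hash passed in is modified, a caller that needs the original keys afterwards should pass a copy.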

#backlog ⇒ Integer

Returns the number of complete requests either being processed or awaiting processing.

Returns:

  • (Integer)

    number of complete requests



# File 'lib/raptor/reactor.rb', line 304

def backlog
  @thread_pool.queue_size + @thread_pool.active_count
end
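
How a server could act on this metric is sketched below. The policy is an assumption, not Raptor's actual accept logic: `PoolStats`, `accept_more?` and `max_backlog:` are hypothetical names, and the struct only mimics the two counters that `#backlog` reads.

```ruby
# Hypothetical stand-in exposing the two counters that #backlog reads.
PoolStats = Struct.new(:queue_size, :active_count)

# Same arithmetic as #backlog: waiting requests plus requests in progress.
def backlog(pool)
  pool.queue_size + pool.active_count
end

# Hypothetical policy: keep accepting only while the backlog is under a limit.
def accept_more?(pool, max_backlog:)
  backlog(pool) < max_backlog
end

busy = PoolStats.new(6, 4)
idle = PoolStats.new(0, 1)
accept_more?(busy, max_backlog: 8) # => false, 10 requests are queued or running
accept_more?(idle, max_backlog: 8) # => true
```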

#cleanup(socket) ⇒ void

This method returns an undefined value.

Cleans up a client connection by removing it from tracking and closing the socket.

Parameters:

  • socket (TCPSocket)

    the socket to clean up



# File 'lib/raptor/reactor.rb', line 384

def cleanup(socket)
  state = @socket_to_state.delete(socket)
  @id_to_socket.delete(state[:id])
  @id_to_writer.delete(state[:id])
  socket.close
end

#complete?(state) ⇒ Boolean

Checks whether a request is complete, i.e. ready for processing.

Parameters:

  • state (Hash)

    connection state

Returns:

  • (Boolean)

    true if the request is complete



# File 'lib/raptor/reactor.rb', line 397

def complete?(state)
  state[:complete]
end

#first_data_received?(state) ⇒ Boolean

Checks if any data has been received for this connection.

Parameters:

  • state (Hash)

    connection state

Returns:

  • (Boolean)

    true if first data has been received



# File 'lib/raptor/reactor.rb', line 407

def first_data_received?(state)
  complete?(state) || (state.dig(:parse_data, :parse_count) || 0) >= 1
end
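
The predicate can be exercised against a few sample states. The two methods below are copied from the listings as free-standing functions; the state hashes are hypothetical and only assume the `:complete` and `:parse_data`/`:parse_count` keys that the source reads.

```ruby
def complete?(state)
  state[:complete]
end

def first_data_received?(state)
  complete?(state) || (state.dig(:parse_data, :parse_count) || 0) >= 1
end

# Hypothetical states: no data yet, one partial parse, and a finished request.
SAMPLE_STATES = {
  fresh: { id: 1 },
  partial: { id: 2, parse_data: { parse_count: 1 } },
  complete: { id: 3, complete: true }
}.freeze

RESULTS = SAMPLE_STATES.transform_values { |state| !!first_data_received?(state) }
# => { fresh: false, partial: true, complete: true }
```

The `!!` is there because the method returns whatever `state[:complete]` holds when that value is truthy, so callers that need a strict Boolean should coerce the result.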

#persist(socket, id, request_count, remote_addr:, url_scheme:) ⇒ void

This method returns an undefined value.

Re-registers a kept-alive connection for the next request cycle.

Called after successfully writing a response when keep-alive is active. Resets the connection state and queues the socket for re-registration with the selector, where the persistent data timeout applies.

Parameters:

  • socket (TCPSocket)

    the kept-alive client socket

  • id (Integer)

    the unique client identifier

  • request_count (Integer)

    number of requests handled on this connection

  • remote_addr: (String)

    the client's remote IP address

  • url_scheme: (String)

    "http" or "https"


# File 'lib/raptor/reactor.rb', line 223

def persist(socket, id, request_count, remote_addr:, url_scheme:)
  state = {
    id: id,
    request_count: request_count,
    remote_addr: remote_addr,
    url_scheme: url_scheme,
    persisted: true
  }

  @id_to_socket[id] = socket
  @socket_to_state[socket] = state
  @queue << socket
  @selector.wakeup
rescue ClosedQueueError
  socket.close
end
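
The `rescue ClosedQueueError` clause covers the case where the reactor has already been shut down. A minimal sketch of that behavior, using a pipe end in place of a client socket (`enqueue_or_close` is a hypothetical helper, not a Raptor method):

```ruby
# Mirrors the rescue clause in #persist: if the registration queue has been
# closed, the connection is closed instead of being queued.
def enqueue_or_close(queue, io)
  queue << io
  :queued
rescue ClosedQueueError
  io.close
  :closed
end

queue = Queue.new
reader, writer = IO.pipe        # stand-ins for client sockets
enqueue_or_close(queue, reader) # => :queued
queue.close                     # what #shutdown does to the registration queue
enqueue_or_close(queue, writer) # => :closed, and writer.closed? is now true
reader.close
```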

#read_and_queue_for_parse(socket, state) ⇒ Hash?

Reads data from a socket and queues it for parsing, or, if no data is available yet, queues the socket for selector registration.

Parameters:

  • socket (TCPSocket)

    the socket to read from and queue

  • state (Hash)

    current connection state

Returns:

  • (Hash, nil)

    updated state, if successful



# File 'lib/raptor/reactor.rb', line 355

def read_and_queue_for_parse(socket, state)
  data = begin
    socket.read_nonblock(CHUNK_SIZE)
  rescue IO::WaitReadable
    @queue << socket
    @selector.wakeup
    return
  rescue EOFError
    cleanup(socket)
    return
  end

  buffer = state[:buffer] ? state[:buffer].dup : String.new
  buffer << data

  while socket.respond_to?(:pending) && socket.pending > 0
    buffer << socket.read_nonblock(socket.pending)
  end

  state = state.frozen? ? state.merge(buffer: buffer) : state.merge!(buffer: buffer)
  @ractor_pool << Ractor.make_shareable(state)
end
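
Two details of this method are easy to miss: the three outcomes of a non-blocking read, and the way the buffer is appended without mutating a frozen state. The sketch below shows both with a pipe instead of a socket. `try_read`, `with_buffer` and `READ_CHUNK_SIZE` are hypothetical names, and `Ractor.make_shareable` requires Ruby 3.0 or later.

```ruby
READ_CHUNK_SIZE = 64 * 1024

# Classify one non-blocking read the way the method's rescue clauses do.
def try_read(io, max = READ_CHUNK_SIZE)
  [:data, io.read_nonblock(max)]
rescue IO::WaitReadable
  [:wait, nil] # nothing to read yet: the reactor queues the socket for the selector
rescue EOFError
  [:eof, nil]  # peer closed the connection: the reactor cleans it up
end

# Append data to the state's buffer. A frozen state (one that was already made
# shareable) is copied with merge; an unfrozen one is updated in place.
def with_buffer(state, data)
  buffer = state[:buffer] ? state[:buffer].dup : String.new
  buffer << data
  state = state.frozen? ? state.merge(buffer: buffer) : state.merge!(buffer: buffer)
  Ractor.make_shareable(state) # deep-freezes the hash and the buffer
end

reader, writer = IO.pipe
try_read(reader)                 # => [:wait, nil]
writer.write("GET / HT")
_kind, data = try_read(reader)
state = with_buffer({ id: 1 }, data)
writer.write("TP/1.1\r\n")
writer.close
state = with_buffer(state, try_read(reader).last)
state[:buffer]                   # => "GET / HTTP/1.1\r\n"
try_read(reader)                 # => [:eof, nil]
reader.close
```

Duplicating the buffer before appending is what makes the second call safe: the previous state and its buffer stay frozen and unchanged while the new state carries the longer buffer.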

#register(socket) ⇒ void

This method returns an undefined value.

Registers a socket with the NIO selector and sets up timeout tracking.

Parameters:

  • socket (TCPSocket)

    the socket to register



# File 'lib/raptor/reactor.rb', line 316

def register(socket)
  @selector.register(socket, :r).value = socket

  state = @socket_to_state[socket]
  client = TimeoutClient.new(state)
  timeout = if state[:persisted]
    @client_options[:persistent_data_timeout]
  elsif first_data_received?(state)
    @client_options[:chunk_data_timeout]
  else
    @client_options[:first_data_timeout]
  end
  client.timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  @timeouts << client
  @id_to_timeout[state[:id]] = client
end
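
The timeout selection can be separated from the selector and tree for experimentation. In this sketch `timeout_for` and `deadline_for` are hypothetical helpers, and `Hash#fetch` is used instead of `[]` (a deliberate deviation from the listing) so that a missing option raises `KeyError` rather than failing later when `nil` is added to the clock value.

```ruby
# Pick the timeout that applies to a connection in its current phase.
def timeout_for(state, options)
  if state[:persisted]
    options.fetch(:persistent_data_timeout) # idle keep-alive connection
  elsif state[:complete] || (state.dig(:parse_data, :parse_count) || 0) >= 1
    options.fetch(:chunk_data_timeout)      # some data has already arrived
  else
    options.fetch(:first_data_timeout)      # still waiting for the first bytes
  end
end

# Absolute deadline on the monotonic clock, which is unaffected by changes
# to the system's wall-clock time.
def deadline_for(state, options, now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
  now + timeout_for(state, options)
end

options = { first_data_timeout: 30, chunk_data_timeout: 10, persistent_data_timeout: 5 }
timeout_for({ id: 1 }, options)                  # => 30
timeout_for({ id: 1, persisted: true }, options) # => 5
deadline_for({ id: 1 }, options, now: 100.0)     # => 130.0
```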

#remove(id) ⇒ TCPSocket?

Removes a client connection from the reactor.

Called when an HTTP request is complete and ready for application processing. Triggers server accept re-enabling if system capacity allows.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (TCPSocket, nil)

    the removed socket, if found



# File 'lib/raptor/reactor.rb', line 203

def remove(id)
  @id_to_socket.delete(id).tap do |socket|
    @socket_to_state.delete(socket)
  end
end
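
The `delete(...).tap` idiom removes the entry from both tables and still returns the socket. A small sketch with a hypothetical `SocketRegistry` class shows the return values, including the case where the id is unknown:

```ruby
# Hypothetical two-table registry reproducing the shape of #remove.
class SocketRegistry
  def initialize
    @id_to_socket = {}
    @socket_to_state = {}
  end

  def track(id, socket, state)
    @id_to_socket[id] = socket
    @socket_to_state[socket] = state
  end

  # Delete by id, then drop the state keyed by that socket. tap passes the
  # socket (or nil for an unknown id) through as the return value.
  def remove(id)
    @id_to_socket.delete(id).tap do |socket|
      @socket_to_state.delete(socket)
    end
  end

  def tracked?(id)
    @id_to_socket.key?(id)
  end
end

registry = SocketRegistry.new
socket = Object.new # stand-in for a TCPSocket
registry.track(1, socket, { id: 1 })
registry.remove(1).equal?(socket) # => true
registry.remove(1)                # => nil, the id is no longer tracked
```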

#run ⇒ Thread

Starts the reactor's main event loop in a new thread.

The event loop handles I/O events, processes timeouts, manages the registration queue, and controls server connection acceptance. It continues until the queue is closed and emptied.

Returns:

  • (Thread)

    the thread running the reactor event loop



# File 'lib/raptor/reactor.rb', line 115

def run
  Thread.new do
    Thread.current.name = self.class.name

    until @queue.closed? && @queue.empty?
      timeout = @timeouts.min&.timeout(Process.clock_gettime(Process::CLOCK_MONOTONIC))
      @selector.select(timeout) do |monitor|
        wakeup!(monitor.value)
      end

      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      expired = []
      @timeouts.traverse do |to_client|
        break unless to_client.timeout(now) == 0

        expired << to_client
      end

      expired.each do |to_client|
        @timeouts.delete!(to_client)
        id = to_client.client_data[:id]
        @id_to_timeout.delete(id)
        socket = @id_to_socket[id]
        next unless socket

        @selector.deregister(socket)
        socket.write(TIMEOUT_RESPONSE) rescue nil
        cleanup(socket)
      end

      until @queue.empty?
        register(@queue.pop)
      end
    end

    @selector.close
  end
end
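
The interplay between the registration queue, the wakeup call and the loop's exit condition can be reproduced with the standard library alone. The sketch below is not the reactor: `MiniLoop` is hypothetical, `IO.select` on a self-pipe stands in for the NIO selector and its `wakeup`, and timeout handling is left out.

```ruby
# Hypothetical miniature of the event loop: drains a registration queue and
# stops once the queue is closed and empty.
class MiniLoop
  attr_reader :registered

  def initialize
    @queue = Queue.new
    @wake_r, @wake_w = IO.pipe # self-pipe standing in for the selector's wakeup
    @registered = []
  end

  def run
    @thread = Thread.new do
      until @queue.closed? && @queue.empty?
        # Sleep until woken or until the timeout elapses, then clear the pipe.
        ready, = IO.select([@wake_r], nil, nil, 0.5)
        @wake_r.read_nonblock(1024, exception: false) if ready
        @registered << @queue.pop until @queue.empty?
      end
    end
  end

  def enqueue(item)
    @queue << item
    wake
  end

  # Close the queue, wake the loop, wait for it to drain, then release the pipe.
  def shutdown
    @queue.close
    wake
    @thread.join
    [@wake_r, @wake_w].each(&:close)
  end

  private

  def wake
    @wake_w.write_nonblock(".", exception: false)
  end
end

mini = MiniLoop.new
mini.run
mini.enqueue(:a)
mini.enqueue(:b)
mini.shutdown
mini.registered # => [:a, :b]
```

Items queued before shutdown are still processed, because the loop only exits when the queue is both closed and empty. Without the wakeup, the loop would notice new work only after its current select timeout expired.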

#shutdown ⇒ void

This method returns an undefined value.

Initiates reactor shutdown.

Closes the registration queue and wakes up the selector to begin the graceful shutdown process.



# File 'lib/raptor/reactor.rb', line 293

def shutdown
  @queue.close
  @selector.wakeup
end

#socket_for(id) ⇒ TCPSocket?

Returns the socket for a given client identifier without removing it.

Used by HTTP/2 connections where the socket remains registered across multiple stream requests.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (TCPSocket, nil)

    the socket, if found



# File 'lib/raptor/reactor.rb', line 249

def socket_for(id)
  @id_to_socket[id]
end

#update_http2_state(state) ⇒ void

This method returns an undefined value.

Updates connection state for an HTTP/2 connection after frame processing.

Re-registers the socket with the selector for further reads and stores the updated HPACK table and stream states.

Parameters:

  • state (Hash)

    updated connection state from the ractor pool



# File 'lib/raptor/reactor.rb', line 274

def update_http2_state(state)
  socket = @id_to_socket[state[:id]]
  return unless socket

  @socket_to_state[socket] = state
  @queue << socket
  @selector.wakeup
rescue ClosedQueueError
  socket.close
end

#update_state(state) ⇒ void

This method returns an undefined value.

Updates the state of an existing client connection.

Called when an incomplete HTTP request needs to be re-registered with the reactor for further processing.

Parameters:

  • state (Hash)

    updated client connection state

Options Hash (state):

  • :id (Integer)

    client identifier



# File 'lib/raptor/reactor.rb', line 183

def update_state(state)
  socket = @id_to_socket[state[:id]]
  return unless socket

  @socket_to_state[socket] = state
  @queue << socket
  @selector.wakeup
rescue ClosedQueueError
  socket.close
end

#wakeup!(socket) ⇒ void

This method returns an undefined value.

Handles socket wakeup by deregistering and queuing for processing.

Parameters:

  • socket (TCPSocket)

    the socket that became ready



# File 'lib/raptor/reactor.rb', line 339

def wakeup!(socket)
  @selector.deregister(socket)
  state = @socket_to_state[socket]
  to_client = @id_to_timeout.delete(state[:id])
  @timeouts.delete!(to_client)
  read_and_queue_for_parse(socket, state)
end

#writer_for(id) ⇒ Object?

Returns the writer object associated with a given connection, if one was supplied when the connection was added. Used by protocol handlers that need to coordinate concurrent socket writes.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (Object, nil)

    the writer, if found



# File 'lib/raptor/reactor.rb', line 261

def writer_for(id)
  @id_to_writer[id]
end
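
The documentation does not specify what a writer looks like, so the following is only one possible shape, given as an assumption: a hypothetical `LockedWriter` that serializes writes to a shared IO with a Mutex, so that messages written from different threads are not interleaved. A `StringIO` stands in for the socket.

```ruby
require "stringio"

# Hypothetical writer: every message is written while holding a lock, so
# concurrent callers cannot interleave their output on the same IO.
class LockedWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
  end

  def write(data)
    @mutex.synchronize { @io.write(data) }
  end
end

io = StringIO.new
id_to_writer = { 42 => LockedWriter.new(io) } # same shape as the reactor's table

threads = %w[a b c].map do |tag|
  Thread.new { 50.times { id_to_writer[42].write("#{tag * 8}\n") } }
end
threads.each(&:join)

io.string.lines.size # => 150
```

Looking the writer up by connection id, as `#writer_for` does, lets every handler for the same connection share one lock instead of each creating its own.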