Class: Raptor::Request

Inherits:
Object
  • Object
show all
Defined in:
lib/raptor/request.rb,
sig/generated/raptor/request.rbs

Overview

Handles HTTP request processing and Rack application integration.

Request manages the HTTP parsing pipeline using Ractors and coordinates with the reactor for connection state management. It bridges between the low-level HTTP parsing and high-level Rack application interface, handling both incomplete requests (that need more data) and complete requests (ready for application processing).

Defined Under Namespace

Classes: Error, WriteError

Constant Summary collapse

BODY_BUFFER_THRESHOLD =

Returns:

  • (Object)
256 * 1024
FILE_CHUNK_SIZE =

Returns:

  • (Object)
64 * 1024
READ_BUFFER_SIZE =

Returns:

  • (Object)
64 * 1024
WRITE_TIMEOUT =

Returns:

  • (::Integer)
5
KEEPALIVE_READ_TIMEOUT =

Returns:

  • (::Float)
0.001
MAX_KEEPALIVE_REQUESTS =

Returns:

  • (::Integer)
100
HTTP_SCHEME =

Returns:

  • (::String)
"http"
HTTP_10 =

Returns:

  • (::String)
"HTTP/1.0"
HTTP_11 =

Returns:

  • (::String)
"HTTP/1.1"
STATUS_LINE_CACHE_10 =

Returns:

  • (Object)
Hash.new do |h, status|
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  h[status] = "HTTP/1.0 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_LINE_CACHE_11 =

Returns:

  • (Object)
Hash.new do |h, status|
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  h[status] = "HTTP/1.1 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_WITH_NO_ENTITY_BODY =

Returns:

  • (Object)
Set.new([204, 304, *100..199]).freeze
ERROR_RESPONSE_500 =

Returns:

  • (::String)
"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONNECTION_CLOSE =

Returns:

  • (::String)
"close"
CONNECTION_KEEPALIVE =

Returns:

  • (::String)
"keep-alive"
TRANSFER_ENCODING_CHUNKED =

Returns:

  • (::String)
"chunked"
HTTP_CONNECTION =

Returns:

  • (::String)
"HTTP_CONNECTION"
HTTP_TRANSFER_ENCODING =

Returns:

  • (::String)
"HTTP_TRANSFER_ENCODING"
RACK_HEADER_PREFIX =

Returns:

  • (::String)
"rack."
RACK_HIJACKED =

Returns:

  • (::String)
"rack.hijacked"
RACK_HIJACK_IO =

Returns:

  • (::String)
"rack.hijack_io"
ILLEGAL_HEADER_KEY_REGEX =

Returns:

  • (::Regexp)
/[\x00-\x20\(\)<>@,;:\\"\/\[\]\?=\{\}\x7F]/
ILLEGAL_HEADER_VALUE_REGEX =

Returns:

  • (::Regexp)
/[\x00-\x08\x0A-\x1F]/

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, server_port) ⇒ Request

Creates a new Request handler.

Parameters:

  • app (#call)

    the Rack application to dispatch complete requests to

  • server_port (Integer)

    port number used to populate SERVER_PORT in the Rack env



101
102
103
104
# File 'lib/raptor/request.rb', line 101

# Stores the collaborators a Request handler needs.
#
# @param app [#call] Rack application invoked with the built env
# @param server_port [Integer] fallback used for SERVER_PORT when the Host header carries no port
def initialize(app, server_port)
  @app, @server_port = app, server_port
end

Class Method Details

.decode_chunked(buffer) ⇒ Array(String, Boolean)

Decodes a chunked transfer-encoded body buffer.

Returns the decoded bytes and a flag indicating whether the terminating zero-length chunk was found. The decoder stops at the first unparseable boundary (incomplete CRLF) or zero-length chunk.

Parameters:

  • buffer (String)

    the raw body buffer to decode

Returns:

  • (Array(String, Boolean))

    decoded body and completion flag



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/raptor/request.rb', line 72

# Decodes a chunked transfer-encoded body buffer.
#
# Walks the buffer chunk by chunk: a hexadecimal size line terminated by
# CRLF, followed by that many bytes of data and a trailing CRLF. Decoding
# stops at the zero-length chunk (complete), at a size line whose CRLF has
# not arrived yet (incomplete), or at a negative size (treated as
# unparseable, reported as incomplete).
#
# NOTE(review): offsets mix String#index (character based) with byteslice
# (byte based); this assumes a binary buffer — confirm against callers.
#
# @param buffer [String] raw body bytes received so far
# @return [Array(String, Boolean)] bytes decoded so far and whether the
#   terminating zero-length chunk was reached
def self.decode_chunked(buffer)
  decoded = String.new
  offset = 0

  while offset < buffer.bytesize
    crlf = buffer.index("\r\n", offset)
    return [decoded, false] unless crlf

    chunk_size = buffer.byteslice(offset, crlf - offset).to_i(16)
    return [decoded, true] if chunk_size == 0
    # A negative size would make byteslice return nil (TypeError on <<) and
    # could move the offset backwards; stop instead of crashing.
    return [decoded, false] if chunk_size.negative?

    offset = crlf + 2
    decoded << buffer.byteslice(offset, chunk_size)
    # Skip the chunk data plus its trailing CRLF.
    offset += chunk_size + 2
  end

  [decoded, false]
end

Instance Method Details

#build_rack_env(env, parse_data, body, socket, remote_addr: "127.0.0.1", url_scheme: HTTP_SCHEME) ⇒ Hash

Builds a Rack environment hash from parsed HTTP request data.

Populates all required Rack env keys including rack.* keys, REMOTE_ADDR, SERVER_NAME, SERVER_PORT, and hijack support.

Parameters:

  • env (Hash)

    partial env hash from the HTTP parser

  • parse_data (Hash)

    metadata from the parsing pass, including content_length

  • body (String, nil)

    decoded request body, or nil if no body

  • socket (TCPSocket)

    the client socket, used for hijack support

  • remote_addr (String) (defaults to: "127.0.0.1")

    client IP address

  • url_scheme (String) (defaults to: HTTP_SCHEME)

    "http" or "https"

  • remote_addr: (String) (defaults to: "127.0.0.1")
  • url_scheme: (String) (defaults to: HTTP_SCHEME)

Returns:

  • (Hash)

    fully populated Rack environment hash



476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
# File 'lib/raptor/request.rb', line 476

# Completes the parser-produced env hash so it can be handed to a Rack app.
#
# The hash is mutated in place and also returned.
#
# @param env [Hash] partial env from the HTTP parser; mutated in place
# @param parse_data [Hash] parse metadata; only :content_length is read here
# @param body [String, nil] request body, or nil when there is none
# @param socket [TCPSocket] client socket exposed through the hijack and early-hints procs
# @param remote_addr [String] value stored as REMOTE_ADDR
# @param url_scheme [String] value stored as rack.url_scheme
# @return [Hash] the same env hash, fully populated
def build_rack_env(env, parse_data, body, socket, remote_addr: "127.0.0.1", url_scheme: HTTP_SCHEME)
  env[Rack::RACK_VERSION] = Rack::VERSION
  env[Rack::RACK_URL_SCHEME] = url_scheme
  # rack.input is always a binary StringIO: wrapping the body, or empty when there is none.
  env[Rack::RACK_INPUT] = (body ? StringIO.new(body) : StringIO.new).set_encoding(Encoding::ASCII_8BIT)
  env[Rack::RACK_ERRORS] = $stderr
  # Callbacks appended here by the app are later invoked by call_response_finished.
  env[Rack::RACK_RESPONSE_FINISHED] = []

  # Full hijack: calling the proc flags the env as hijacked and hands over the raw socket.
  env[Rack::RACK_IS_HIJACK] = true
  env[Rack::RACK_HIJACK] = proc do
    env[RACK_HIJACKED] = true
    env[RACK_HIJACK_IO] = socket
    socket
  end

  # Early hints are best-effort: any error while sending them is swallowed.
  env[Rack::RACK_EARLY_HINTS] = proc do |hints|
    send_early_hints(socket, hints) rescue nil
  end

  # Fill in path-related keys; a parser-provided REQUEST_PATH is moved to PATH_INFO.
  env[Rack::SCRIPT_NAME] = "" unless env.key?(Rack::SCRIPT_NAME)
  env[Rack::PATH_INFO] = env.delete(Rack::REQUEST_PATH) if env.key?(Rack::REQUEST_PATH)
  env[Rack::PATH_INFO] = "" unless env.key?(Rack::PATH_INFO)
  env[Rack::QUERY_STRING] = "" unless env.key?(Rack::QUERY_STRING)

  # CONTENT_LENGTH is only written for a positive length.
  # NOTE(review): assumes parse_data[:content_length] is always an Integer — confirm against the parser.
  if (content_length = parse_data[:content_length]).positive?
    env["CONTENT_LENGTH"] = content_length.to_s
  end

  env["REMOTE_ADDR"] = remote_addr

  # Derive SERVER_NAME / SERVER_PORT from the Host header without overriding
  # values already present in env (||=).
  http_host = env[Rack::HTTP_HOST]
  if http_host
    if http_host.start_with?("[")
      # Bracketed form such as "[::1]:8080": the host is the text inside the
      # brackets, the port is the digits after "]:" (nil when absent).
      host = http_host[/\A\[([^\]]+)\]/, 1]
      port = http_host[/\]:(\d+)\z/, 1]
    else
      host, port = http_host.split(":", 2)
    end
    env[Rack::SERVER_NAME] ||= host
    env[Rack::SERVER_PORT] ||= port || @server_port.to_s
  else
    # No Host header: fall back to "localhost" and the configured server port.
    env[Rack::SERVER_NAME] ||= "localhost"
    env[Rack::SERVER_PORT] ||= @server_port.to_s
  end

  env
end

#build_status_line(http_version, status) ⇒ String

Builds the HTTP status line string.

Parameters:

  • http_version (String)

    "HTTP/1.1" or "HTTP/1.0"

  • status (Integer)

    HTTP status code

Returns:

  • (String)

    the status line including trailing CRLF



684
685
686
687
# File 'lib/raptor/request.rb', line 684

# Returns the status line for the given protocol version and status code.
#
# Lines are looked up in the per-version caches; any version other than
# HTTP_11 is served from the HTTP/1.0 cache. A copy of the cached entry is
# returned rather than the cached object itself.
#
# @param http_version [String] "HTTP/1.1" or "HTTP/1.0"
# @param status [Integer] HTTP status code
# @return [String] the status line including trailing CRLF
def build_status_line(http_version, status)
  status_lines =
    if http_version == HTTP_11
      STATUS_LINE_CACHE_11
    else
      STATUS_LINE_CACHE_10
    end

  status_lines[status].dup
end

#calculate_content_length(body) ⇒ Integer?

Calculates content length from an array or file body without consuming it.

Returns nil for enumerable bodies whose length cannot be determined upfront.

Parameters:

  • body (Object)

    the response body

Returns:

  • (Integer, nil)

    the byte length, or nil if it cannot be determined



799
800
801
802
803
804
805
806
807
808
809
810
# File 'lib/raptor/request.rb', line 799

# Determines the response body's byte length without iterating it via #each.
#
# Array-like bodies (responding to to_ary) are summed chunk by chunk, with
# non-String chunks counting as zero bytes. File-like bodies (responding to
# to_path) report the size of a readable file. Everything else yields nil.
#
# @param body [Object] the response body
# @return [Integer, nil] the byte length, or nil if it cannot be determined
def calculate_content_length(body)
  if body.respond_to?(:to_ary)
    chunks = body.to_ary
    return unless chunks.is_a?(Array)

    return chunks.sum { |chunk| chunk.is_a?(String) ? chunk.bytesize : 0 }
  end

  return unless body.respond_to?(:to_path)

  file_path = body.to_path
  File.size(file_path) if file_path && File.readable?(file_path)
end

#call_response_finished(env, status, headers, error) ⇒ void

This method returns an undefined value.

Calls all rack.response_finished callbacks registered in the environment.

Callbacks are called in reverse registration order. Individual callback failures are rescued so all callbacks are always attempted.

Parameters:

  • env (Hash, nil)

    the Rack environment

  • status (Integer, nil)

    the response status code

  • headers (Hash, nil)

    the response headers

  • error (Exception, nil)

    any error raised during processing, or nil on success



1009
1010
1011
1012
1013
1014
1015
# File 'lib/raptor/request.rb', line 1009

# Invokes the rack.response_finished callbacks stored in the env.
#
# Does nothing when env is nil or the entry is not an Array. Callbacks run
# in reverse registration order, and a StandardError raised by one callback
# is swallowed so the remaining callbacks still run.
#
# @param env [Hash, nil] the Rack environment
# @param status [Integer, nil] the response status code
# @param headers [Hash, nil] the response headers
# @param error [Exception, nil] error raised during processing, or nil on success
# @return [void]
def call_response_finished(env, status, headers, error)
  return unless env

  callbacks = env[Rack::RACK_RESPONSE_FINISHED]
  return unless callbacks.is_a?(Array)

  callbacks.reverse_each do |callback|
    callback.call(env, status, headers, error)
  rescue StandardError
    nil
  end
end

#cork_socket(socket) ⇒ void

This method returns an undefined value.

Enables TCP_CORK on the socket to batch outgoing packets into fewer segments.

Only applies to TCP sockets. No-op on non-TCP sockets. Available on Linux only; this method is not defined on other platforms.

Parameters:

  • socket (TCPSocket)

    the socket to cork



1054
1055
1056
# File 'lib/raptor/request.rb', line 1054

# Sets the TCP_CORK option to 1 on the socket.
#
# Sockets that are not TCPSocket instances are left untouched.
#
# @param socket [TCPSocket] the socket to cork
# @return [void]
def cork_socket(socket)
  return unless socket.is_a?(TCPSocket)

  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1)
end

#eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme) ⇒ void

This method returns an undefined value.

Eagerly reads and parses the first request on a freshly accepted connection on the server thread, dispatching directly to the thread pool when complete. Falls back to the reactor when more data is needed.

Parameters:

  • socket (TCPSocket)

    the freshly accepted client socket

  • id (Integer)

    unique client identifier

  • reactor (Reactor)

    the reactor for fallback registration

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    "http" or "https"



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/raptor/request.rb', line 119

# Reads and parses the first request of a freshly accepted connection inline.
#
# A complete request is queued on the thread pool; an incomplete one is
# handed to the reactor together with what has been read so far.
#
# @param socket [TCPSocket] the freshly accepted client socket
# @param id [Integer] unique client identifier
# @param reactor [Reactor] receives the connection when more data is needed
# @param thread_pool [AtomicThreadPool] receives the job for a complete request
# @param remote_addr [String] client IP address
# @param url_scheme [String] "http" or "https"
# @return [void]
def eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme)
  data = begin
    socket.read_nonblock(READ_BUFFER_SIZE)
  rescue IO::WaitReadable
    # Nothing to read yet: register the socket with the reactor and stop here.
    reactor.add(
      id: id,
      socket: socket,
      remote_addr: remote_addr,
      url_scheme: url_scheme
    )
    return
  rescue EOFError, IOError
    # The read failed (e.g. peer closed); close the socket, ignoring close errors.
    socket.close rescue nil
    return
  end

  buffer = String.new
  buffer << data

  # Sockets exposing #pending (presumably TLS sockets — TODO confirm) may
  # report further buffered bytes; drain them into the same buffer.
  while socket.respond_to?(:pending) && socket.pending > 0
    buffer << socket.read_nonblock(socket.pending)
  end

  parser = HttpParser.new
  env = {}
  nread = parser.execute(env, buffer, 0)
  parse_data = { parse_count: 1, content_length: parser.content_length }

  body = nil
  if !parser.finished?
    # Headers are incomplete: hand everything read so far to the reactor
    # (request_count 0, not flagged as persisted).
    fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
    return
  elsif parser.has_body?
    # Whatever follows the parsed header bytes is the start of the body.
    body = buffer.byteslice(nread..-1) || ""

    if env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
      body, chunked_complete = Request.decode_chunked(body)
      if chunked_complete
        # The body is fully decoded, so the header is dropped from the env.
        env.delete(HTTP_TRANSFER_ENCODING)
      else
        fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
        return
      end
    elsif parser.content_length > body.bytesize
      # Fewer body bytes than Content-Length announced: wait for the rest.
      fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
      return
    end
  end

  # Complete request: process it on the thread pool as request number 1.
  thread_pool << proc do
    process_client(socket, id, env, parse_data, body, reactor, thread_pool, 1, remote_addr, url_scheme)
  end
end

#eager_keepalive(socket, id, reactor, thread_pool, request_count, remote_addr, url_scheme) ⇒ void

This method returns an undefined value.

Attempts to read and process subsequent requests inline on a kept-alive connection. Blocks briefly for the next request to avoid a full reactor round-trip. Falls back to the reactor when no data arrives within the timeout, when the thread pool has queued work (deprioritization), or when the request is incomplete.

Parameters:

  • socket (TCPSocket)

    the client socket

  • id (Integer)

    unique client identifier

  • reactor (Reactor)

    the reactor for fallback registration

  • thread_pool (AtomicThreadPool)

    thread pool for deprioritization

  • request_count (Integer)

    number of requests handled on this connection

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    "http" or "https"



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/raptor/request.rb', line 351

# Serves follow-up requests on a kept-alive connection inline, in a loop.
#
# Each iteration waits up to KEEPALIVE_READ_TIMEOUT for data, reads and
# parses one request and processes it on the calling thread. The loop ends
# when the connection is handed to the reactor, when the request is
# re-queued on the thread pool, when the peer closes, or when
# process_request reports that the connection must not be kept alive.
#
# @param socket [TCPSocket] the client socket
# @param id [Integer] unique client identifier
# @param reactor [Reactor] receives the connection when no complete request is available
# @param thread_pool [AtomicThreadPool] consulted for queued work and used for re-queuing
# @param request_count [Integer] number of requests handled on this connection so far
# @param remote_addr [String] client IP address
# @param url_scheme [String] "http" or "https"
# @return [void]
def eager_keepalive(socket, id, reactor, thread_pool, request_count, remote_addr, url_scheme)
  loop do
    # No data within the short timeout: park the connection in the reactor.
    unless socket.wait_readable(KEEPALIVE_READ_TIMEOUT)
      reactor.persist(socket, id, request_count, remote_addr: remote_addr, url_scheme: url_scheme)
      return
    end

    data = begin
      socket.read_nonblock(READ_BUFFER_SIZE)
    rescue IO::WaitReadable
      reactor.persist(socket, id, request_count, remote_addr: remote_addr, url_scheme: url_scheme)
      return
    rescue EOFError
      # Peer closed the connection; close errors are ignored.
      # NOTE(review): eager_accept also rescues IOError at this point — confirm the difference is intended.
      socket.close rescue nil
      return
    end

    buffer = String.new
    buffer << data

    # Sockets exposing #pending (presumably TLS sockets — TODO confirm) may
    # report further buffered bytes; drain them into the same buffer.
    while socket.respond_to?(:pending) && socket.pending > 0
      buffer << socket.read_nonblock(socket.pending)
    end

    parser = HttpParser.new
    env = {}
    nread = parser.execute(env, buffer, 0)
    parse_data = { parse_count: 1, content_length: parser.content_length }

    body = nil
    if !parser.finished?
      # Headers are incomplete: let the reactor collect the rest.
      fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, request_count, remote_addr, url_scheme)
      return
    elsif parser.has_body?
      body = buffer.byteslice(nread..-1) || ""

      # Unlike eager_accept, chunked bodies are never decoded inline here;
      # they always go through the reactor, as do short Content-Length bodies.
      chunked = env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
      if chunked || parser.content_length > body.bytesize
        fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, request_count, remote_addr, url_scheme)
        return
      end
    end

    request_count += 1

    # Other work is queued: re-queue this request behind it instead of
    # processing it on the current thread.
    if thread_pool.queue_size > 0
      thread_pool << proc do
        process_client(
          socket,
          id,
          env,
          parse_data,
          body,
          reactor,
          thread_pool,
          request_count,
          remote_addr,
          url_scheme
        )
      end
      return
    end

    keep_alive = process_request(
      socket,
      env,
      parse_data,
      body,
      request_count,
      remote_addr,
      url_scheme
    )
    # Stop looping once the connection is no longer kept alive.
    return unless keep_alive
  end
end

#fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, request_count, remote_addr, url_scheme, persisted: true) ⇒ void

This method returns an undefined value.

Re-registers a socket with the reactor for further processing when an incomplete request is received during eager accept or eager keep-alive.

The persisted flag selects between persistent_data_timeout (for kept-alive connections awaiting the next request) and chunk_data_timeout (for fresh connections awaiting the rest of the first request).

Parameters:

  • socket (TCPSocket)

    the client socket

  • id (Integer)

    unique client identifier

  • buffer (String)

    the partial request data already read

  • env (Hash)

    partial env hash from the HTTP parser

  • parse_data (Hash)

    metadata from the parsing pass

  • reactor (Reactor)

    the reactor to re-register with

  • request_count (Integer)

    number of requests handled on this connection

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    "http" or "https"

  • persisted (Boolean) (defaults to: true)

    whether the connection has already completed at least one request

  • persisted: (Boolean) (defaults to: true)


447
448
449
450
451
452
453
454
455
456
457
458
459
460
# File 'lib/raptor/request.rb', line 447

# Hands a connection with a partially read request over to the reactor.
#
# The socket is first registered through reactor.persist, then a state
# snapshot (buffer, env, parse metadata, connection details) is pushed via
# reactor.update_state. The snapshot carries `persisted: true` only when the
# persisted flag is truthy. Ractor.make_shareable deep-freezes the snapshot,
# so the buffer, env and parse_data objects passed in are frozen afterwards.
#
# @param socket [TCPSocket] the client socket
# @param id [Integer] unique client identifier
# @param buffer [String] the partial request data already read
# @param env [Hash] partial env hash from the HTTP parser
# @param parse_data [Hash] metadata from the parsing pass
# @param reactor [Reactor] the reactor to re-register with
# @param request_count [Integer] number of requests handled on this connection
# @param remote_addr [String] client IP address
# @param url_scheme [String] "http" or "https"
# @param persisted [Boolean] whether to flag the state as persisted
# @return [void]
def fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, request_count, remote_addr, url_scheme, persisted: true)
  reactor.persist(socket, id, request_count, remote_addr: remote_addr, url_scheme: url_scheme)

  snapshot = {
    id: id,
    buffer: buffer,
    env: env,
    request_count: request_count,
    parse_data: parse_data,
    remote_addr: remote_addr,
    url_scheme: url_scheme
  }
  snapshot.store(:persisted, true) if persisted

  reactor.update_state(Ractor.make_shareable(snapshot))
end

#format_headers(headers) ⇒ String

Formats a headers hash into an HTTP header string.

Skips entries with illegal keys or values. Array values are written as separate header lines.

Parameters:

  • headers (Hash)

    normalized response headers

Returns:

  • (String)

    formatted header lines, each ending with CRLF



977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
# File 'lib/raptor/request.rb', line 977

# Serializes a headers hash into HTTP header lines.
#
# Entries whose key is illegal are skipped entirely. Array values produce
# one line per element; any individual value whose string form is illegal
# is skipped on its own.
#
# @param headers [Hash] normalized response headers
# @return [String] formatted header lines, each ending with CRLF
def format_headers(headers)
  headers.each_with_object(+"") do |(name, value), output|
    next if illegal_header_key?(name)

    values = value.is_a?(Array) ? value : [value]
    values.each do |item|
      output << "#{name}: #{item}\r\n" unless illegal_header_value?(item.to_s)
    end
  end
end

#handle_parsed_request(parsed_request, reactor, thread_pool) ⇒ void

This method returns an undefined value.

Handles a parsed HTTP request by either continuing parsing or dispatching to the Rack app.

For incomplete requests, updates reactor state and re-registers for more I/O. For complete requests, removes from reactor, builds Rack env, and dispatches to thread pool.

Parameters:

  • parsed_request (Hash)

    the parsed request state from the ractor pool

  • reactor (Reactor)

    the reactor managing the client connection

  • thread_pool (AtomicThreadPool)

    thread pool for application processing



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/raptor/request.rb', line 235

# Routes a parser result: incomplete requests go back to the reactor,
# complete ones are removed from it and queued for application processing.
#
# For a complete request the request count is incremented (a missing count
# is treated as 0) and missing remote_addr / url_scheme fall back to
# "127.0.0.1" and HTTP_SCHEME. The env is duplicated inside the queued job
# before it is passed to process_client.
#
# @param parsed_request [Hash] the parsed request state from the ractor pool
# @param reactor [Reactor] the reactor managing the client connection
# @param thread_pool [AtomicThreadPool] thread pool for application processing
# @return [void]
def handle_parsed_request(parsed_request, reactor, thread_pool)
  return reactor.update_state(parsed_request) unless parsed_request[:complete]

  client_socket = reactor.remove(parsed_request[:id])
  handled = (parsed_request[:request_count] || 0) + 1
  peer_addr = parsed_request[:remote_addr] || "127.0.0.1"
  scheme = parsed_request[:url_scheme] || HTTP_SCHEME

  job = proc do
    process_client(
      client_socket,
      parsed_request[:id],
      parsed_request[:env].dup,
      parsed_request[:parse_data],
      parsed_request[:body],
      reactor,
      thread_pool,
      handled,
      peer_addr,
      scheme
    )
  end
  thread_pool << job
end

#http_parser_worker ⇒ Proc

Returns a Proc for HTTP parsing work in Ractor context.

The returned Proc processes raw socket data through the appropriate HTTP parser and returns either a complete request state (ready for app processing) or incomplete request state (needs more data).

Returns:

  • (Proc)

    a Ractor-safe proc that accepts a state hash and returns an updated state hash



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/raptor/request.rb', line 182

# Builds the proc that turns a connection state hash into an updated state.
#
# The proc re-parses the whole accumulated buffer from offset 0 on every
# call and merges the outcome into the incoming state. A state that gains
# `complete: true` carries the parsed env and body (nil when the request
# has no body); any other result still needs more data. The result is made
# Ractor-shareable before it is returned.
#
# @return [Proc] proc accepting a state hash and returning an updated, shareable state hash
def http_parser_worker
  proc do |data|
    # HTTP/2 connections are handled entirely by the Http2 frame processor.
    next Raptor::Http2.process_frames(data) if data[:protocol] == :http2

    parser = Raptor::HttpParser.new
    env = {}
    nread = parser.execute(env, data[:buffer], 0)
    # Carry parse metadata across passes; the stored hash is duplicated so
    # the counter can be bumped (presumably because the incoming state is
    # frozen — TODO confirm). The first pass records the parser's content length.
    parse_data = if data[:parse_data]
      data[:parse_data].dup
    else
      { parse_count: 0, content_length: parser.content_length }
    end
    parse_data[:parse_count] += 1

    message = if parser.finished?
      if parser.has_body?
        # Everything after the parsed header bytes belongs to the body.
        body_buffer = data[:buffer].byteslice(nread..-1) || ""

        if env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
          decoded_body, chunked_complete = Raptor::Request.decode_chunked(body_buffer)

          if chunked_complete
            # Body fully decoded: drop the header and mark the request complete.
            env.delete(HTTP_TRANSFER_ENCODING)
            data.merge(env: env, body: decoded_body, parse_data: parse_data, complete: true)
          else
            data.merge(env: env, parse_data: parse_data)
          end
        elsif parser.content_length > body_buffer.bytesize
          # Fewer body bytes than announced: still incomplete.
          data.merge(env: env, parse_data: parse_data)
        else
          data.merge(env: env, body: body_buffer, parse_data: parse_data, complete: true)
        end
      else
        data.merge(env: env, body: nil, parse_data: parse_data, complete: true)
      end
    else
      # Headers not finished yet: return the state without a :complete flag.
      data.merge(env: env, parse_data: parse_data)
    end
    Ractor.make_shareable(message)
  end
end

#illegal_header_key?(key) ⇒ Boolean

Returns true if the header key contains characters illegal in HTTP headers.

Parameters:

  • key (String)

    the header key to check

Returns:

  • (Boolean)

    true if the key is illegal



954
955
956
# File 'lib/raptor/request.rb', line 954

# Tells whether a header key contains a character matched by
# ILLEGAL_HEADER_KEY_REGEX.
#
# @param key [String] the header key to check
# @return [Boolean] true if the key is illegal
def illegal_header_key?(key) = key.match?(ILLEGAL_HEADER_KEY_REGEX)

#illegal_header_value?(value) ⇒ Boolean

Returns true if the header value contains characters illegal in HTTP headers.

Parameters:

  • value (String)

    the header value to check

Returns:

  • (Boolean)

    true if the value is illegal



964
965
966
# File 'lib/raptor/request.rb', line 964

# Tells whether a header value contains a character matched by
# ILLEGAL_HEADER_VALUE_REGEX.
#
# @param value [String] the header value to check
# @return [Boolean] true if the value is illegal
def illegal_header_value?(value) = value.match?(ILLEGAL_HEADER_VALUE_REGEX)

#keep_alive?(env, request_count) ⇒ Boolean

Determines whether the connection should be kept alive after the response.

Returns false if the request limit has been reached. For HTTP/1.1, keep-alive is the default unless the client sent Connection: close. For HTTP/1.0, keep-alive must be explicitly requested.

Parameters:

  • env (Hash)

    the Rack environment

  • request_count (Integer)

    number of requests handled on this connection

Returns:

  • (Boolean)

    true if the connection should be kept alive



534
535
536
537
538
539
540
541
542
543
544
# File 'lib/raptor/request.rb', line 534

# Decides whether the connection stays open after the current response.
#
# Returns false once request_count reaches MAX_KEEPALIVE_REQUESTS. The
# Connection header is treated as a comma-separated list of options compared
# case-insensitively, so values such as "TE, close" or "keep-alive, Upgrade"
# are honoured. HTTP/1.1 keeps the connection unless "close" is listed; any
# other protocol version keeps it only when "keep-alive" is listed.
#
# @param env [Hash] the Rack environment
# @param request_count [Integer] number of requests handled on this connection
# @return [Boolean] true if the connection should be kept alive
def keep_alive?(env, request_count)
  return false if request_count >= MAX_KEEPALIVE_REQUESTS

  # A missing header becomes an empty list; split yields fresh strings, so
  # stripping them in place does not touch the env value.
  options = env[HTTP_CONNECTION].to_s.split(",").each(&:strip!)

  if env[Rack::SERVER_PROTOCOL] == HTTP_11
    options.none? { |option| option.casecmp?(CONNECTION_CLOSE) }
  else
    options.any? { |option| option.casecmp?(CONNECTION_KEEPALIVE) }
  end
end

#normalize_headers(headers) ⇒ Hash

Normalizes response headers by downcasing keys and filtering invalid entries.

Removes headers with illegal keys, rack.* prefixed headers, and "status" headers. Raises if headers is not a Hash or contains non-String keys.

Parameters:

  • headers (Hash)

    raw headers from the Rack application

Returns:

  • (Hash)

    normalized headers with lowercased string keys

Raises:

  • (TypeError)

    if headers is not a Hash or a key is not a String



640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
# File 'lib/raptor/request.rb', line 640

# Produces a cleaned copy of the application's response headers.
#
# Keys containing uppercase letters are downcased. Entries are dropped when
# the key is illegal, starts with RACK_HEADER_PREFIX after downcasing, or is
# "status". Values are passed through untouched.
#
# @param headers [Hash] raw headers from the Rack application
# @return [Hash] normalized headers with lowercased string keys
# @raise [TypeError] if headers is not a Hash or a key is not a String
def normalize_headers(headers)
  raise TypeError, "headers must be a Hash" unless headers.is_a?(Hash)

  headers.each_with_object({}) do |(key, value), normalized|
    raise TypeError, "header keys must be Strings" unless key.is_a?(String)
    next if illegal_header_key?(key)

    name = key.match?(/[A-Z]/) ? key.downcase : key
    next if name == "status" || name.start_with?(RACK_HEADER_PREFIX)

    normalized[name] = value
  end
end

#process_client(socket, id, env, parse_data, body, reactor, thread_pool, request_count, remote_addr, url_scheme) ⇒ void

This method returns an undefined value.

Processes a client connection by handling the current request and, if keep-alive, eagerly reading subsequent requests inline.

Parameters:

  • socket (TCPSocket)

    the client socket

  • id (Integer)

    unique client identifier

  • env (Hash)

    partial env hash from the HTTP parser

  • parse_data (Hash)

    metadata from the parsing pass

  • body (String, nil)

    decoded request body

  • reactor (Reactor)

    the reactor managing the client connection

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • request_count (Integer)

    number of requests handled on this connection

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    "http" or "https"



279
280
281
282
# File 'lib/raptor/request.rb', line 279

# Processes the current request and, when the connection may be reused,
# continues with follow-up requests through eager_keepalive.
#
# @param socket [TCPSocket] the client socket
# @param id [Integer] unique client identifier
# @param env [Hash] partial env hash from the HTTP parser
# @param parse_data [Hash] metadata from the parsing pass
# @param body [String, nil] decoded request body
# @param reactor [Reactor] the reactor managing the client connection
# @param thread_pool [AtomicThreadPool] thread pool for application processing
# @param request_count [Integer] number of requests handled on this connection
# @param remote_addr [String] client IP address
# @param url_scheme [String] "http" or "https"
# @return [void]
def process_client(socket, id, env, parse_data, body, reactor, thread_pool, request_count, remote_addr, url_scheme)
  reusable = process_request(socket, env, parse_data, body, request_count, remote_addr, url_scheme)
  return unless reusable

  eager_keepalive(socket, id, reactor, thread_pool, request_count, remote_addr, url_scheme)
end

#process_request(socket, env, parse_data, body, request_count, remote_addr, url_scheme) ⇒ Boolean

Builds the Rack env, calls the application, and writes the response. Returns true if the connection should be kept alive for further requests, false otherwise (including hijack and error cases).

Parameters:

  • socket (TCPSocket)

    the client socket

  • env (Hash)

    partial env hash from the HTTP parser

  • parse_data (Hash)

    metadata from the parsing pass

  • body (String, nil)

    decoded request body

  • request_count (Integer)

    number of requests handled on this connection

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    "http" or "https"

Returns:

  • (Boolean)

    true if the connection should be kept alive



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/raptor/request.rb', line 298

# Runs one request through the Rack application and writes the response.
#
# On success the rack.response_finished callbacks are called with a nil
# error. If anything raises a StandardError the callbacks receive the error,
# a bare 500 response is attempted when no response was started and the
# connection was not hijacked, and the error is re-raised. The socket is
# closed on the way out unless the connection was hijacked or is kept alive.
#
# @param socket [TCPSocket] the client socket
# @param env [Hash] partial env hash from the HTTP parser
# @param parse_data [Hash] metadata from the parsing pass
# @param body [String, nil] decoded request body
# @param request_count [Integer] number of requests handled on this connection
# @param remote_addr [String] client IP address
# @param url_scheme [String] "http" or "https"
# @return [Boolean] true if the connection should be kept alive
def process_request(socket, env, parse_data, body, request_count, remote_addr, url_scheme)
  # Declared up front so the rescue/ensure clauses can see how far processing got.
  rack_env = nil
  status = nil
  headers = nil
  hijacked = false
  keep_alive = false
  response_started = false

  begin
    rack_env = build_rack_env(env, parse_data, body, socket, remote_addr: remote_addr, url_scheme: url_scheme)
    # From here on `body` refers to the response body returned by the app.
    status, headers, body = @app.call(rack_env)

    if rack_env[RACK_HIJACKED]
      # The app called the rack.hijack proc: it owns the socket now, so
      # nothing is written; the returned body is only closed.
      hijacked = true
      body.close if body.respond_to?(:close)
    else
      # A rack.hijack entry in the response headers marks the response as hijacked.
      hijacked = headers.is_a?(Hash) && !!headers[Rack::RACK_HIJACK]
      # A body that is callable but not enumerable is treated as streaming.
      streaming = body.respond_to?(:call) && !body.respond_to?(:each)
      # Hijacked and streaming responses never keep the connection alive.
      keep_alive = (hijacked || streaming) ? false : keep_alive?(rack_env, request_count)
      response_started = true
      write_response(socket, rack_env, status, headers, body, keep_alive: keep_alive)
    end

    call_response_finished(rack_env, status, headers, nil)
    keep_alive && !hijacked
  rescue => error
    call_response_finished(rack_env, status, headers, error) if rack_env
    # Best-effort 500, only while nothing has been sent and the socket is still ours.
    socket.write(ERROR_RESPONSE_500) rescue nil unless response_started || hijacked
    # Forces the ensure clause below to close the socket (unless hijacked).
    keep_alive = false
    raise
  ensure
    unless hijacked || keep_alive
      socket.close rescue nil
    end
  end
end

#send_early_hints(socket, hints) ⇒ void

This method returns an undefined value.

Sends an HTTP 103 Early Hints response to the client.

Skips any hints with illegal header keys or values. No-ops if hints is empty.

Parameters:

  • socket (TCPSocket)

    the client socket to write to

  • hints (Hash)

    header name to value (or array of values) pairs



555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
# File 'lib/raptor/request.rb', line 555

# Writes an HTTP/1.1 "103 Early Hints" response built from the given hints.
#
# Nothing is sent for an empty hints collection. Keys are downcased; entries
# with an illegal key are skipped, as is every individual value whose string
# form is illegal. The response is written even if every hint was skipped.
#
# @param socket [TCPSocket] the client socket to write to
# @param hints [Hash] header name to value (or array of values) pairs
# @return [void]
def send_early_hints(socket, hints)
  return if hints.empty?

  packet = +"#{HTTP_11} 103 Early Hints\r\n"

  hints.each do |key, value|
    next if illegal_header_key?(key)

    hint_values = value.is_a?(Array) ? value : [value]
    hint_values.each do |hint_value|
      packet << "#{key.downcase}: #{hint_value}\r\n" unless illegal_header_value?(hint_value.to_s)
    end
  end

  socket_write(socket, packet << "\r\n")
end

#socket_write(socket, string) ⇒ void

This method returns an undefined value.

Writes a string to the socket, retrying on partial writes and flow control blocks.

Uses write_nonblock with a 5-second writable timeout to avoid blocking the thread indefinitely on slow clients.

Parameters:

  • socket (TCPSocket)

    the socket to write to

  • string (String)

    the data to write

Raises:

  • (WriteError)

    if the socket is not writable within the timeout or raises IOError



1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
# File 'lib/raptor/request.rb', line 1028

# Writes the whole string to the socket with non-blocking writes.
#
# After a partial write the remaining bytes are sent on the next pass. When
# the socket would block, the method waits up to WRITE_TIMEOUT seconds for
# it to become writable and then retries the same write.
#
# @param socket [TCPSocket] the socket to write to
# @param string [String] the data to write
# @return [void]
# @raise [WriteError] if the socket does not become writable in time or the write raises IOError
def socket_write(socket, string)
  total = string.bytesize
  sent = 0

  until sent >= total
    begin
      # Pass the original string on the first attempt to avoid a copy.
      remaining = sent.zero? ? string : string.byteslice(sent..-1)
      sent += socket.write_nonblock(remaining)
    rescue IO::WaitWritable
      raise WriteError unless socket.wait_writable(WRITE_TIMEOUT)
      retry
    rescue IOError
      raise WriteError
    end
  end
end

#uncork_socket(socket) ⇒ void

This method returns an undefined value.

Disables TCP_CORK on the socket, flushing any buffered packets.

Only applies to TCP sockets. No-op on non-TCP sockets. Available on Linux only; this method is not defined on other platforms.

Parameters:

  • socket (TCPSocket)

    the socket to uncork



1067
1068
1069
# File 'lib/raptor/request.rb', line 1067

# Sets the TCP_CORK option to 0 on the socket.
#
# Sockets that are not TCPSocket instances are left untouched.
#
# @param socket [TCPSocket] the socket to uncork
# @return [void]
def uncork_socket(socket)
  return unless socket.is_a?(TCPSocket)

  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0)
end

#validate_headers(headers, status) ⇒ void

This method returns an undefined value.

Validates that headers are appropriate for the given status code.

Raises if content-type or content-length are present for status codes that must not have an entity body (204, 304, 1xx).

Parameters:

  • headers (Hash)

    normalized response headers

  • status (Integer)

    HTTP status code

Raises:

  • (ArgumentError)

    if a forbidden header is present for the status



669
670
671
672
673
674
675
# File 'lib/raptor/request.rb', line 669

# Rejects entity headers on responses that must not carry a body.
#
# Only statuses listed in STATUS_WITH_NO_ENTITY_BODY are checked; for those,
# the presence of a content-type or content-length key raises.
#
# @param headers [Hash] normalized response headers
# @param status [Integer] HTTP status code
# @return [void]
# @raise [ArgumentError] if a forbidden header is present for the status
def validate_headers(headers, status)
  return unless STATUS_WITH_NO_ENTITY_BODY.include?(status)

  if headers.key?(Rack::CONTENT_TYPE)
    raise ArgumentError, "content-type must not be present for status #{status}"
  end
  return unless headers.key?(Rack::CONTENT_LENGTH)

  raise ArgumentError, "content-length must not be present for status #{status}"
end

#validate_status(status) ⇒ void

This method returns an undefined value.

Validates that the status code is a valid integer.

Parameters:

  • status (Object)

    the status value to validate

Raises:

  • (TypeError)

    if status is not an Integer

  • (ArgumentError)

    if status is less than 100



624
625
626
627
628
# File 'lib/raptor/request.rb', line 624

# Checks that the response status is an Integer of at least 100.
#
# @param status [Object] the status value to validate
# @return [void]
# @raise [TypeError] if status is not an Integer
# @raise [ArgumentError] if status is less than 100
def validate_status(status)
  case status
  when Integer
    raise ArgumentError, "status must be >= 100" if status < 100
  else
    raise TypeError, "status must be an Integer"
  end
end

#write_array_body(socket, response, body_array, use_chunked) ⇒ void

This method returns an undefined value.

Writes an array body to the socket.

Dispatches to the single-chunk or multi-chunk path based on array length.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    headers already serialized, to be written before the body

  • body_array (Array<String>)

    the response body chunks

  • use_chunked (Boolean)

    whether to use chunked transfer encoding



853
854
855
856
857
858
859
# File 'lib/raptor/request.rb', line 853

# Sends an array body, picking the writer that fits its size.
#
# An array holding exactly one element goes through write_single_chunk with
# that element; every other length (including empty) goes through
# write_multiple_chunks with the whole array.
#
# @param socket [TCPSocket] the client socket
# @param response [String] headers already serialized, to be written before the body
# @param body_array [Array<String>] the response body chunks
# @param use_chunked [Boolean] whether to use chunked transfer encoding
# @return [void]
def write_array_body(socket, response, body_array, use_chunked)
  return write_multiple_chunks(socket, response, body_array, use_chunked) unless body_array.length == 1

  write_single_chunk(socket, response, body_array.first, use_chunked)
end

#write_enumerable_body(socket, response, body, use_chunked) ⇒ void

This method returns an undefined value.

Writes a generic enumerable body to the socket.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    headers already serialized, to be written before the body

  • body (Object)

    any object responding to each

  • use_chunked (Boolean)

    whether to use chunked transfer encoding

Raises:

  • (TypeError)

    if any yielded chunk is not a String



928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
# File 'lib/raptor/request.rb', line 928

# Writes a body that only promises to respond to #each.
#
# Without chunked encoding every part is appended to +response+ and the
# result is sent in a single write. With chunked encoding the headers are
# sent first and each non-empty part is sent as its own chunk frame.
#
# @param socket [TCPSocket] the client socket
# @param response [String] serialized status line and headers
# @param body [#each] object yielding String parts
# @param use_chunked [Boolean] whether to use chunked transfer encoding
# @raise [TypeError] if any yielded part is not a String
# @return [void]
def write_enumerable_body(socket, response, body, use_chunked)
  unless use_chunked
    body.each do |part|
      raise TypeError, "body must yield String values" unless part.is_a?(String)

      response << part
    end
    return socket_write(socket, response)
  end

  socket_write(socket, response)
  body.each do |part|
    raise TypeError, "body must yield String values" unless part.is_a?(String)

    # An empty part would be framed as "0\r\n\r\n", the end-of-body marker.
    socket_write(socket, "#{part.bytesize.to_s(16)}\r\n#{part}\r\n") unless part.empty?
  end
end

#write_file_body(socket, response, path, content_length, use_chunked) ⇒ void

This method returns an undefined value.

Writes a file body to the socket.

Uses zero-copy IO.copy_stream for large files, direct buffering for small ones, and chunked encoding when required.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    headers already serialized, to be written before the body

  • path (String)

    filesystem path of the file to send

  • content_length (Integer, nil)

    pre-calculated file size

  • use_chunked (Boolean)

    whether to use chunked transfer encoding



825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
# File 'lib/raptor/request.rb', line 825

# Writes the file at +path+ as the response body.
#
# Three strategies, chosen in order:
# * chunked: headers first, then the file in FILE_CHUNK_SIZE chunk frames;
# * small known length (below BODY_BUFFER_THRESHOLD): file bytes are appended
#   to the headers and sent in one write;
# * otherwise: headers first, then IO.copy_stream from the file to the socket.
#
# In the non-chunked paths at most +content_length+ bytes are sent, so the
# body never exceeds the length advertised in the headers. When
# +content_length+ is nil the file is streamed to EOF.
#
# @param socket [TCPSocket] the client socket
# @param response [String] serialized status line and headers
# @param path [String] filesystem path of the file to send
# @param content_length [Integer, nil] number of body bytes to send
# @param use_chunked [Boolean] whether to use chunked transfer encoding
# @return [void]
def write_file_body(socket, response, path, content_length, use_chunked)
  File.open(path, "rb") do |file|
    if use_chunked
      socket_write(socket, response)
      # IO#read(n) returns nil at EOF, which ends the loop.
      while (chunk = file.read(FILE_CHUNK_SIZE))
        socket_write(socket, "#{chunk.bytesize.to_s(16)}\r\n#{chunk}\r\n")
      end
    elsif content_length && content_length < BODY_BUFFER_THRESHOLD
      # IO#read with a positive length returns nil when the file is already at
      # EOF; to_s keeps the append from raising on an unexpectedly empty file.
      response << file.read(content_length).to_s
      socket_write(socket, response)
    else
      socket_write(socket, response)
      # Bound the copy by content_length (nil copies to EOF) so the bytes on
      # the wire cannot exceed the advertised Content-Length.
      IO.copy_stream(file, socket, content_length)
    end
  end
end

#write_full_response(socket, response, headers, body, http_version) ⇒ void

This method returns an undefined value.

Writes a complete response with a body.

Selects the appropriate write strategy based on body type: callable (streaming), file (zero-copy), array, or generic enumerable. Automatically determines content-length where possible, falling back to chunked transfer encoding for HTTP/1.1 when the length cannot be determined upfront.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    the status line accumulated so far

  • headers (Hash)

    normalized response headers

  • body (Object)

    the response body

  • http_version (String)

    "HTTP/1.1" or "HTTP/1.0"



747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
# File 'lib/raptor/request.rb', line 747

# Writes a complete response with a body.
#
# Callable bodies get the headers written, the socket uncorked, and are then
# handed the socket directly. For all other bodies the framing is decided
# first: a usable content-length (supplied by the app or computed by
# calculate_content_length) wins; otherwise HTTP/1.1 responses without an
# app-supplied transfer-encoding are sent chunked. The body is then written
# via the file, array, or generic enumerable writer.
#
# When chunked encoding is selected, any content-length header is removed so
# the headers never advertise both framings (this matters when the app sent
# "Content-Length: 0" with a body whose size cannot be computed).
#
# @param socket [TCPSocket] the client socket
# @param response [String] the status line accumulated so far
# @param headers [Hash] normalized response headers (mutated in place)
# @param body [Object] the response body
# @param http_version [String] "HTTP/1.1" or "HTTP/1.0"
# @raise [TypeError] if body responds to none of each, to_ary, or to_path
# @return [void]
def write_full_response(socket, response, headers, body, http_version)
  if body.respond_to?(:call)
    response << format_headers(headers)
    response << "\r\n"
    socket_write(socket, response)
    uncork_socket(socket)
    body.call(socket)
    return
  end

  content_length = headers[Rack::CONTENT_LENGTH]&.to_i
  use_chunked = false

  # A missing or zero content-length is treated as "unknown" and recomputed.
  if !content_length || content_length == 0
    calculated_length = calculate_content_length(body)
    if calculated_length
      content_length = calculated_length
    elsif http_version == HTTP_11 && !headers.key?(Rack::TRANSFER_ENCODING)
      use_chunked = true
    end
  end

  if use_chunked
    # Chunked framing replaces content-length; a stale "0" must not be sent.
    headers.delete(Rack::CONTENT_LENGTH)
    headers[Rack::TRANSFER_ENCODING] = TRANSFER_ENCODING_CHUNKED
    content_length = nil
  elsif content_length && content_length >= 0
    headers[Rack::CONTENT_LENGTH] = content_length.to_s
  end

  response << format_headers(headers)
  response << "\r\n"

  if body.respond_to?(:to_path) && (path = body.to_path) && File.readable?(path)
    write_file_body(socket, response, path, content_length, use_chunked)
  elsif body.respond_to?(:to_ary)
    write_array_body(socket, response, body.to_ary, use_chunked)
  elsif body.respond_to?(:each)
    write_enumerable_body(socket, response, body, use_chunked)
  else
    raise TypeError, "body must respond to each, to_ary, or to_path"
  end

  # Zero-length chunk terminates a chunked body.
  socket_write(socket, "0\r\n\r\n") if use_chunked
end

#write_hijacked_response(socket, response, headers, response_hijack) ⇒ void

This method returns an undefined value.

Writes response headers and delegates body writing to the hijack callback.

Uncorks the socket before calling the hijack so the app has full control of the raw connection.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    the status line accumulated so far

  • headers (Hash)

    normalized response headers

  • response_hijack (Proc)

    callable that receives the socket and writes the body



701
702
703
704
705
706
707
# File 'lib/raptor/request.rb', line 701

# Sends the response head, then hands the socket to the hijack callback.
#
# The socket is uncorked after the head is written and before the callback
# runs.
#
# @param socket [TCPSocket] the client socket
# @param response [String] the status line accumulated so far
# @param headers [Hash] normalized response headers
# @param response_hijack [#call] callable that receives the socket
# @return [void]
def write_hijacked_response(socket, response, headers, response_hijack)
  head = response << format_headers(headers) << "\r\n"
  socket_write(socket, head)
  uncork_socket(socket)
  response_hijack.call(socket)
end

#write_multiple_chunks(socket, response, body_array, use_chunked) ⇒ void

This method returns an undefined value.

Writes a multi-element array body to the socket.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    headers already serialized, to be written before the body

  • body_array (Array<String>)

    the response body chunks

  • use_chunked (Boolean)

    whether to use chunked transfer encoding

Raises:

  • (TypeError)

    if any chunk is not a String



898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
# File 'lib/raptor/request.rb', line 898

# Writes an array body element by element.
#
# Without chunked encoding all elements are appended to +response+ and sent
# in one write. With chunked encoding the headers are sent first, then each
# non-empty element is sent as its own chunk frame.
#
# @param socket [TCPSocket] the client socket
# @param response [String] serialized status line and headers
# @param body_array [Array<String>] the response body chunks
# @param use_chunked [Boolean] whether to use chunked transfer encoding
# @raise [TypeError] if any element is not a String
# @return [void]
def write_multiple_chunks(socket, response, body_array, use_chunked)
  unless use_chunked
    buffered = body_array.each_with_object(response) do |piece, buffer|
      raise TypeError, "body must yield String values" unless piece.is_a?(String)

      buffer << piece
    end
    return socket_write(socket, buffered)
  end

  socket_write(socket, response)
  body_array.each do |piece|
    raise TypeError, "body must yield String values" unless piece.is_a?(String)

    # An empty element would be framed as "0\r\n\r\n", the end-of-body marker.
    next if piece.empty?

    hex_size = piece.bytesize.to_s(16)
    socket_write(socket, "#{hex_size}\r\n#{piece}\r\n")
  end
end

#write_no_body_response(socket, response, headers, status) ⇒ void

This method returns an undefined value.

Writes a response with no entity body.

Used for HEAD requests and status codes that must not carry a body (204, 304, 1xx). For any other status (that is, a HEAD response), sets content-length to "0" when the headers contain neither content-length nor transfer-encoding.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    the status line accumulated so far

  • headers (Hash)

    normalized response headers

  • status (Integer)

    HTTP status code



722
723
724
725
726
727
728
729
730
# File 'lib/raptor/request.rb', line 722

# Writes a header-only response.
#
# For statuses outside STATUS_WITH_NO_ENTITY_BODY, content-length is set to
# "0" when the headers carry neither content-length nor transfer-encoding.
#
# @param socket [TCPSocket] the client socket
# @param response [String] the status line accumulated so far
# @param headers [Hash] normalized response headers (may be mutated)
# @param status [Integer] HTTP status code
# @return [void]
def write_no_body_response(socket, response, headers, status)
  needs_zero_length = !STATUS_WITH_NO_ENTITY_BODY.include?(status) &&
                      !headers.key?(Rack::CONTENT_LENGTH) &&
                      !headers.key?(Rack::TRANSFER_ENCODING)
  headers[Rack::CONTENT_LENGTH] = "0" if needs_zero_length

  response << format_headers(headers) << "\r\n"
  socket_write(socket, response)
end

#write_response(socket, env, status, headers, body, keep_alive: false) ⇒ void

This method returns an undefined value.

Writes a complete HTTP response to the socket.

Handles header normalization, validation, connection management, TCP corking, and dispatches to the appropriate body write strategy.

Parameters:

  • socket (TCPSocket)

    the client socket to write to

  • env (Hash)

    the Rack environment

  • status (Integer)

    HTTP status code

  • headers (Hash)

    response headers from the Rack application

  • body (Object)

    response body (array, enumerable, file, or callable)

  • keep_alive (Boolean) (defaults to: false)

    whether to send a keep-alive connection header

  • keep_alive: (Boolean) (defaults to: false)


588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
# File 'lib/raptor/request.rb', line 588

# Writes a complete HTTP response to the socket.
#
# Validates the status, extracts any response-hijack callback from the
# headers, normalizes and validates the headers, forces the connection
# header from +keep_alive+, corks the socket, and dispatches to the hijack,
# no-body (HEAD or STATUS_WITH_NO_ENTITY_BODY), or full-body writer.
#
# Cleanup always runs: the body is closed if it responds to close, and the
# socket is uncorked and flushed even when closing the body raises (the
# error from close still propagates to the caller).
#
# @param socket [TCPSocket] the client socket to write to
# @param env [Hash] the Rack environment
# @param status [Integer] HTTP status code
# @param headers [Hash] response headers from the Rack application
# @param body [Object] response body (array, enumerable, file, or callable)
# @param keep_alive [Boolean] whether to send a keep-alive connection header
# @raise [TypeError, ArgumentError] propagated from status/header validation
# @return [void]
def write_response(socket, env, status, headers, body, keep_alive: false)
  validate_status(status)
  # The hijack callback is removed from the app's header hash before
  # normalization.
  response_hijack = headers.is_a?(Hash) ? headers.delete(Rack::RACK_HIJACK) : nil
  headers = normalize_headers(headers)
  validate_headers(headers, status)

  headers["connection"] = keep_alive ? CONNECTION_KEEPALIVE : CONNECTION_CLOSE

  # Anything other than an explicit HTTP/1.1 request is answered as HTTP/1.0.
  http_version = env[Rack::SERVER_PROTOCOL] == HTTP_11 ? HTTP_11 : HTTP_10
  no_body = env[Rack::REQUEST_METHOD] == "HEAD" || STATUS_WITH_NO_ENTITY_BODY.include?(status)

  response = build_status_line(http_version, status)

  cork_socket(socket)

  if response_hijack
    write_hijacked_response(socket, response, headers, response_hijack)
  elsif no_body
    write_no_body_response(socket, response, headers, status)
  else
    write_full_response(socket, response, headers, body, http_version)
  end
ensure
  begin
    body.close if body.respond_to?(:close)
  ensure
    # Nested so a raising body.close cannot leave the socket corked.
    uncork_socket(socket)
    # Best-effort flush: the peer may already be gone.
    socket.flush rescue nil
  end
end

#write_single_chunk(socket, response, chunk, use_chunked) ⇒ void

This method returns an undefined value.

Writes a single-element array body, optionally buffering it with the headers.

Small bodies are concatenated with the headers into one write to reduce system call overhead.

Parameters:

  • socket (TCPSocket)

    the client socket

  • response (String)

    headers already serialized, to be written before the body

  • chunk (String)

    the single body chunk

  • use_chunked (Boolean)

    whether to use chunked transfer encoding

Raises:

  • (TypeError)

    if the chunk is not a String



874
875
876
877
878
879
880
881
882
883
884
885
886
# File 'lib/raptor/request.rb', line 874

# Writes a single-element array body.
#
# * chunked: the chunk frame is appended to the headers and sent in one
#   write. An empty chunk is not framed, because its frame ("0\r\n\r\n") is
#   the chunked end-of-body marker and would end the body before
#   write_full_response emits the real terminator.
# * below BODY_BUFFER_THRESHOLD: headers and body are sent in one write.
# * otherwise: headers and body are sent as two writes, avoiding a copy of
#   the large body into the header buffer.
#
# @param socket [TCPSocket] the client socket
# @param response [String] serialized status line and headers
# @param chunk [String] the single body chunk
# @param use_chunked [Boolean] whether to use chunked transfer encoding
# @raise [TypeError] if the chunk is not a String
# @return [void]
def write_single_chunk(socket, response, chunk, use_chunked)
  raise TypeError, "body must yield String values" unless chunk.is_a?(String)

  if use_chunked
    response << "#{chunk.bytesize.to_s(16)}\r\n#{chunk}\r\n" unless chunk.empty?
    socket_write(socket, response)
  elsif chunk.bytesize < BODY_BUFFER_THRESHOLD
    socket_write(socket, response << chunk)
  else
    socket_write(socket, response)
    socket_write(socket, chunk)
  end
end