Class: RactorPool

Inherits:
Object
Defined in:
lib/ractor-pool.rb,
lib/ractor-pool/version.rb

Overview

A thread-safe, lock-free pool of Ractor workers with a coordinator pattern for distributing work.

RactorPool manages a fixed number of worker ractors that process work items in parallel. Work is distributed on-demand to idle workers, ensuring efficient utilisation. Results are collected and passed to a result handler running in a separate thread.
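
The flow described above (idle workers pulling work on demand, results handed to a handler on its own thread) can be pictured with a thread-based analogue. This is only a sketch under stated assumptions: run_sketch_pool and both queues are hypothetical names, and it uses Thread and Queue from core Ruby rather than Ractors, so it is not how RactorPool itself is implemented.

```ruby
# Thread-based analogue of the pool described above (hypothetical sketch;
# the real RactorPool uses Ractors, not Threads and Queues).
def run_sketch_pool(size:, worker:, items:, &result_handler)
  work_queue = Queue.new
  result_queue = Queue.new

  # Each worker pops the next item only when it is idle, so work is handed
  # out on demand. Queue#pop returns nil once the queue is closed and empty.
  workers = Array.new(size) do
    Thread.new do
      while (item = work_queue.pop)
        result_queue << worker.call(item)
      end
    end
  end

  # Results are handled on a dedicated thread, separate from the workers.
  # Limitation of this sketch: a nil or false result would end the loop early.
  collector = Thread.new do
    while (result = result_queue.pop)
      result_handler.call(result)
    end
  end

  items.each { |item| work_queue << item }
  work_queue.close          # no more work: workers drain the queue, then exit
  workers.each(&:join)
  result_queue.close        # no more results: collector drains, then exits
  collector.join
end

sketch_results = []
run_sketch_pool(size: 4, worker: ->(n) { n * 2 }, items: (0...10).to_a) do |result|
  sketch_results << result
end
```

As in the real pool, results arrive in completion order, so sketch_results holds the ten doubled values in an order that can differ between runs.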

Examples:

Basic usage

results = []
worker = -> (work) { work * 2 }
pool = RactorPool.new(size: 4, worker: worker) { |result| results << result }

10.times { |index| pool << index }
pool.shutdown

p results #=> e.g. [2, 0, 6, 4, 14, 10, 8, 16, 18, 12] (completion order varies between runs)

Without result handler

counter = Atom.new(0)
worker = proc do |work|
  counter.swap { |current_value| current_value + 1 }
  work * 2
end
pool = RactorPool.new(size: 4, worker: worker)

10.times { |index| pool << index }
pool.shutdown

p counter.value #=> 10

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Constant Summary

VERSION = "0.1.3"

Constructor Details

#initialize(size: Etc.nprocessors, worker:, name: nil) {|result| ... } ⇒ void

Creates a new RactorPool with the specified number of workers.

Examples:

With result handler

pool = RactorPool.new(size: 4, worker: proc { it }) { |result| puts result }

Without result handler

pool = RactorPool.new(size: 4, worker: proc { it })

Parameters:

  • size (Integer) (defaults to: Etc.nprocessors)

    number of worker ractors to create

  • worker (Proc)

    a shareable proc that processes each work item

  • name (String, nil) (defaults to: nil)

    optional name for the pool, used in thread/ractor names

Yield Parameters:

  • result (Object)

    the result returned by the worker proc

Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if worker is not a proc



# File 'lib/ractor-pool.rb', line 81

def initialize(size: Etc.nprocessors, worker:, name: nil, &result_handler)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)

  @size = size
  @worker = Ractor.shareable_proc(&worker)
  @name = name
  @result_handler = result_handler

  @state = Atom.new(in_flight: 0, shutdown: false)

  @result_port = Ractor::Port.new if result_handler
  @coordinator = start_coordinator if size > 1
  @workers = start_workers
  @collector = start_collector
end

Instance Method Details

#<<(work) ⇒ void

This method returns an undefined value.

Queues a work item to be processed by an available worker.

Examples:

pool << "http://example.com/page1"
pool << "http://example.com/page2"

Parameters:

  • work (Object)

    the work item to process

Raises:

  • (EnqueuedWorkAfterShutdownError)

    if the pool has already been shut down



# File 'lib/ractor-pool.rb', line 109

def <<(work)
  state = @state.swap do |current_state|
    if current_state[:shutdown]
      current_state
    else
      current_state.merge(in_flight: current_state[:in_flight] + 1)
    end
  end
  raise EnqueuedWorkAfterShutdownError if state[:shutdown]

  begin
    (@coordinator || @workers.first).send(work, move: true)
  ensure
    @state.swap do |current_state|
      current_state.merge(in_flight: current_state[:in_flight] - 1)
    end
  end
end
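
The admission step in #<< can be tried in isolation. The sketch below assumes a hypothetical SketchAtom whose swap applies a block to the current value and returns the new one, which is how the source above uses Atom. It is Mutex-based for brevity, so unlike the real Atom it is not lock-free; admit and release are also illustrative names.

```ruby
# Hypothetical stand-in for the Atom used above: swap applies the block to the
# current value, stores the result, and returns it. Mutex-based, not lock-free.
class SketchAtom
  def initialize(value)
    @value = value
    @mutex = Mutex.new
  end

  def value
    @mutex.synchronize { @value }
  end

  def swap
    @mutex.synchronize { @value = yield(@value) }
  end
end

state = SketchAtom.new({ in_flight: 0, shutdown: false })

# Mirrors the first swap in #<<: bump in_flight unless shutdown is already set.
admit = lambda do
  new_state = state.swap do |current|
    current[:shutdown] ? current : current.merge(in_flight: current[:in_flight] + 1)
  end
  !new_state[:shutdown]
end

# Mirrors the ensure block in #<<: the submission is no longer in flight.
release = -> { state.swap { |current| current.merge(in_flight: current[:in_flight] - 1) } }

accepted_before = admit.call               # accepted: in_flight becomes 1
in_flight_during = state.value[:in_flight]
release.call                               # submission done: in_flight back to 0

state.swap { |current| current.merge(shutdown: true) }
accepted_after = admit.call                # rejected: state is left unchanged
```

Because the shutdown check and the increment happen in one swap, a submission is either counted as in flight before shutdown is observed or rejected outright.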

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the pool gracefully.

This method:

  1. Prevents new work from being queued

  2. Waits for all in-flight work submissions to complete

  3. Allows all queued work to complete

  4. Waits for all workers to finish

  5. Waits for all results to be processed

This method is idempotent and can be called multiple times safely.
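
The steps above, together with the idempotency guarantee, can be sketched with a single worker thread and a sentinel value. SketchDrainer and its SHUTDOWN sentinel are hypothetical; the sketch uses Thread, Queue, and Mutex rather than Ractors and an atom, and it has no result collector, so it only illustrates the ordering, not the pool's implementation.

```ruby
# Hypothetical single-worker sketch of a graceful, idempotent shutdown.
class SketchDrainer
  SHUTDOWN = :shutdown # sentinel telling the worker to stop

  attr_reader :processed

  def initialize
    @queue = Queue.new
    @processed = []
    @shutdown = false
    @lock = Mutex.new
    @worker = Thread.new do
      loop do
        item = @queue.pop
        break if item == SHUTDOWN
        @processed << item
      end
    end
  end

  # Rejects work once shutdown has begun. Holding the lock while pushing
  # guarantees every accepted item is queued ahead of the sentinel.
  def <<(item)
    @lock.synchronize do
      raise "enqueued after shutdown" if @shutdown
      @queue << item
    end
  end

  def shutdown
    # Only the first caller flips the flag and proceeds; later calls return.
    first_call = @lock.synchronize do
      already_shutdown = @shutdown
      @shutdown = true
      !already_shutdown
    end
    return unless first_call

    @queue << SHUTDOWN # queued work ahead of the sentinel still completes
    @worker.join       # wait for the worker to drain and exit
  end
end

drainer = SketchDrainer.new
3.times { |index| drainer << index }
drainer.shutdown
drainer.shutdown # second call is a no-op
```

Everything queued before the first shutdown call is processed, and a later drainer << item raises instead of being silently dropped.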

Examples:

pool.shutdown


# File 'lib/ractor-pool.rb', line 145

def shutdown
  already_shutdown = false
  @state.swap do |current_state|
    if current_state[:shutdown]
      already_shutdown = true
      current_state
    else
      current_state.merge(shutdown: true)
    end
  end
  return if already_shutdown

  Thread.pass until @state.value[:in_flight] == 0

  @coordinator&.send(SHUTDOWN, move: true) ||
    (@workers.first.send(SHUTDOWN, move: true) && @result_port&.send(SHUTDOWN, move: true))
  @workers.each(&:join)
  @coordinator&.join
  @collector&.join
end