Class: RactorPool

Inherits:
Object
  • Object
show all
Defined in:
lib/ractor-pool.rb,
lib/ractor-pool/version.rb,
sig/generated/ractor-pool.rbs,
sig/generated/ractor-pool/version.rbs

Overview

A thread-safe, lock-free pool of Ractor workers with a coordinator pattern for distributing work.

RactorPool manages a fixed number of worker ractors that process work items in parallel. Work is distributed on-demand to idle workers, ensuring efficient utilisation. Results are collected and passed to a result handler running in a separate thread.

Examples:

Basic usage

results = []
worker = -> (work) { work * 2 }
pool = RactorPool.new(size: 4, worker: worker) { |result| results << result }

10.times { |index| pool << index }
pool.shutdown

p results #=> [2, 0, 6, 4, 14, 10, 8, 16, 18, 12]

Without result handler

counter = Atom.new(0)
worker = proc do |work|
  counter.swap { |current_value| current_value + 1 }
  work * 2
end
pool = RactorPool.new(size: 4, worker: worker)

10.times { |index| pool << index }
pool.shutdown

p counter.value #=> 10

See Also:

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Constant Summary collapse

SHUTDOWN =

Returns:

  • (::Symbol)
:shutdown
VERSION =

Returns:

  • (::String)
"0.3.1"

Instance Method Summary collapse

Constructor Details

#initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil) {|result| ... } ⇒ RactorPool

Creates a new RactorPool with the specified number of workers.

Examples:

With result handler

pool = RactorPool.new(size: 4, worker: proc { it }) { |result| puts result }

Without result handler

pool = RactorPool.new(size: 4, worker: proc { it })

With error handler

error_count = Atom.new(0)
on_error = proc { error_count.swap { |count| count + 1 } }
pool = RactorPool.new(size: 4, worker: proc { raise }, on_error: on_error)

Parameters:

  • size (Integer) (defaults to: Etc.nprocessors)

    number of worker ractors to create

  • worker (Proc)

    a shareable proc that processes each work item

  • name (String, nil) (defaults to: nil)

    optional name for the pool, used in thread/ractor names

  • on_error (Proc, nil) (defaults to: nil)

    optional shareable proc called with the raised exception when a worker raises

  • worker: (^(untyped) -> untyped)
  • size: (Integer) (defaults to: Etc.nprocessors)
  • name: (String, nil) (defaults to: nil)
  • on_error: (^(Exception) -> void, nil) (defaults to: nil)

Yield Parameters:

  • result (Object)

    the result returned by the worker proc

Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if worker is not a proc

  • (ArgumentError)

    if on_error is given but is not a proc



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/ractor-pool.rb', line 92

def initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil, &result_handler)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)
  raise ArgumentError, "on_error must be a Proc" if on_error && !on_error.is_a?(Proc)

  @size = size
  @worker = Ractor.shareable_proc(&worker)
  @name = name
  @on_error = Ractor.shareable_proc(&on_error) if on_error
  @result_handler = result_handler

  @in_flight = Atom.new(0)
  @shutdown  = Atom.new(false)

  @result_port = Ractor::Port.new if result_handler
  @error_port  = Ractor::Port.new unless on_error
  @coordinator = start_coordinator if size > 1
  @workers = start_workers
  @collector = start_collector
  @error_collector = start_error_collector
end

Instance Method Details

#<<(work) ⇒ void

This method returns an undefined value.

Queues a work item to be processed by an available worker.

Examples:

pool << "http://example.com/page1"
pool << "http://example.com/page2"

Parameters:

  • work (Object)

    the work item to process

Raises:



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/ractor-pool.rb', line 125

def <<(work)
  raise EnqueuedWorkAfterShutdownError if @shutdown.value

  @in_flight.swap { |count| count + 1 }

  if @shutdown.value
    @in_flight.swap { |count| count - 1 }
    raise EnqueuedWorkAfterShutdownError
  end

  begin
    (@coordinator || @workers.first).send(work, move: true)
  ensure
    @in_flight.swap { |count| count - 1 }
  end
end

#shutdownvoid

This method returns an undefined value.

Shuts down the pool gracefully.

This method:

  1. Prevents new work from being queued
  2. Waits for all in-flight work submissions to complete
  3. Allows all queued work to complete
  4. Waits for all workers to finish
  5. Waits for all results and errors to be processed

This method is idempotent and can be called multiple times safely.

Examples:

pool.shutdown


159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/ractor-pool.rb', line 159

def shutdown
  already_shutdown = false
  @shutdown.swap do |current|
    if current
      already_shutdown = true
      current
    else
      true
    end
  end
  return if already_shutdown

  Thread.pass until @in_flight.value.zero?

  if @coordinator
    @coordinator.send(SHUTDOWN, move: true)
    @workers.each(&:join)
    @coordinator.join
  else
    @workers.first.send(SHUTDOWN, move: true)
    @workers.each(&:join)
    @result_port&.send(SHUTDOWN, move: true)
  end
  @error_port&.send(SHUTDOWN, move: true)
  @error_collector&.join
  @collector&.join
end

#start_collectorThread?

Returns:

  • (Thread, nil)


269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/ractor-pool.rb', line 269

def start_collector
  return unless @result_handler

  thread_name = String.new("#{self.class.name} collector thread")
  thread_name << " for #{@name}" if @name

  Thread.new(@result_port, @result_handler, thread_name) do |result_port, result_handler, name|
    Thread.current.name = name

    loop do
      result = result_port.receive
      break if result == SHUTDOWN

      result_handler.call(result)
    end
  end
end

#start_coordinatorRactor

Returns:

  • (Ractor)


190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/ractor-pool.rb', line 190

def start_coordinator
  ractor_name = String.new("#{self.class.name} coordinator ractor")
  ractor_name << " for #{@name}" if @name

  Ractor.new(@size, @result_port, name: ractor_name) do |worker_count, result_port|
    work_queue = []
    waiting_workers = []
    shutdown_received = false
    workers_finished = 0

    loop do
      case data = Ractor.receive
      when SHUTDOWN
        shutdown_received = true

        workers_finished += waiting_workers.size
        waiting_workers.each { |worker| worker.send(SHUTDOWN, move: true) }
        waiting_workers.clear
        if workers_finished == worker_count
          result_port&.send(SHUTDOWN, move: true)
          break
        end

      when Ractor
        ractor = data

        if work_queue.any?
          ractor.send(work_queue.shift, move: true)
        elsif shutdown_received
          ractor.send(SHUTDOWN, move: true)

          workers_finished += 1
          if workers_finished == worker_count
            result_port&.send(SHUTDOWN, move: true)
            break
          end
        else
          waiting_workers << ractor
        end

      else
        work = data

        if waiting_workers.any?
          waiting_workers.shift.send(work, move: true)
        else
          work_queue << work
        end
      end
    end
  end
end

#start_error_collectorThread?

Returns:

  • (Thread, nil)


288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/ractor-pool.rb', line 288

def start_error_collector
  return if @on_error

  thread_name = String.new("#{self.class.name} error collector thread")
  thread_name << " for #{@name}" if @name

  Thread.new(@error_port, thread_name) do |error_port, name|
    Thread.current.name = name

    loop do
      message = error_port.receive
      break if message == SHUTDOWN

      warn message
    end
  end
end

#start_workersArray[Ractor]

Returns:

  • (Array[Ractor])


244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/ractor-pool.rb', line 244

def start_workers
  @size.times.map do |index|
    ractor_name = String.new("#{self.class.name} ractor #{index}")
    ractor_name << " for #{@name}" if @name

    Ractor.new(@worker, @on_error, @error_port, @coordinator, @result_port, name: ractor_name) do |worker, on_error, error_port, coordinator, result_port|
      loop do
        coordinator&.send(Ractor.current, move: true)

        work = Ractor.receive
        break if work == SHUTDOWN

        begin
          result = worker.call(work)

          result_port&.send(result, move: true)
        rescue => error
          on_error ? on_error.call(error) : error_port.send(error.full_message, move: true)
        end
      end
    end
  end
end