Class: RactorPool
- Inherits: Object
  - Object > RactorPool
- Defined in: lib/ractor-pool.rb, lib/ractor-pool/version.rb
Overview
A thread-safe, lock-free pool of Ractor workers with a coordinator pattern for distributing work.
RactorPool manages a fixed number of worker ractors that process work items in parallel. Work is distributed on-demand to idle workers, ensuring efficient utilisation. Results are collected and passed to a result handler running in a separate thread.
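The flow described above (a fixed set of workers, on-demand distribution to whichever worker is idle, and a result handler on its own thread) can be sketched with plain threads. This is only an illustration of the pattern: `ThreadPoolSketch` and `STOP` are hypothetical names, the class borrows the documented `size:`/`worker:`/block signature, and it uses `Thread` and `Queue` rather than ractors, so it is not how RactorPool itself is implemented.

```ruby
require "etc"

# Hypothetical thread-based stand-in mirroring the documented interface.
# It is NOT RactorPool: threads and queues replace ractors and ports.
class ThreadPoolSketch
  STOP = Object.new.freeze # sentinel telling a thread to exit

  def initialize(size: Etc.nprocessors, worker:, &result_handler)
    @work = Queue.new
    @results = Queue.new
    @result_handler = result_handler

    # Idle workers block on the shared queue, so each item goes to
    # whichever worker becomes free first (on-demand distribution).
    @workers = Array.new(size) do
      Thread.new do
        until (item = @work.pop).equal?(STOP)
          @results << worker.call(item)
        end
      end
    end

    # A separate thread hands every result to the result handler.
    @collector = Thread.new do
      until (result = @results.pop).equal?(STOP)
        @result_handler&.call(result)
      end
    end
  end

  def <<(work)
    @work << work
  end

  def shutdown
    @workers.size.times { @work << STOP } # one sentinel per worker
    @workers.each(&:join)
    @results << STOP                      # workers are done; stop the collector
    @collector.join
  end
end

squares = []
pool = ThreadPoolSketch.new(size: 4, worker: ->(n) { n * n }) { |r| squares << r }
(1..10).each { |n| pool << n }
pool.shutdown
p squares.sort # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Results arrive in completion order, which is why the example sorts them before printing.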
Defined Under Namespace
Classes: EnqueuedWorkAfterShutdownError, Error
Constant Summary
- VERSION = "0.1.3"
Instance Method Summary
- #<<(work) ⇒ void
  Queues a work item to be processed by an available worker.
- #initialize(size: Etc.nprocessors, worker:, name: nil) {|result| ... } ⇒ void (constructor)
  Creates a new RactorPool with the specified number of workers.
- #shutdown ⇒ void
  Shuts down the pool gracefully.
Constructor Details
#initialize(size: Etc.nprocessors, worker:, name: nil) {|result| ... } ⇒ void
Creates a new RactorPool with the specified number of workers.
# File 'lib/ractor-pool.rb', line 81
def initialize(size: Etc.nprocessors, worker:, name: nil, &result_handler)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)

  @size = size
  @worker = Ractor.shareable_proc(&worker)
  @name = name
  @result_handler = result_handler
  @state = Atom.new(in_flight: 0, shutdown: false)
  @result_port = Ractor::Port.new if result_handler
  @coordinator = start_coordinator if size > 1
  @workers = start_workers
  @collector = start_collector
end
Instance Method Details
#<<(work) ⇒ void
This method returns an undefined value.
Queues a work item to be processed by an available worker.
# File 'lib/ractor-pool.rb', line 109
def <<(work)
  state = @state.swap do |current_state|
    if current_state[:shutdown]
      current_state
    else
      current_state.merge(in_flight: current_state[:in_flight] + 1)
    end
  end

  raise EnqueuedWorkAfterShutdownError if state[:shutdown]

  begin
    (@coordinator || @workers.first).send(work, move: true)
  ensure
    @state.swap do |current_state|
      current_state.merge(in_flight: current_state[:in_flight] - 1)
    end
  end
end
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the pool gracefully.
This method:
- Prevents new work from being queued
- Waits for all in-flight work submissions to complete
- Allows all queued work to complete
- Waits for all workers to finish
- Waits for all results to be processed
This method is idempotent and can be called multiple times safely.
# File 'lib/ractor-pool.rb', line 145
def shutdown
  already_shutdown = false
  @state.swap do |current_state|
    if current_state[:shutdown]
      already_shutdown = true
      current_state
    else
      current_state.merge(shutdown: true)
    end
  end
  return if already_shutdown

  Thread.pass until @state.value[:in_flight] == 0

  @coordinator&.send(SHUTDOWN, move: true) || (@workers.first.send(SHUTDOWN, move: true) && @result_port&.send(SHUTDOWN, move: true))
  @workers.each(&:join)
  @coordinator&.join
  @collector&.join
end
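The interplay between `#<<` and `#shutdown` rests on one shared state value holding an in-flight counter and a shutdown flag. The sketch below isolates that gating logic so it can be run on its own. `TinyAtom`, `GateSketch`, and `ClosedError` are hypothetical names. The `Atom` class used in the source is not shown on this page, so the stand-in here uses a `Mutex` (and is therefore not lock-free) and assumes that `swap` returns the new value.

```ruby
# Hypothetical stand-in for the Atom seen in the source listing.
# Assumption: swap yields the current value and returns the new one.
class TinyAtom
  def initialize(value)
    @value = value.freeze
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  def swap
    @lock.synchronize { @value = yield(@value).freeze }
  end
end

# Hypothetical class reproducing only the state handling of #<< and #shutdown.
class GateSketch
  class ClosedError < StandardError; end

  def initialize
    @state = TinyAtom.new({ in_flight: 0, shutdown: false })
  end

  def enqueue(work)
    # Count the submission as in flight unless shutdown has already begun.
    state = @state.swap do |s|
      s[:shutdown] ? s : s.merge(in_flight: s[:in_flight] + 1)
    end
    raise ClosedError if state[:shutdown]

    begin
      work # the real pool sends the item to a ractor at this point
    ensure
      # Always release the in-flight slot, even if sending fails.
      @state.swap { |s| s.merge(in_flight: s[:in_flight] - 1) }
    end
  end

  def shutdown
    first_call = false
    @state.swap do |s|
      next s if s[:shutdown] # flag already set: leave state untouched
      first_call = true
      s.merge(shutdown: true)
    end
    return :already_shut_down unless first_call

    # Wait for submissions that passed the gate before the flag was set.
    Thread.pass until @state.value[:in_flight] == 0
    :shut_down
  end
end

gate = GateSketch.new
gate.enqueue(:job) # accepted
gate.shutdown      # first call performs the shutdown
gate.shutdown      # later calls return early, which makes shutdown idempotent
```

Setting the flag and incrementing the counter through the same state value means a submission either registers as in flight before the flag flips or is rejected. Waiting for the counter to reach zero therefore covers every accepted submission.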