Class: AtomicRuby::AtomicThreadPool
Inherits: Object
- Object
- AtomicRuby::AtomicThreadPool
Defined in: lib/atomic-ruby/atomic_thread_pool.rb, sig/generated/atomic-ruby/atomic_thread_pool.rbs
Overview
This class is NOT Ractor-safe as it contains mutable thread state that cannot be safely shared across ractors.
Provides a fixed-size thread pool using atomic operations for work queuing.
AtomicThreadPool maintains a fixed number of worker threads that process work items from an atomic queue. The pool uses compare-and-swap operations for thread-safe work enqueueing and state management.
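To illustrate what a compare-and-swap update looks like, here is a minimal sketch. `SketchAtom` is a hypothetical stand-in written for this example; it is not the gem's `Atom` class, and it emulates the atomic compare step with a `Mutex`. The retry behaviour of `swap` is an assumption about how such a primitive typically works, not something this page confirms.

```ruby
# Hypothetical stand-in for an atomic reference (NOT the gem's Atom class).
class SketchAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new # emulates the atomicity of a hardware CAS instruction
  end

  def value
    @lock.synchronize { @value }
  end

  # Installs new_value only if the stored value is still `expected`.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value.equal?(expected)

      @value = new_value
      true
    end
  end

  # Computes a new value from the current one and retries until the
  # result is installed without another thread interfering.
  def swap
    loop do
      current = value
      updated = yield(current)
      return updated if compare_and_set(current, updated)
    end
  end
end

counter = SketchAtom.new(0)
threads = Array.new(4) do
  Thread.new { 1000.times { counter.swap { |n| n + 1 } } }
end
threads.each(&:join)
total = counter.value
```

Under this assumption the block passed to `swap` may run more than once when threads contend, so it should only compute a new value or perform assignments that are safe to repeat.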
The queue is implemented as a two-stack structure (in/out) backed by
immutable frozen linked-list nodes. Enqueueing prepends to the in stack
in O(1); dequeueing pops from the out stack in O(1), reversing in into
out only when out is exhausted (amortized O(1) per item).
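The two-stack idea can be sketched on its own, without threads or atomics. The helper names below (`enqueue`, `dequeue`, `reverse_nodes`) are illustrative and not part of the gem's API; only the node shape (`value`/`next` frozen hashes) and the `in`/`out` keys follow the description above.

```ruby
# Builds a new list with the nodes in reverse order; the input is untouched.
def reverse_nodes(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end

# O(1): prepend a new frozen node to the in stack.
def enqueue(state, item)
  state.merge(in: { value: item, next: state[:in] }.freeze).freeze
end

# Returns [item, new_state]; item is nil when the queue is empty.
def dequeue(state)
  if state[:out]
    [state[:out][:value], state.merge(out: state[:out][:next]).freeze]
  elsif state[:in]
    # out is exhausted: reverse in once, then pop the oldest item.
    out = reverse_nodes(state[:in])
    [out[:value], { in: nil, out: out[:next] }.freeze]
  else
    [nil, state]
  end
end

empty_queue = { in: nil, out: nil }.freeze
queue = [1, 2, 3].reduce(empty_queue) { |state, item| enqueue(state, item) }

first, queue = dequeue(queue) # triggers the one-time reversal
queue = enqueue(queue, 4)     # lands on the in stack while out still holds 2 and 3

rest = []
until queue[:in].nil? && queue[:out].nil?
  item, queue = dequeue(queue)
  rest << item
end
```

Each item is reversed at most once on its way from `in` to `out`, which is where the amortized O(1) dequeue cost comes from.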
Defined Under Namespace
Classes: EnqueuedWorkAfterShutdownError, Error
Instance Method Summary
-
#<<(work) ⇒ void
Enqueues work to be executed by the thread pool.
-
#active_count ⇒ Integer
Returns the number of worker threads currently executing work.
-
#initialize(size:, name: nil, on_error: nil) ⇒ AtomicThreadPool
constructor
Creates a new thread pool with the specified size.
-
#length ⇒ Integer
(also: #size)
Returns the number of currently alive worker threads.
-
#queue_length ⇒ Integer
(also: #queue_size)
Returns the number of work items currently queued for execution.
-
#reverse_list(node) ⇒ Hash?
Reverses a linked-list of frozen nodes, returning a new reversed list.
-
#shutdown ⇒ void
Gracefully shuts down the thread pool.
-
#start ⇒ void
Starts the worker threads for the thread pool.
Constructor Details
#initialize(size:, name: nil, on_error: nil) ⇒ AtomicThreadPool
Creates a new thread pool with the specified size.
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 79

def initialize(size:, name: nil, on_error: nil)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)
  raise ArgumentError, "on_error must be a Proc" unless on_error.nil? || on_error.is_a?(Proc)

  @size = size
  @name = name
  @on_error = on_error

  @state = Atom.new(in: nil, out: nil, count: 0, shutdown: false)
  @started_thread_count = Atom.new(0)
  @active_thread_count = Atom.new(0)
  @threads = []

  start
end
Instance Method Details
#<<(work) ⇒ void
This method returns an undefined value.
Enqueues work to be executed by the thread pool.
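The work item must respond to #call (typically a Proc or lambda). Work items are dequeued in FIFO order by available worker threads. Every item is added to the queue atomically and waits there until a worker picks it up. Enqueueing is O(1) regardless of current queue depth. If the pool has already been shut down, EnqueuedWorkAfterShutdownError is raised and the work is not queued.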
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 119

def <<(work)
  state = @state.swap do |current_state|
    if current_state[:shutdown]
      current_state
    else
      new_node = { value: work, next: current_state[:in] }.freeze
      current_state.merge(in: new_node, count: current_state[:count] + 1)
    end
  end
  raise EnqueuedWorkAfterShutdownError if state[:shutdown]

  @threads.each(&:wakeup)
end
#active_count ⇒ Integer
Returns the number of worker threads currently executing work.
This represents threads that have picked up a work item and are actively processing it. The count includes threads in the middle of executing work.call, but excludes threads that are idle or waiting for work.
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 201

def active_count
  @active_thread_count.value
end
#length ⇒ Integer Also known as: size
Returns the number of currently alive worker threads.
This count decreases as the pool shuts down and threads terminate. During normal operation, this should equal the size parameter passed to the constructor.
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 148

def length
  @threads.select(&:alive?).length
end
#queue_length ⇒ Integer Also known as: queue_size
Returns the number of work items currently queued for execution.
This represents work that has been enqueued but not yet picked up by a worker thread. A high queue length indicates that work is being submitted faster than it can be processed.
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 170

def queue_length
  @state.value[:count]
end
#reverse_list(node) ⇒ Hash?
Reverses a linked-list of frozen nodes, returning a new reversed list. Each node in the returned list is a new frozen hash.
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 316

def reverse_list(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end
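As a quick check of the behaviour described for #reverse_list, the sketch below copies the method body from the listing and runs it on a hand-built list. The `list_values` helper and the sample symbols are illustrative only.

```ruby
# Copied from the listing for #reverse_list so the example is self-contained.
def reverse_list(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end

# Illustrative helper: collects node values from head to tail.
def list_values(node)
  values = []
  while node
    values << node[:value]
    node = node[:next]
  end
  values
end

# An "in" stack after enqueueing :a, :b, :c in that order (newest first).
in_stack = [:a, :b, :c].reduce(nil) { |rest, v| { value: v, next: rest }.freeze }
out_stack = reverse_list(in_stack)

in_order = list_values(in_stack)   # newest first
out_order = list_values(out_stack) # oldest first, ready for FIFO popping
empty_result = reverse_list(nil)   # an empty list reverses to nil
```

The input list is left intact because every node in the result is a freshly built frozen hash.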
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shuts down the thread pool.
This method:
- Marks the pool as shutdown (preventing new work from being enqueued)
- Waits for all currently queued work to complete
- Waits for all worker threads to terminate
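After shutdown, all worker threads have terminated and the pool cannot be restarted. Attempting to enqueue work after shutdown raises EnqueuedWorkAfterShutdownError. Calling shutdown again on a pool that is already shut down returns immediately without joining the threads again.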
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 227

def shutdown
  already_shutdown = false
  @state.swap do |current_state|
    if current_state[:shutdown]
      already_shutdown = true
      current_state
    else
      current_state.merge(shutdown: true)
    end
  end
  return if already_shutdown

  @threads.each(&:join)
end
#start ⇒ void
This method returns an undefined value.
Starts the worker threads for the thread pool.
This method is called automatically during initialization. It creates the specified number of worker threads and waits for all threads to be fully started before returning.
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 253

def start
  @size.times do |num|
    @threads << Thread.new(num) do |idx|
      thread_name = String.new("AtomicThreadPool thread #{idx}")
      thread_name << " for #{@name}" if @name
      Thread.current.name = thread_name

      @started_thread_count.swap { |current_count| current_count + 1 }

      loop do
        work = nil
        should_shutdown = false

        @state.swap do |current_state|
          if current_state[:shutdown] && current_state[:in].nil? && current_state[:out].nil?
            should_shutdown = true
            current_state
          elsif current_state[:out]
            work = current_state[:out][:value]
            current_state.merge(out: current_state[:out][:next], count: current_state[:count] - 1)
          elsif current_state[:in]
            new_out = reverse_list(current_state[:in])
            work = new_out[:value]
            current_state.merge(in: nil, out: new_out[:next], count: current_state[:count] - 1)
          else
            current_state
          end
        end

        if should_shutdown
          break
        elsif work
          @active_thread_count.swap { |current_count| current_count + 1 }

          begin
            work.call
          rescue => err
            if @on_error
              @on_error.call(err)
            else
              warn "#{thread_name} rescued:"
              warn err.
            end
          ensure
            @active_thread_count.swap { |current_count| current_count - 1 }
          end
        else
          sleep 0.001
        end
      end
    end
  end
  @threads.freeze

  Thread.pass until @started_thread_count.value == @size
end
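The "wait until every worker has started" step at the end of #start can be sketched in isolation. This is a simplified illustration, not the pool's code: a `Mutex`-guarded integer stands in for the atomic started-thread counter, and the workers exit immediately instead of looping for work.

```ruby
size = 3
started = 0
lock = Mutex.new # stand-in for the atomic counter used by the pool

threads = Array.new(size) do |idx|
  Thread.new(idx) do |num|
    Thread.current.name = "sketch worker #{num}"
    # Announce that this worker is up before doing anything else.
    lock.synchronize { started += 1 }
  end
end

# Yield the scheduler until every worker has checked in.
Thread.pass until lock.synchronize { started } == size

threads.each(&:join)
```

Yielding with `Thread.pass` lets the caller wait for the workers to check in while giving the scheduler a chance to run them, at the cost of busy-waiting for a short time during startup.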