Class: AtomicRuby::AtomicThreadPool
- Inherits: Object
  - Object
  - AtomicRuby::AtomicThreadPool
- Defined in: lib/atomic-ruby/atomic_thread_pool.rb
Overview
This class is NOT Ractor-safe as it contains mutable thread state that cannot be safely shared across ractors.
Provides a fixed-size thread pool using atomic operations for work queuing.
AtomicThreadPool maintains a fixed number of worker threads that process work items from an atomic queue. The pool uses compare-and-swap operations for thread-safe work enqueueing and state management.
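The compare-and-swap retry pattern mentioned above can be sketched in plain Ruby. This is only an illustration: `SketchAtom` is a hypothetical stand-in that emulates the compare step with a Mutex, and it is not the library's `Atom` implementation. The `swap` name and its return value (the newly installed value) are assumptions based on how `@state.swap` is used in the source shown on this page.

```ruby
# Hypothetical stand-in for the library's Atom. A Mutex emulates the
# compare-and-set step so the sketch runs on plain Ruby.
class SketchAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Installs new_value only if the stored object is still `expected`.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value.equal?(expected)

      @value = new_value
      true
    end
  end

  # Re-runs the block until no other thread changed the value in between,
  # then returns the value that was installed.
  def swap
    loop do
      current = value
      updated = yield(current)
      return updated if compare_and_set(current, updated)
    end
  end
end

counter = SketchAtom.new(0)
threads = Array.new(4) do
  Thread.new { 1_000.times { counter.swap { |n| n + 1 } } }
end
threads.each(&:join)
final_count = counter.value
```

Because the block may run more than once under contention, it should be free of side effects and should build a new value rather than mutate the old one.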
Defined Under Namespace
Classes: EnqueuedWorkAfterShutdownError, Error
Instance Method Summary
- #<<(work) ⇒ Object
  Enqueues work to be executed by the thread pool.
- #active_count ⇒ Integer
  Returns the number of worker threads currently executing work.
- #initialize(size:, name: nil) ⇒ AtomicThreadPool (constructor)
  Creates a new thread pool with the specified size.
- #length ⇒ Integer (also: #size)
  Returns the number of currently alive worker threads.
- #queue_length ⇒ Integer (also: #queue_size)
  Returns the number of work items currently queued for execution.
- #shutdown ⇒ void
  Gracefully shuts down the thread pool.
Constructor Details
#initialize(size:, name: nil) ⇒ AtomicThreadPool
Creates a new thread pool with the specified size.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 66
    def initialize(size:, name: nil)
      raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
      raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)

      @size = size
      @name = name

      @state = Atom.new(queue: [], shutdown: false)
      @started_thread_count = Atom.new(0)
      @active_thread_count = Atom.new(0)
      @threads = []

      start
    end
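To show which arguments the constructor accepts, the two guard clauses can be exercised in isolation. `validate_pool_args` and `rejected?` are hypothetical helpers written for this sketch. They copy the checks from the source above but are not part of the library.

```ruby
# Hypothetical helper that repeats the constructor's two guard clauses.
def validate_pool_args(size:, name: nil)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)

  true
end

# Hypothetical helper: true when the arguments would raise ArgumentError.
def rejected?(**args)
  validate_pool_args(**args)
  false
rescue ArgumentError
  true
end

accepted_plain = !rejected?(size: 4)
accepted_named = !rejected?(size: 4, name: "io-pool")
rejected_zero  = rejected?(size: 0)
rejected_float = rejected?(size: 2.0)
rejected_name  = rejected?(size: 2, name: :io_pool)
```

Note that a Float such as `2.0` or a Symbol name is rejected, because the checks use `is_a?` rather than attempting a conversion.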
Instance Method Details
#<<(work) ⇒ Object
Enqueues work to be executed by the thread pool.
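The work item must respond to #call (typically a Proc or lambda). Work is always appended to the queue atomically and picked up in FIFO order by available worker threads; if all workers are busy, it waits in the queue until one becomes free. Raises EnqueuedWorkAfterShutdownError if the pool has already been shut down.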
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 103
    def <<(work)
      state = @state.swap do |current_state|
        if current_state[:shutdown]
          current_state
        else
          current_state.merge(queue: [*current_state[:queue], work])
        end
      end
      raise EnqueuedWorkAfterShutdownError if state[:shutdown]
    end
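The block passed to `@state.swap` never mutates the current state. It either returns the same hash (after shutdown) or builds a new hash with a new queue array. The sketch below isolates that transition on plain hashes; `enqueue_transition` is a hypothetical name used only for illustration.

```ruby
# Hypothetical helper that mirrors the block passed to @state.swap in #<<.
def enqueue_transition(current_state, work)
  if current_state[:shutdown]
    current_state # unchanged: the caller raises after seeing shutdown: true
  else
    current_state.merge(queue: [*current_state[:queue], work])
  end
end

job = -> { :done }

open_state = { queue: [].freeze, shutdown: false }.freeze
next_state = enqueue_transition(open_state, job)

closed_state = { queue: [].freeze, shutdown: true }.freeze
same_state = enqueue_transition(closed_state, job)
```

Freezing the input hashes here demonstrates that the transition works without in-place modification, which is the property that makes such a block safe to re-run if an atomic update has to be retried.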
#active_count ⇒ Integer
Returns the number of worker threads currently executing work.
This counts threads that have picked up a work item and are in the middle of executing work.call. Threads that are idle or waiting for work are excluded.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 180
    def active_count
      @active_thread_count.value
    end
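The worker code that maintains this counter is not shown on this page. A minimal sketch of the described behaviour, assuming the counter is incremented just before `work.call` and decremented afterwards, might look as follows. The Mutex-guarded integer is a stand-in for the library's atomic counter, and all names are hypothetical.

```ruby
active = 0
lock = Mutex.new
started = Queue.new # signals that the work item is running
release = Queue.new # lets the work item finish

work = lambda do
  started << true
  release.pop
end

worker = Thread.new do
  lock.synchronize { active += 1 } # assumed: count goes up before work.call
  begin
    work.call
  ensure
    lock.synchronize { active -= 1 } # assumed: count goes down even on errors
  end
end

started.pop # block until the work item is executing
count_during = lock.synchronize { active }

release << true
worker.join
count_after = lock.synchronize { active }
```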
#length ⇒ Integer Also known as: size
Returns the number of currently alive worker threads.
This count decreases as the pool shuts down and threads terminate. During normal operation, this should equal the size parameter passed to the constructor.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 129
    def length
      @threads.select(&:alive?).length
    end
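The counting expression can be tried on ordinary threads, without the pool. In this sketch the `gate` queue is a hypothetical device that keeps the threads blocked, and therefore alive, until they are released.

```ruby
gate = Queue.new

# Each thread blocks on gate.pop, so it stays alive until released.
threads = Array.new(3) { Thread.new { gate.pop } }
alive_before = threads.select(&:alive?).length

3.times { gate << :go }
threads.each(&:join)

# count(&:alive?) is an equivalent form that skips the intermediate array.
alive_after = threads.count(&:alive?)
```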
#queue_length ⇒ Integer Also known as: queue_size
Returns the number of work items currently queued for execution.
This represents work that has been enqueued but not yet picked up by a worker thread. A high queue length indicates that work is being submitted faster than it can be processed.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 150
    def queue_length
      @state.value[:queue].length
    end
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shuts down the thread pool.
This method:
- Marks the pool as shutdown (preventing new work from being enqueued)
- Waits for all currently queued work to complete
- Waits for all worker threads to terminate
After shutdown, all worker threads have terminated and the pool cannot be restarted. Attempting to enqueue work after shutdown raises EnqueuedWorkAfterShutdownError. Calling shutdown again on a pool that is already shut down returns immediately.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 206
    def shutdown
      already_shutdown = false
      @state.swap do |current_state|
        if current_state[:shutdown]
          already_shutdown = true
          current_state
        else
          current_state.merge(shutdown: true)
        end
      end
      return if already_shutdown

      Thread.pass until @state.value[:queue].empty?

      @threads.each(&:join)
    end
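The three shutdown steps can be traced end to end with a miniature pool. This is a sketch under stated assumptions, not the library's implementation. `DemoAtom` is a Mutex-based stand-in for `Atom`, `DemoPool` and `run_worker` are hypothetical, and the worker loop is a guess at how workers drain the queue and exit. Only the body of `shutdown` follows the source shown above.

```ruby
# Mutex-based stand-in for the library's Atom (an assumption of this sketch).
class DemoAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Applies the block to the current value and returns the new value.
  def swap
    @lock.synchronize { @value = yield(@value) }
  end
end

# Hypothetical miniature pool; only #shutdown mirrors the documented source.
class DemoPool
  def initialize(size:)
    @state = DemoAtom.new(queue: [], shutdown: false)
    @threads = Array.new(size) { Thread.new { run_worker } }
  end

  def <<(work)
    state = @state.swap do |s|
      s[:shutdown] ? s : s.merge(queue: [*s[:queue], work])
    end
    raise "work enqueued after shutdown" if state[:shutdown]
  end

  def shutdown
    already_shutdown = false
    @state.swap do |s| # step 1: set the shutdown flag
      if s[:shutdown]
        already_shutdown = true
        s
      else
        s.merge(shutdown: true)
      end
    end
    return if already_shutdown

    Thread.pass until @state.value[:queue].empty? # step 2: let the queue drain
    @threads.each(&:join)                          # step 3: wait for the workers
  end

  private

  # Assumed worker loop: take the head of the queue, run it, and exit once
  # the queue is empty and the shutdown flag is set.
  def run_worker
    loop do
      work = nil
      state = @state.swap do |s|
        work = s[:queue].first
        work ? s.merge(queue: s[:queue].drop(1)) : s
      end
      if work
        work.call
      elsif state[:shutdown]
        break
      else
        Thread.pass
      end
    end
  end
end

results = Queue.new
pool = DemoPool.new(size: 2)
5.times { |i| pool << -> { results << i * 2 } }
pool.shutdown
processed = Array.new(results.size) { results.pop }
```

Joining the workers after the queue is empty is what guarantees that work already picked up has finished by the time `shutdown` returns. Waiting for an empty queue alone would not cover items still executing.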