Class: AtomicRuby::AtomicThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/atomic-ruby/atomic_thread_pool.rb

Overview

Note:

This class is NOT Ractor-safe as it contains mutable thread state that cannot be safely shared across ractors.

Provides a fixed-size thread pool using atomic operations for work queuing.

AtomicThreadPool maintains a fixed number of worker threads that process work items from an atomic queue. The pool uses compare-and-swap operations for thread-safe work enqueueing and state management.

Examples:

Basic usage

pool = AtomicThreadPool.new(size: 4)
pool << proc { puts "Hello from worker thread!" }
pool << proc { puts "Another work item" }
pool.shutdown

Processing work with results

results = []
pool = AtomicThreadPool.new(size: 2, name: "Calculator")

10.times do |index|
  pool << proc { results << index * 2 }
end

pool.shutdown
puts results.sort #=> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Monitoring pool state

pool = AtomicThreadPool.new(size: 3)
puts pool.length        #=> 3
puts pool.queue_length  #=> 0
puts pool.active_count  #=> 0

5.times { pool << proc { sleep(1) } }
puts pool.queue_length  #=> 2 (3 workers busy, 2 queued)
puts pool.active_count  #=> 3 (3 workers processing)

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Instance Method Summary collapse

Constructor Details

#initialize(size:, name: nil) ⇒ AtomicThreadPool

Creates a new thread pool with the specified size.

Examples:

Create a basic pool

pool = AtomicThreadPool.new(size: 4)

Create a named pool

pool = AtomicThreadPool.new(size: 2, name: "Database Workers")

Parameters:

  • size (Integer)

    The number of worker threads to create (must be positive)

  • name (String, nil) (defaults to: nil)

    Optional name for the thread pool (used in thread names)

Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if name is provided but not a string



66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 66

def initialize(size:, name: nil)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)

  @size = size
  @name = name

  @state = Atom.new(queue: [], shutdown: false)
  @started_thread_count = Atom.new(0)
  @active_thread_count = Atom.new(0)
  @threads = []

  start
end

Instance Method Details

#<<(work) ⇒ Object

Enqueues work to be executed by the thread pool.

The work item must respond to #call (typically a Proc or lambda). Work items are executed in FIFO order by available worker threads. If all workers are busy, the work is queued atomically.

Examples:

Enqueue a simple task

pool << proc { puts "Hello World" }

Enqueue a lambda with parameters

calculator = ->(a, b) { puts a + b }
pool << proc { calculator.call(2, 3) }

Enqueue work that captures variables

name = "Alice"
pool << proc { puts "Processing #{name}" }

Parameters:

  • work (#call)

    A callable object to be executed by a worker thread

Raises:



103
104
105
106
107
108
109
110
111
112
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 103

def <<(work)
  state = @state.swap do |current_state|
    if current_state[:shutdown]
      current_state
    else
      current_state.merge(queue: [*current_state[:queue], work])
    end
  end
  raise EnqueuedWorkAfterShutdownError if state[:shutdown]
end

#active_countInteger

Returns the number of worker threads currently executing work.

This represents threads that have picked up a work item and are actively processing it. The count includes threads in the middle of executing work.call, but excludes threads that are idle or waiting for work.

Examples:

Monitor active workers

pool = AtomicThreadPool.new(size: 4)
puts pool.active_count #=> 0

5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.active_count #=> 4 (all workers busy)
puts pool.queue_length #=> 1 (one item still queued)

Calculate total load

total_load = pool.active_count + pool.queue_length
puts "Total pending work: #{total_load}"

Returns:

  • (Integer)

    The number of threads actively processing work



180
181
182
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 180

def active_count
  @active_thread_count.value
end

#lengthInteger Also known as: size

Returns the number of currently alive worker threads.

This count decreases as the pool shuts down and threads terminate. During normal operation, this should equal the size parameter passed to the constructor.

Examples:

pool = AtomicThreadPool.new(size: 4)
puts pool.length #=> 4
pool.shutdown
puts pool.length #=> 0

Returns:

  • (Integer)

    The number of alive worker threads



129
130
131
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 129

def length
  @threads.select(&:alive?).length
end

#queue_lengthInteger Also known as: queue_size

Returns the number of work items currently queued for execution.

This represents work that has been enqueued but not yet picked up by a worker thread. A high queue length indicates that work is being submitted faster than it can be processed.

Examples:

pool = AtomicThreadPool.new(size: 2)
5.times { pool << proc { sleep(1) } }
puts pool.queue_length #=> 3 (2 workers busy, 3 queued)

Returns:

  • (Integer)

    The number of queued work items



150
151
152
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 150

def queue_length
  @state.value[:queue].length
end

#shutdownvoid

This method returns an undefined value.

Gracefully shuts down the thread pool.

This method:

  1. Marks the pool as shutdown (preventing new work from being enqueued)

  2. Waits for all currently queued work to complete

  3. Waits for all worker threads to terminate

After shutdown, all worker threads will be terminated and the pool cannot be restarted. Attempting to enqueue work after shutdown will raise an exception.

Examples:

pool = AtomicThreadPool.new(size: 4)
10.times { |index| pool << proc { puts index } }
pool.shutdown # waits for all work to complete
puts pool.length #=> 0

Raises:



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 206

def shutdown
  already_shutdown = false
  @state.swap do |current_state|
    if current_state[:shutdown]
      already_shutdown = true
      current_state
    else
      current_state.merge(shutdown: true)
    end
  end
  return if already_shutdown

  Thread.pass until @state.value[:queue].empty?

  @threads.each(&:join)
end