Class: AtomicRuby::AtomicThreadPool

Inherits:
Object
Defined in:
lib/atomic-ruby/atomic_thread_pool.rb,
sig/generated/atomic-ruby/atomic_thread_pool.rbs

Overview

Note:

This class is NOT Ractor-safe as it contains mutable thread state that cannot be safely shared across ractors.

Provides a fixed-size thread pool using atomic operations for work queuing.

AtomicThreadPool maintains a fixed number of worker threads that process work items from an atomic queue. The pool uses compare-and-swap operations for thread-safe work enqueueing and state management.

The queue is implemented as a two-stack structure (in/out) backed by immutable frozen linked-list nodes. Enqueueing prepends to the in stack in O(1); dequeueing pops from the out stack in O(1), reversing in into out only when out is exhausted (amortized O(1) per item).
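The two-stack behaviour described above can be modelled without any threads. The following is a minimal single-threaded sketch, not the library's code: `EMPTY_STATE`, `enqueue`, `dequeue`, and `reverse_nodes` are hypothetical names chosen for illustration, and each operation returns a new frozen state instead of mutating the old one.

```ruby
# Hypothetical single-threaded model of the in/out two-stack queue.
# Every operation returns a new immutable state hash.
EMPTY_STATE = { in: nil, out: nil, count: 0 }.freeze

# O(1): prepend a frozen node to the "in" stack.
def enqueue(state, value)
  node = { value: value, next: state[:in] }.freeze
  state.merge(in: node, count: state[:count] + 1).freeze
end

# O(n): build a reversed copy of a node chain out of new frozen nodes.
def reverse_nodes(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end

# Returns [value, new_state]; value is nil when the queue is empty.
def dequeue(state)
  if state[:out]
    # O(1): pop the head of the "out" stack.
    [state[:out][:value],
     state.merge(out: state[:out][:next], count: state[:count] - 1).freeze]
  elsif state[:in]
    # "out" is exhausted: reverse "in" once, then pop its head.
    new_out = reverse_nodes(state[:in])
    [new_out[:value],
     state.merge(in: nil, out: new_out[:next], count: state[:count] - 1).freeze]
  else
    [nil, state]
  end
end

state = EMPTY_STATE
%w[a b c].each { |item| state = enqueue(state, item) }
first, state = dequeue(state)   # triggers the one-time reversal
state = enqueue(state, "d")     # lands on "in" while "out" still holds b, c
rest = []
until state[:count].zero?
  value, state = dequeue(state)
  rest << value
end
DEQUEUED = [first, *rest]
p DEQUEUED
```

Each item is copied at most once by a reversal, which is where the amortized O(1) dequeue cost comes from.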

Examples:

Basic usage

pool = AtomicThreadPool.new(size: 4)
pool << proc { puts "Hello from worker thread!" }
pool << proc { puts "Another work item" }
pool.shutdown

Processing work with results

results = []
pool = AtomicThreadPool.new(size: 2, name: "Calculator")

10.times do |index|
  pool << proc { results << index * 2 }
end

pool.shutdown
puts results.sort #=> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Monitoring pool state

pool = AtomicThreadPool.new(size: 3)
puts pool.length        #=> 3
puts pool.queue_length  #=> 0
puts pool.active_count  #=> 0

5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.queue_length  #=> 2 (3 workers busy, 2 queued)
puts pool.active_count  #=> 3 (3 workers processing)

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Constructor Details

#initialize(size:, name: nil, on_error: nil) ⇒ AtomicThreadPool

Creates a new thread pool with the specified size.

Examples:

Create a basic pool

pool = AtomicThreadPool.new(size: 4)

Create a named pool

pool = AtomicThreadPool.new(size: 2, name: "Database Workers")

Create a pool with a custom error handler

errors = []
pool = AtomicThreadPool.new(size: 2, on_error: ->(err) { errors << err })

Parameters:

  • size (Integer)

    The number of worker threads to create (must be positive)

  • name (String, nil) (defaults to: nil)

    Optional name for the thread pool (used in thread names)

  • on_error (Proc, nil) (defaults to: nil)

    Optional error handler, called with the exception as its only argument when a work item raises. When nil, errors are printed to stderr.


Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if name is provided but not a string

  • (ArgumentError)

    if on_error is provided but not a Proc
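To make the on_error contract concrete, here is a small standalone sketch of how a failing work item is routed to the handler. It needs no pool at all: `run_work` is a hypothetical helper written for this illustration, modelled on the rescue logic in the worker loop of #start, and a `Queue` is used as the collector only because it is safe to append to from several threads.

```ruby
# Hypothetical helper mirroring how a worker treats a failing work item.
# It is not part of AtomicThreadPool.
def run_work(work, on_error: nil)
  work.call
rescue => err # a bare rescue catches StandardError and its subclasses only
  if on_error
    on_error.call(err)
  else
    warn err.full_message
  end
end

ERRORS = Queue.new                    # thread-safe collector
HANDLER = ->(err) { ERRORS << err }   # receives the exception object

run_work(proc { raise ArgumentError, "bad input" }, on_error: HANDLER)
run_work(proc { :ok }, on_error: HANDLER)

puts ERRORS.size
```

Because the handler runs on the worker thread, anything it touches should itself be safe for concurrent use.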



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 79

def initialize(size:, name: nil, on_error: nil)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)
  raise ArgumentError, "on_error must be a Proc" unless on_error.nil? || on_error.is_a?(Proc)

  @size = size
  @name = name
  @on_error = on_error

  @state = Atom.new(in: nil, out: nil, count: 0, shutdown: false)
  @started_thread_count = Atom.new(0)
  @active_thread_count = Atom.new(0)
  @threads = []

  start
end

Instance Method Details

#<<(work) ⇒ void

This method returns an undefined value.

Enqueues work to be executed by the thread pool.

The work item must respond to #call (typically a Proc or lambda). Items are dequeued in FIFO order by whichever worker threads are available; with more than one worker, completion order is not guaranteed. Each item is added to the queue atomically, and enqueueing is O(1) regardless of current queue depth.

Examples:

Enqueue a simple task

pool << proc { puts "Hello World" }

Enqueue a lambda with parameters

calculator = ->(a, b) { puts a + b }
pool << proc { calculator.call(2, 3) }

Enqueue work that captures variables

name = "Alice"
pool << proc { puts "Processing #{name}" }

Parameters:

  • work (#call)

    A callable object to be executed by a worker thread

Raises:

  • (EnqueuedWorkAfterShutdownError)

    if the pool has already been shut down

# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 119

def <<(work)
  state = @state.swap do |current_state|
    if current_state[:shutdown]
      current_state
    else
      new_node = { value: work, next: current_state[:in] }.freeze
      current_state.merge(in: new_node, count: current_state[:count] + 1)
    end
  end
  raise EnqueuedWorkAfterShutdownError if state[:shutdown]

  @threads.each(&:wakeup)
end
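The listing above depends on swap applying its block atomically. As a rough illustration of the compare-and-swap retry pattern, the sketch below defines `SketchAtom`, a hypothetical stand-in that is not the AtomicRuby::Atom implementation. It uses a Mutex purely to emulate an atomic compare-and-set step, and its `swap` recomputes the block against the latest value until the update wins.

```ruby
# Hypothetical stand-in for an atom; NOT the AtomicRuby::Atom implementation.
class SketchAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new # emulates a single atomic compare-and-set instruction
  end

  def value
    @lock.synchronize { @value }
  end

  # Stores new_value only if the current value is still the expected object.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value.equal?(expected)

      @value = new_value
      true
    end
  end

  # Retries the block against the latest value until the CAS succeeds.
  # The block may therefore run more than once per call.
  def swap
    loop do
      current = value
      updated = yield(current)
      return updated if compare_and_set(current, updated)
    end
  end
end

counter = SketchAtom.new(0)
threads = Array.new(4) do
  Thread.new { 1_000.times { counter.swap { |n| n + 1 } } }
end
threads.each(&:join)

COUNTER_TOTAL = counter.value
puts COUNTER_TOTAL
```

In a retry loop of this shape, a block that loses the race is simply run again, so side effects inside the block can happen more than once.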

#active_count ⇒ Integer

Returns the number of worker threads currently executing work.

This represents threads that have picked up a work item and are actively processing it. The count includes threads in the middle of executing work.call, but excludes threads that are idle or waiting for work.

Examples:

Monitor active workers

pool = AtomicThreadPool.new(size: 4)
puts pool.active_count #=> 0

5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.active_count #=> 4 (all workers busy)
puts pool.queue_length #=> 1 (one item still queued)

Calculate total load

total_load = pool.active_count + pool.queue_length
puts "Total pending work: #{total_load}"

Returns:

  • (Integer)

    The number of threads actively processing work



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 201

def active_count
  @active_thread_count.value
end

#length ⇒ Integer Also known as: size

Returns the number of currently alive worker threads.

This count decreases as the pool shuts down and threads terminate. During normal operation, this should equal the size parameter passed to the constructor.

Examples:

pool = AtomicThreadPool.new(size: 4)
puts pool.length #=> 4
pool.shutdown
puts pool.length #=> 0

Returns:

  • (Integer)

    The number of alive worker threads



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 148

def length
  @threads.select(&:alive?).length
end

#queue_length ⇒ Integer Also known as: queue_size

Returns the number of work items currently queued for execution.

This represents work that has been enqueued but not yet picked up by a worker thread. A high queue length indicates that work is being submitted faster than it can be processed.

Examples:

pool = AtomicThreadPool.new(size: 2)
5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.queue_length #=> 3 (2 workers busy, 3 queued)

Returns:

  • (Integer)

    The number of queued work items



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 170

def queue_length
  @state.value[:count]
end

#reverse_list(node) ⇒ Hash?

Reverses a linked list of frozen nodes, returning a new reversed list. Each node in the returned list is a new frozen hash; the input list is left unchanged.

Parameters:

  • node (Hash, nil)

    Head of the list to reverse

Returns:

  • (Hash, nil)

    Head of the reversed list



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 316

def reverse_list(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end
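A short worked example may help show what the reversal does to an "in" stack. The method body is copied from the listing above; `list_values`, `IN_STACK`, and `OUT_STACK` are hypothetical names introduced only for this illustration.

```ruby
# Copied from the listing above so the example runs on its own.
def reverse_list(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end

# Hypothetical helper: flattens a node chain into an array for inspection.
def list_values(node)
  values = []
  while node
    values << node[:value]
    node = node[:next]
  end
  values
end

# Build an "in" stack the way #<< does: each new item becomes the head.
IN_STACK = [:first, :second, :third].reduce(nil) do |head, item|
  { value: item, next: head }.freeze
end

OUT_STACK = reverse_list(IN_STACK)

p list_values(IN_STACK)   # newest item first
p list_values(OUT_STACK)  # oldest item first, ready for FIFO dequeue
```

The original chain is untouched, so a state that still refers to it stays valid if the surrounding update has to be retried.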

#shutdown ⇒ void

This method returns an undefined value.

Gracefully shuts down the thread pool.

This method:

  1. Marks the pool as shutdown (preventing new work from being enqueued)
  2. Waits for all currently queued work to complete
  3. Waits for all worker threads to terminate

After shutdown, all worker threads have terminated and the pool cannot be restarted. Attempting to enqueue work after shutdown raises EnqueuedWorkAfterShutdownError.
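The three steps above can be sketched with a miniature pool built only on the standard library. `MiniPool` and `MiniPool::ShutdownError` are hypothetical names, and the sketch deliberately swaps in a Mutex where AtomicThreadPool uses an atom, so it illustrates the shutdown sequence only, not the library's lock-free implementation.

```ruby
# Hypothetical miniature pool; a Mutex replaces the atom used by AtomicThreadPool.
class MiniPool
  class ShutdownError < StandardError; end

  def initialize(size)
    @lock = Mutex.new
    @queue = []
    @shutdown = false
    @threads = Array.new(size) { Thread.new { work_loop } }
  end

  def <<(work)
    @lock.synchronize do
      raise ShutdownError, "pool is shut down" if @shutdown

      @queue << work
    end
    self
  end

  def shutdown
    @lock.synchronize { @shutdown = true } # step 1: refuse new work
    @threads.each(&:join)                  # steps 2-3: workers drain the queue, then exit
  end

  private

  def work_loop
    loop do
      work, done = @lock.synchronize { [@queue.shift, @shutdown && @queue.empty?] }
      if work
        work.call
      elsif done
        break        # shut down and nothing left to run
      else
        sleep 0.001  # idle: wait briefly for new work
      end
    end
  end
end

pool = MiniPool.new(2)
results = Queue.new
5.times { |i| pool << -> { results << i * 2 } }
pool.shutdown # returns only after all five items have run

DRAINED = Array.new(results.size) { results.pop }.sort
REJECTED = begin
  pool << -> {}
  false
rescue MiniPool::ShutdownError
  true
end

p DRAINED
p REJECTED
```

A worker exits only when the shutdown flag is set and the queue is empty, which is why work queued before the call still completes.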

Examples:

pool = AtomicThreadPool.new(size: 4)
10.times { |index| pool << proc { puts index } }
pool.shutdown # waits for all work to complete
puts pool.length #=> 0

Raises:



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 227

def shutdown
  already_shutdown = false
  @state.swap do |current_state|
    if current_state[:shutdown]
      already_shutdown = true
      current_state
    else
      current_state.merge(shutdown: true)
    end
  end
  return if already_shutdown

  @threads.each(&:join)
end

#start ⇒ void

This method returns an undefined value.

Starts the worker threads for the thread pool.

This method is called automatically during initialization. It creates the specified number of worker threads and waits for all threads to be fully started before returning.



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 253

def start
  @size.times do |num|
    @threads << Thread.new(num) do |idx|
      thread_name = String.new("AtomicThreadPool thread #{idx}")
      thread_name << " for #{@name}" if @name
      Thread.current.name = thread_name

      @started_thread_count.swap { |current_count| current_count + 1 }

      loop do
        work = nil
        should_shutdown = false

        @state.swap do |current_state|
          if current_state[:shutdown] && current_state[:in].nil? && current_state[:out].nil?
            should_shutdown = true
            current_state
          elsif current_state[:out]
            work = current_state[:out][:value]
            current_state.merge(out: current_state[:out][:next], count: current_state[:count] - 1)
          elsif current_state[:in]
            new_out = reverse_list(current_state[:in])
            work = new_out[:value]
            current_state.merge(in: nil, out: new_out[:next], count: current_state[:count] - 1)
          else
            current_state
          end
        end

        if should_shutdown
          break
        elsif work
          @active_thread_count.swap { |current_count| current_count + 1 }
          begin
            work.call
          rescue => err
            if @on_error
              @on_error.call(err)
            else
              warn "#{thread_name} rescued:"
              warn err.full_message
            end
          ensure
            @active_thread_count.swap { |current_count| current_count - 1 }
          end
        else
          sleep 0.001
        end
      end
    end
  end
  @threads.freeze

  Thread.pass until @started_thread_count.value == @size
end