From e721253e2133c9ebae4f6039de45ebe3c4ed9a47 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Thu, 2 Apr 2026 11:29:31 +0200 Subject: [PATCH 01/14] Add async worker execution pools --- Gemfile.lock | 18 ++ README.md | 28 ++- lib/solid_queue/configuration.rb | 34 +++- lib/solid_queue/execution_pools.rb | 27 +++ lib/solid_queue/execution_pools/async_pool.rb | 167 ++++++++++++++++++ .../execution_pools/thread_pool.rb | 84 +++++++++ lib/solid_queue/pool.rb | 51 +----- lib/solid_queue/worker.rb | 21 ++- solid_queue.gemspec | 1 + test/unit/configuration_test.rb | 18 ++ test/unit/execution_pools/async_pool_test.rb | 62 +++++++ test/unit/worker_test.rb | 10 +- 12 files changed, 456 insertions(+), 65 deletions(-) create mode 100644 lib/solid_queue/execution_pools.rb create mode 100644 lib/solid_queue/execution_pools/async_pool.rb create mode 100644 lib/solid_queue/execution_pools/thread_pool.rb create mode 100644 test/unit/execution_pools/async_pool_test.rb diff --git a/Gemfile.lock b/Gemfile.lock index 7c4662de5..f1cda3c92 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -55,12 +55,22 @@ GEM rake thor (>= 0.14.0) ast (2.4.2) + async (2.38.1) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) base64 (0.3.0) benchmark (0.4.1) bigdecimal (3.3.1) builder (3.3.0) concurrent-ruby (1.3.5) connection_pool (2.5.4) + console (1.34.3) + fiber-annotation + fiber-local (~> 1.1) + json crass (1.0.6) date (3.4.1) debug (1.9.2) @@ -70,6 +80,10 @@ GEM erubi (1.13.1) et-orbi (1.2.11) tzinfo + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) fugit (1.11.1) et-orbi (~> 1, >= 1.2.11) raabro (~> 1.4) @@ -78,6 +92,7 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-event (1.14.5) irb (1.14.3) rdoc (>= 4.0.0) reline (>= 0.4.2) @@ -87,6 +102,7 @@ GEM loofah (2.23.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) + metrics (0.15.0) minitest (5.26.0) mocha (2.1.0) ruby2_keywords (>= 0.0.5) @@ -181,6 +197,7 @@ GEM stringio (3.1.2) thor (1.3.2) timeout (0.4.3) + traces (0.18.2) tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (3.1.3) @@ -199,6 +216,7 @@ PLATFORMS DEPENDENCIES appraisal + async (>= 2.24) debug (~> 1.9) logger minitest (~> 5.0) diff --git a/README.md b/README.md index f77c4d5a1..08edb1120 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,10 @@ Or you can also set the environment variable `SOLID_QUEUE_SUPERVISOR_MODE` to `a **The recommended and default mode is `fork`. Only use `async` if you know what you're doing and have strong reasons to** +This supervisor mode is separate from a worker's `execution_mode`. Supervisor mode decides whether supervised processes live in forks or threads. Worker execution mode decides whether a worker runs claimed jobs in a thread pool or as fibers on a single async reactor thread. + +Because these are separate concerns, you can combine the default `fork` supervisor mode with `execution_mode: async` on workers. In that setup, each worker process gets its own async reactor and bounded fiber capacity. + ## Configuration By default, Solid Queue will try to find your configuration under `config/queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this: @@ -222,10 +226,12 @@ production: batch_size: 500 concurrency_maintenance_interval: 300 workers: - - queues: "*" - threads: 3 - polling_interval: 2 + - queues: "llm*" + execution_mode: async + capacity: 100 + polling_interval: 0.05 - queues: [ real_time, background ] + execution_mode: thread threads: 5 polling_interval: 0.1 processes: 3 @@ -271,9 +277,11 @@ Here's an overview of the different options: Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). -- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting. -It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat. -- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode). +- `execution_mode`: controls how a worker executes claimed jobs. `thread` is the default and uses the existing thread pool behavior. `async` executes jobs as fibers on a single reactor thread. `fiber` is accepted as an alias for `async`. +- `threads`: this is the execution capacity for a worker, and remains the backward-compatible configuration name. In `thread` mode, it is the max size of the thread pool. In `async` mode, it is the max number of in-flight jobs/fibers. By default, this is `3`. Only workers have this setting. +It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker uses connections for polling and heartbeat and thread mode may use additional connections for job execution. +- `capacity`: an alias for `threads`. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads. +- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. This works with both `execution_mode: thread` and `execution_mode: async` as long as the supervisor is running in the default `fork` mode. **Note**: this option is ignored only when the supervisor itself is [running in `async` mode](#fork-vs-async-mode). - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. @@ -364,7 +372,9 @@ queues: back* ### Threads, processes, and signals -Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling. +By default, workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Workers can also be configured with `execution_mode: async`, in which case claimed jobs are executed as fibers on a single reactor thread and bounded by the worker's execution capacity. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling. + +Async worker execution is best suited for cooperative, mostly I/O-bound jobs. Blocking or CPU-heavy work still blocks the single reactor thread, so it should not be expected to outperform thread mode for every workload. The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode: - `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit. @@ -374,6 +384,10 @@ When receiving a `QUIT` signal, if workers still have jobs in-flight, these will If processes have no chance of cleaning up before exiting (e.g. if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats. Jobs that were claimed by processes with an expired heartbeat will be marked as failed with a `SolidQueue::Processes::ProcessPrunedError`. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this. +Worker heartbeats are driven by a separate timer task, not by the worker execution backend itself. This means async workers do not rely on the reactor loop to prove liveness. However, liveness is still tracked at the worker-process level, not at the individual thread or fiber level. + +This means finished and failed jobs still follow the normal Solid Queue lifecycle, but a single stuck job can remain claimed if the worker process itself is still alive. If you need stronger stuck-job detection, that requires an explicit timeout or watchdog mechanism on top of process heartbeats. + In a similar way, if a worker is terminated in any other way not initiated by the above signals (e.g. a worker is sent a `KILL` signal), jobs in progress will be marked as failed so that they can be inspected, with a `SolidQueue::Processes::ProcessExitError`. Sometimes a job in particular is responsible for this, for example, if it has a memory leak and you have a mechanism to kill processes over a certain memory threshold, so this will help identifying this kind of situation. diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index e63a000ca..16348603c 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -7,6 +7,7 @@ class Configuration validate :ensure_configured_processes validate :ensure_valid_recurring_tasks validate :ensure_correctly_sized_thread_pool + validate :ensure_valid_worker_execution_modes class Process < Struct.new(:kind, :attributes) def instantiate @@ -18,7 +19,8 @@ def instantiate queues: "*", threads: 3, processes: 1, - polling_interval: 0.1 + polling_interval: 0.1, + execution_mode: :thread } DISPATCHER_DEFAULTS = { @@ -95,6 +97,14 @@ def ensure_correctly_sized_thread_pool end end + def ensure_valid_worker_execution_modes + workers_options.each do |options| + SolidQueue::ExecutionPools.normalize_mode(options[:execution_mode] || WORKER_DEFAULTS[:execution_mode]) + rescue ArgumentError => error + errors.add(:base, error.message) + end + end + def default_options { mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork, @@ -153,7 +163,7 @@ def schedulers def workers_options @workers_options ||= processes_config.fetch(:workers, []) - .map { |options| options.dup.symbolize_keys } + .map { |options| normalize_worker_options(options) } end def dispatchers_options @@ -228,8 +238,26 @@ def load_config_from_file(file) def estimated_number_of_threads # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task - thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max + thread_count = workers_options.map { |options| worker_capacity(options) }.max (thread_count || 1) + 2 end + + def normalize_worker_options(options) + options = options.dup.symbolize_keys + options[:threads] = worker_capacity(options) + options[:capacity] = options[:threads] if options.key?(:capacity) + options[:execution_mode] = normalized_worker_execution_mode(options) + options + end + + def worker_capacity(options) + options[:capacity] || options[:threads] || WORKER_DEFAULTS[:threads] + end + + def normalized_worker_execution_mode(options) + SolidQueue::ExecutionPools.normalize_mode(options[:execution_mode] || WORKER_DEFAULTS[:execution_mode]) + rescue ArgumentError + options[:execution_mode] || WORKER_DEFAULTS[:execution_mode] + end end end diff --git a/lib/solid_queue/execution_pools.rb b/lib/solid_queue/execution_pools.rb new file mode 100644 index 000000000..262a0b9be --- /dev/null +++ b/lib/solid_queue/execution_pools.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module SolidQueue + module ExecutionPools + class << self + def build(mode:, size:, on_state_change: nil) + case normalize_mode(mode) + when :thread + ThreadPool.new(size, on_state_change: on_state_change) + when :async + AsyncPool.new(size, on_state_change: on_state_change) + end + end + + def normalize_mode(mode) + case mode.to_s + when "", "thread" + :thread + when "async", "fiber" + :async + else + raise ArgumentError, "Unknown execution mode #{mode.inspect}. Expected one of: :thread, :async, :fiber" + end + end + end + end +end diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb new file mode 100644 index 000000000..fc40926e6 --- /dev/null +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +module SolidQueue + module ExecutionPools + class AsyncPool + include AppExecutor + + class MissingDependencyError < LoadError + def initialize(error) + super( + "Async execution mode requires the `async` gem. " \ + "Add `gem \"async\"` to your Gemfile to use `execution_mode: async`. " \ + "Original error: #{error.message}" + ) + end + end + + attr_reader :size + + def initialize(size, on_state_change: nil) + @size = size + @on_state_change = on_state_change + @available_capacity = size + @mutex = Mutex.new + @state_mutex = Mutex.new + @shutdown = false + @fatal_error = nil + @boot_queue = Thread::Queue.new + + load_dependency! + + @queue = Async::Queue.new + @reactor_thread = start_reactor + + boot_result = @boot_queue.pop + raise boot_result if boot_result.is_a?(Exception) + end + + def post(execution) + reserved = false + raise_if_fatal_error! + raise RuntimeError, "Execution pool is shutting down" if shutdown? + + reserve_capacity! + reserved = true + queue.enqueue(execution) + rescue Exception + restore_capacity if reserved + raise + end + + def available_capacity + raise_if_fatal_error! + mutex.synchronize { @available_capacity } + end + + def idle? + available_capacity.positive? + end + + def shutdown + should_close = state_mutex.synchronize do + next false if @shutdown + + @shutdown = true + end + + queue.close if should_close + end + + def shutdown? + state_mutex.synchronize { @shutdown } + end + + def wait_for_termination(timeout) + reactor_thread.join(timeout) + end + + def metadata + { + execution_mode: "async", + capacity: size, + inflight: size - available_capacity + } + end + + private + attr_reader :boot_queue, :mutex, :on_state_change, :queue, :reactor_thread, :state_mutex + + def name + @name ||= "solid_queue-async-pool-#{object_id}" + end + + def load_dependency! + require "async" + require "async/queue" + require "async/semaphore" + rescue LoadError => error + raise MissingDependencyError.new(error) + end + + def start_reactor + create_thread do + Async do |task| + semaphore = Async::Semaphore.new(size, parent: task) + boot_queue << :ready + + drain_queue(task, semaphore) + task.wait_all + end + rescue Exception => error + register_fatal_error(error) + raise + end + end + + def drain_queue(task, semaphore) + task.async do + while execution = queue.dequeue + semaphore.async(execution) do |_execution_task, scheduled_execution| + perform_execution(scheduled_execution) + end + end + end.wait + end + + def perform_execution(execution) + wrap_in_app_executor { execution.perform } + rescue Exception => error + handle_thread_error(error) + ensure + restore_capacity + end + + def reserve_capacity! + mutex.synchronize do + raise RuntimeError, "Execution pool is at capacity" if @available_capacity <= 0 + + @available_capacity -= 1 + end + end + + def restore_capacity + should_notify = mutex.synchronize do + @available_capacity += 1 + @available_capacity.positive? + end + + on_state_change&.call if should_notify + end + + def register_fatal_error(error) + state_mutex.synchronize do + @fatal_error ||= error + end + + boot_queue << error if boot_queue.empty? + on_state_change&.call + end + + def raise_if_fatal_error! + error = state_mutex.synchronize { @fatal_error } + raise error if error + end + end + end +end diff --git a/lib/solid_queue/execution_pools/thread_pool.rb b/lib/solid_queue/execution_pools/thread_pool.rb new file mode 100644 index 000000000..404e1c533 --- /dev/null +++ b/lib/solid_queue/execution_pools/thread_pool.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +module SolidQueue + module ExecutionPools + class ThreadPool + include AppExecutor + + attr_reader :size + + delegate :shutdown, :shutdown?, :wait_for_termination, to: :executor + + def initialize(size, on_state_change: nil) + @size = size + @on_state_change = on_state_change + @available_capacity = size + @mutex = Mutex.new + end + + def post(execution) + reserved = false + reserve_capacity! + reserved = true + + Concurrent::Promises.future_on(executor, execution) do |thread_execution| + wrap_in_app_executor { thread_execution.perform } + rescue Exception => error + handle_thread_error(error) + ensure + restore_capacity + end + rescue Exception + restore_capacity if reserved + raise + end + + def available_capacity + mutex.synchronize { @available_capacity } + end + + def idle? + available_capacity.positive? + end + + def metadata + { + execution_mode: "thread", + capacity: size, + inflight: size - available_capacity, + thread_pool_size: size + } + end + + private + attr_reader :mutex, :on_state_change + + DEFAULT_OPTIONS = { + min_threads: 0, + idletime: 60, + fallback_policy: :abort + } + + def executor + @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: size, max_queue: size) + end + + def reserve_capacity! + mutex.synchronize do + raise RuntimeError, "Execution pool is at capacity" if @available_capacity <= 0 + + @available_capacity -= 1 + end + end + + def restore_capacity + should_notify = mutex.synchronize do + @available_capacity += 1 + @available_capacity.positive? + end + + on_state_change&.call if should_notify + end + end + end +end diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index 9c3d2a298..356bbaf35 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -1,54 +1,5 @@ # frozen_string_literal: true module SolidQueue - class Pool - include AppExecutor - - attr_reader :size - - delegate :shutdown, :shutdown?, :wait_for_termination, to: :executor - - def initialize(size, on_idle: nil) - @size = size - @on_idle = on_idle - @available_threads = Concurrent::AtomicFixnum.new(size) - @mutex = Mutex.new - end - - def post(execution) - available_threads.decrement - - Concurrent::Promises.future_on(executor, execution) do |thread_execution| - wrap_in_app_executor do - thread_execution.perform - ensure - available_threads.increment - mutex.synchronize { on_idle.try(:call) if idle? } - end - end.on_rejection! do |e| - handle_thread_error(e) - end - end - - def idle_threads - available_threads.value - end - - def idle? - idle_threads > 0 - end - - private - attr_reader :available_threads, :on_idle, :mutex - - DEFAULT_OPTIONS = { - min_threads: 0, - idletime: 60, - fallback_policy: :abort - } - - def executor - @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: size, max_queue: size) - end - end + Pool = ExecutionPools::ThreadPool end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd9..145022a68 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -11,18 +11,24 @@ class Worker < Processes::Poller attr_reader :queues, :pool def initialize(**options) - options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) + options = options.dup + options[:threads] = options[:capacity] || options[:threads] + options = options.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze - @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) + @pool = ExecutionPools.build( + mode: options[:execution_mode], + size: options[:threads], + on_state_change: -> { wake_up } + ) super(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: pool.size) + super.merge(queues: queues.join(",")).merge(pool.metadata) end private @@ -32,13 +38,15 @@ def poll pool.post(execution) end + reload_metadata if executions.any? + pool.idle? ? polling_interval : 10.minutes end end def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, pool.available_capacity, process_id) end end @@ -53,6 +61,11 @@ def all_work_completed? SolidQueue::ReadyExecution.aggregated_count_across(queues).zero? end + def heartbeat + super + reload_metadata + end + def set_procline procline "waiting for jobs in #{queues.join(",")}" end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 3a5c9693e..e444a7212 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -34,6 +34,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "appraisal" spec.add_development_dependency "debug", "~> 1.9" + spec.add_development_dependency "async", ">= 2.24" spec.add_development_dependency "minitest", "~> 5.0" spec.add_development_dependency "mocha" spec.add_development_dependency "puma", "~> 7.0" diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 34f69658b..dc30d4b8b 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -61,6 +61,20 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :worker, 2 end + test "normalize worker execution modes and capacity aliases" do + configuration = SolidQueue::Configuration.new( + workers: [ + { queues: "llm*", execution_mode: :async, capacity: 10 }, + { queues: "*", execution_mode: :fiber, threads: 3 } + ], + dispatchers: [], + skip_recurring: true + ) + + assert configuration.valid? + assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], threads: [ 10, 3 ], capacity: [ 10, nil ] + end + test "mulitple workers with the same configuration" do background_worker = { queues: "background", polling_interval: 10, processes: 3 } configuration = SolidQueue::Configuration.new(workers: [ background_worker ]) @@ -165,6 +179,10 @@ class ConfigurationTest < ActiveSupport::TestCase assert_not configuration.valid? assert_equal [ "No processes configured" ], configuration.errors.full_messages + configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :invalid } ]) + assert_not configuration.valid? + assert_match /Unknown execution mode/, configuration.errors.full_messages.first + # Not enough DB connections configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ]) assert_not configuration.valid? diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb new file mode 100644 index 000000000..892327267 --- /dev/null +++ b/test/unit/execution_pools/async_pool_test.rb @@ -0,0 +1,62 @@ +require "test_helper" + +class AsyncPoolTest < Minitest::Test + Execution = Struct.new(:started, :results, :pause) do + def perform + started << true if started + sleep(pause) if pause + results << [ Thread.current.object_id, Fiber.current.object_id ] if results + end + end + + def test_raises_a_clear_error_when_the_async_gem_is_unavailable + load_error = LoadError.new("cannot load such file -- async") + + SolidQueue::ExecutionPools::AsyncPool.any_instance.expects(:require).with("async").raises(load_error) + + error = assert_raises SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError do + SolidQueue::ExecutionPools::AsyncPool.new(3) + end + + assert_match /gem "async"/, error.message + end + + def test_build_treats_fiber_as_an_alias_for_async + pool = mock + + SolidQueue::ExecutionPools::AsyncPool.expects(:new).with(5, on_state_change: nil).returns(pool) + + assert_equal pool, SolidQueue::ExecutionPools.build(mode: :fiber, size: 5) + end + + def test_executes_jobs_as_fibers_on_a_single_reactor_thread + pool = SolidQueue::ExecutionPools::AsyncPool.new(2) + results = Thread::Queue.new + + pool.post Execution.new(nil, results, 0.05) + pool.post Execution.new(nil, results, 0.05) + + entries = 2.times.map { Timeout.timeout(1.second) { results.pop } } + + assert_equal 1, entries.map(&:first).uniq.count + assert_equal 2, entries.map(&:last).uniq.count + assert_equal 2, pool.available_capacity + assert_equal 0, pool.metadata[:inflight] + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + def test_waits_for_in_flight_executions_during_shutdown + pool = SolidQueue::ExecutionPools::AsyncPool.new(1) + started = Thread::Queue.new + + pool.post Execution.new(started, nil, 0.1) + Timeout.timeout(1.second) { started.pop } + + pool.shutdown + + assert_nil pool.wait_for_termination(0.01) + assert pool.wait_for_termination(1.second) + end +end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 3d692404b..01ae22e9b 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -3,6 +3,7 @@ class WorkerTest < ActiveSupport::TestCase include ActiveSupport::Testing::MethodCallAssertions + self.use_transactional_tests = false setup do @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2) @@ -19,7 +20,14 @@ class WorkerTest < ActiveSupport::TestCase process = SolidQueue::Process.first assert_equal "Worker", process.kind - assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3 } + assert_metadata process, { + queues: "background", + polling_interval: 0.2, + execution_mode: "thread", + capacity: 3, + inflight: 0, + thread_pool_size: 3 + } end test "errors on polling are passed to on_thread_error and re-raised" do From 191883e4569c6f4b92cc893d3ae61e43c4c4c74c Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Thu, 2 Apr 2026 11:58:09 +0200 Subject: [PATCH 02/14] Relax async worker connection pool validation --- README.md | 5 +++-- lib/solid_queue/configuration.rb | 30 ++++++++++++++++++++++++------ lib/solid_queue/worker.rb | 2 +- test/unit/configuration_test.rb | 14 ++++++++++++-- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 08edb1120..fd7e47826 100644 --- a/README.md +++ b/README.md @@ -278,9 +278,10 @@ Here's an overview of the different options: Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). - `execution_mode`: controls how a worker executes claimed jobs. `thread` is the default and uses the existing thread pool behavior. `async` executes jobs as fibers on a single reactor thread. `fiber` is accepted as an alias for `async`. -- `threads`: this is the execution capacity for a worker, and remains the backward-compatible configuration name. In `thread` mode, it is the max size of the thread pool. In `async` mode, it is the max number of in-flight jobs/fibers. By default, this is `3`. Only workers have this setting. +- `threads`: this is the execution capacity for a worker in `thread` mode. It is the max size of the thread pool. By default, this is `3`. Only workers have this setting. It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker uses connections for polling and heartbeat and thread mode may use additional connections for job execution. -- `capacity`: an alias for `threads`. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads. +- `capacity`: an alias for worker execution capacity. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads. +- `fibers`: an alias for `capacity` when `execution_mode: async`. - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. This works with both `execution_mode: thread` and `execution_mode: async` as long as the supervisor is running in the default `fork` mode. **Note**: this option is ignored only when the supervisor itself is [running in `async` mode](#fork-vs-async-mode). - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 16348603c..52d9ab906 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -141,7 +141,8 @@ def workers 1 end - processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) } + defaults = worker_defaults_for(worker_options) + processes.times.map { Process.new(:worker, worker_options.with_defaults(defaults)) } end end @@ -237,21 +238,22 @@ def load_config_from_file(file) end def estimated_number_of_threads - # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task - thread_count = workers_options.map { |options| worker_capacity(options) }.max + # At most one execution thread for async workers, or "threads" for thread workers, + # plus 1 thread for the worker loop and 1 thread for the heartbeat task. + thread_count = workers_options.map { |options| execution_threads_for_pool(options) }.max (thread_count || 1) + 2 end def normalize_worker_options(options) options = options.dup.symbolize_keys - options[:threads] = worker_capacity(options) - options[:capacity] = options[:threads] if options.key?(:capacity) options[:execution_mode] = normalized_worker_execution_mode(options) + options[:capacity] = worker_capacity(options) if options.key?(:capacity) || options.key?(:fibers) + options[:threads] = worker_capacity(options) unless async_worker?(options) && !options.key?(:threads) options end def worker_capacity(options) - options[:capacity] || options[:threads] || WORKER_DEFAULTS[:threads] + options[:capacity] || options[:fibers] || options[:threads] || WORKER_DEFAULTS[:threads] end def normalized_worker_execution_mode(options) @@ -259,5 +261,21 @@ def normalized_worker_execution_mode(options) rescue ArgumentError options[:execution_mode] || WORKER_DEFAULTS[:execution_mode] end + + def execution_threads_for_pool(options) + async_worker?(options) ? 1 : worker_capacity(options) + end + + def async_worker?(options) + normalized_worker_execution_mode(options) == :async + end + + def worker_defaults_for(options) + if async_worker?(options) + WORKER_DEFAULTS.except(:threads).merge(capacity: WORKER_DEFAULTS[:threads]) + else + WORKER_DEFAULTS + end + end end end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 145022a68..f7827ea24 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -12,7 +12,7 @@ class Worker < Processes::Poller def initialize(**options) options = options.dup - options[:threads] = options[:capacity] || options[:threads] + options[:threads] = options[:capacity] || options[:fibers] || options[:threads] options = options.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) # Ensure that the queues array is deep frozen to prevent accidental modification diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index dc30d4b8b..dcadc8ef1 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -65,14 +65,24 @@ class ConfigurationTest < ActiveSupport::TestCase configuration = SolidQueue::Configuration.new( workers: [ { queues: "llm*", execution_mode: :async, capacity: 10 }, - { queues: "*", execution_mode: :fiber, threads: 3 } + { queues: "*", execution_mode: :fiber, fibers: 3 } ], dispatchers: [], skip_recurring: true ) assert configuration.valid? - assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], threads: [ 10, 3 ], capacity: [ 10, nil ] + assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], capacity: [ 10, 3 ], threads: [ nil, nil ] + end + + test "async worker capacity does not inflate required database pool size" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ], + dispatchers: [], + skip_recurring: true + ) + + assert configuration.valid? end test "mulitple workers with the same configuration" do From 444091d31ed78a5d28c8f0df12614d1e7e0a10f3 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Thu, 2 Apr 2026 12:46:44 +0200 Subject: [PATCH 03/14] Handle async pool cancellations as fatal --- lib/solid_queue/execution_pools/async_pool.rb | 3 ++ test/unit/execution_pools/async_pool_test.rb | 30 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index fc40926e6..bcc6ff005 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -126,6 +126,9 @@ def drain_queue(task, semaphore) def perform_execution(execution) wrap_in_app_executor { execution.perform } + rescue Async::Cancel => error + handle_thread_error(error) + register_fatal_error(error) rescue Exception => error handle_thread_error(error) ensure diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb index 892327267..177f14e00 100644 --- a/test/unit/execution_pools/async_pool_test.rb +++ b/test/unit/execution_pools/async_pool_test.rb @@ -9,6 +9,13 @@ def perform end end + CancelledExecution = Struct.new(:started) do + def perform + started << true if started + raise Async::Cancel.new + end + end + def test_raises_a_clear_error_when_the_async_gem_is_unavailable load_error = LoadError.new("cannot load such file -- async") @@ -59,4 +66,27 @@ def test_waits_for_in_flight_executions_during_shutdown assert_nil pool.wait_for_termination(0.01) assert pool.wait_for_termination(1.second) end + + def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled + notifications = Thread::Queue.new + started = Thread::Queue.new + reported_errors = [] + original_on_thread_error = SolidQueue.on_thread_error + SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name } + + pool = SolidQueue::ExecutionPools::AsyncPool.new(1, on_state_change: -> { notifications << :changed }) + + pool.post CancelledExecution.new(started) + Timeout.timeout(1.second) { started.pop } + Timeout.timeout(1.second) { notifications.pop } + + error = assert_raises(Async::Cancel) { pool.available_capacity } + assert_equal "Task was cancelled", error.message + assert_equal [ "Async::Cancel" ], reported_errors + assert_raises(Async::Cancel) { pool.metadata } + ensure + SolidQueue.on_thread_error = original_on_thread_error + pool&.shutdown + pool&.wait_for_termination(1.second) + end end From ae0efbd7dc884454cbbb2494b1a0d2b50d93ac34 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Thu, 2 Apr 2026 13:46:10 +0200 Subject: [PATCH 04/14] Build worker pools after fork --- lib/solid_queue/worker.rb | 13 +++++++++++-- test/unit/worker_test.rb | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index f7827ea24..63c825033 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -18,11 +18,11 @@ def initialize(**options) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze - @pool = ExecutionPools.build( + @pool_options = { mode: options[:execution_mode], size: options[:threads], on_state_change: -> { wake_up } - ) + } super(**options) end @@ -50,6 +50,11 @@ def claim_executions end end + def boot + build_pool + super + end + def shutdown pool.shutdown pool.wait_for_termination(SolidQueue.shutdown_timeout) @@ -69,5 +74,9 @@ def heartbeat def set_procline procline "waiting for jobs in #{queues.join(",")}" end + + def build_pool + @pool ||= ExecutionPools.build(**@pool_options) + end end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 01ae22e9b..fe0b84587 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -30,6 +30,26 @@ class WorkerTest < ActiveSupport::TestCase } end + test "builds the execution pool on boot instead of initialize" do + pool = SolidQueue::ExecutionPools::ThreadPool.new(3) + + SolidQueue::ExecutionPools.expects(:build).once.with do |**options| + options[:mode] == :thread && options[:size] == 3 && options[:on_state_change].respond_to?(:call) + end.returns(pool) + + worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2) + + assert_nil worker.pool + + worker.start + wait_for_registered_processes(1, timeout: 1.second) + + assert_equal pool, worker.pool + ensure + worker&.stop + wait_for_registered_processes(0, timeout: 1.second) + end + test "errors on polling are passed to on_thread_error and re-raised" do errors = Concurrent::Array.new From abbbbb4be6aa1ca980dac1e0b9699403c8644e04 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 14:30:32 +0200 Subject: [PATCH 05/14] Gate async workers on fiber isolation --- README.md | 3 + lib/solid_queue/configuration.rb | 9 ++ lib/solid_queue/execution_pools/async_pool.rb | 24 ++++ lib/solid_queue/worker.rb | 24 +++- test/test_helper.rb | 13 ++ test/unit/configuration_test.rb | 40 +++--- test/unit/execution_pools/async_pool_test.rb | 99 ++++++++------ test/unit/worker_test.rb | 125 +++++++++++++----- 8 files changed, 247 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index fd7e47826..31dc06235 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,7 @@ Here's an overview of the different options: Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). - `execution_mode`: controls how a worker executes claimed jobs. `thread` is the default and uses the existing thread pool behavior. `async` executes jobs as fibers on a single reactor thread. `fiber` is accepted as an alias for `async`. + Async worker execution requires fiber-scoped isolated execution state. In Rails apps, set `config.active_support.isolation_level = :fiber` before using `execution_mode: async`. Solid Queue refuses to boot async workers when isolation remains thread-scoped. - `threads`: this is the execution capacity for a worker in `thread` mode. It is the max size of the thread pool. By default, this is `3`. Only workers have this setting. It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker uses connections for polling and heartbeat and thread mode may use additional connections for job execution. - `capacity`: an alias for worker execution capacity. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads. @@ -377,6 +378,8 @@ By default, workers in Solid Queue use a thread pool to run work in multiple thr Async worker execution is best suited for cooperative, mostly I/O-bound jobs. Blocking or CPU-heavy work still blocks the single reactor thread, so it should not be expected to outperform thread mode for every workload. +Because async workers run multiple fibers on a single thread, Rails must also isolate execution state per fiber rather than per thread. If your app keeps the default thread-scoped isolation level, Solid Queue will raise a boot-time error instead of running async workers with shared Active Record state. + The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode: - `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit. - `QUIT`: starts immediate termination. The supervisor will send a `QUIT` signal to its supervised processes, causing them to exit immediately. diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 52d9ab906..134979682 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -8,6 +8,7 @@ class Configuration validate :ensure_valid_recurring_tasks validate :ensure_correctly_sized_thread_pool validate :ensure_valid_worker_execution_modes + validate :ensure_async_workers_use_supported_isolation_level class Process < Struct.new(:kind, :attributes) def instantiate @@ -105,6 +106,14 @@ def ensure_valid_worker_execution_modes end end + def ensure_async_workers_use_supported_isolation_level + return unless workers_options.any? { |options| async_worker?(options) } + + SolidQueue::ExecutionPools::AsyncPool.ensure_supported_isolation_level! + rescue ArgumentError => error + errors.add(:base, error.message) + end + def default_options { mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork, diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index bcc6ff005..0d912561f 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -15,6 +15,29 @@ def initialize(error) end end + class UnsupportedIsolationLevelError < ArgumentError + def initialize(level) + super( + "Async execution mode requires fiber-scoped isolated execution state. " \ + "Set `ActiveSupport::IsolatedExecutionState.isolation_level = :fiber` " \ + "(or `config.active_support.isolation_level = :fiber` in Rails). " \ + "Current isolation level: #{level.inspect}" + ) + end + end + + class << self + def ensure_supported_isolation_level! + return if supported_isolation_level? + + raise UnsupportedIsolationLevelError.new(ActiveSupport::IsolatedExecutionState.isolation_level) + end + + def supported_isolation_level? + ActiveSupport::IsolatedExecutionState.isolation_level == :fiber + end + end + attr_reader :size def initialize(size, on_state_change: nil) @@ -28,6 +51,7 @@ def initialize(size, on_state_change: nil) @boot_queue = Thread::Queue.new load_dependency! + self.class.ensure_supported_isolation_level! @queue = Async::Queue.new @reactor_thread = start_reactor diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 63c825033..4b6e185f3 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -17,11 +17,13 @@ def initialize(**options) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze + @metadata_state_mutex = Mutex.new + @metadata_dirty = false @pool_options = { mode: options[:execution_mode], size: options[:threads], - on_state_change: -> { wake_up } + on_state_change: -> { mark_metadata_dirty; wake_up } } super(**options) @@ -38,7 +40,7 @@ def poll pool.post(execution) end - reload_metadata if executions.any? + reload_metadata_if_needed(executions.any?) pool.idle? ? polling_interval : 10.minutes end @@ -78,5 +80,23 @@ def set_procline def build_pool @pool ||= ExecutionPools.build(**@pool_options) end + + def mark_metadata_dirty + metadata_state_mutex.synchronize { @metadata_dirty = true } + end + + def metadata_state_mutex + @metadata_state_mutex + end + + def reload_metadata_if_needed(executions_claimed) + needs_reload = metadata_state_mutex.synchronize do + claimed_or_dirty = executions_claimed || @metadata_dirty + @metadata_dirty = false + claimed_or_dirty + end + + reload_metadata if needs_reload + end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index db5bd5c34..cd60eb7af 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -29,6 +29,19 @@ def write(...) Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions) class ExpectedTestError < RuntimeError; end +module ExecutionIsolationTestHelper + def with_execution_isolation(level) + previous_level = ActiveSupport::IsolatedExecutionState.isolation_level + ActiveSupport::IsolatedExecutionState.isolation_level = level + yield + ensure + ActiveSupport::IsolatedExecutionState.isolation_level = previous_level + end +end + +class Minitest::Test + include ExecutionIsolationTestHelper +end class ActiveSupport::TestCase include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index dcadc8ef1..1a0f20f3c 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -62,27 +62,31 @@ class ConfigurationTest < ActiveSupport::TestCase end test "normalize worker execution modes and capacity aliases" do - configuration = SolidQueue::Configuration.new( - workers: [ - { queues: "llm*", execution_mode: :async, capacity: 10 }, - { queues: "*", execution_mode: :fiber, fibers: 3 } - ], - dispatchers: [], - skip_recurring: true - ) + with_execution_isolation(:fiber) do + configuration = SolidQueue::Configuration.new( + workers: [ + { queues: "llm*", execution_mode: :async, capacity: 10 }, + { queues: "*", execution_mode: :fiber, fibers: 3 } + ], + dispatchers: [], + skip_recurring: true + ) - assert configuration.valid? - assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], capacity: [ 10, 3 ], threads: [ nil, nil ] + assert configuration.valid? + assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], capacity: [ 10, 3 ], threads: [ nil, nil ] + end end test "async worker capacity does not inflate required database pool size" do - configuration = SolidQueue::Configuration.new( - workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ], - dispatchers: [], - skip_recurring: true - ) + with_execution_isolation(:fiber) do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ], + dispatchers: [], + skip_recurring: true + ) - assert configuration.valid? + assert configuration.valid? + end end test "mulitple workers with the same configuration" do @@ -193,6 +197,10 @@ class ConfigurationTest < ActiveSupport::TestCase assert_not configuration.valid? assert_match /Unknown execution mode/, configuration.errors.full_messages.first + configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :async } ]) + assert_not configuration.valid? + assert_match /requires fiber-scoped isolated execution state/, configuration.errors.full_messages.first + # Not enough DB connections configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ]) assert_not configuration.valid? diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb index 177f14e00..93b06850c 100644 --- a/test/unit/execution_pools/async_pool_test.rb +++ b/test/unit/execution_pools/async_pool_test.rb @@ -36,57 +36,74 @@ def test_build_treats_fiber_as_an_alias_for_async assert_equal pool, SolidQueue::ExecutionPools.build(mode: :fiber, size: 5) end - def test_executes_jobs_as_fibers_on_a_single_reactor_thread - pool = SolidQueue::ExecutionPools::AsyncPool.new(2) - results = Thread::Queue.new - - pool.post Execution.new(nil, results, 0.05) - pool.post Execution.new(nil, results, 0.05) + def test_raises_a_clear_error_when_isolation_level_is_not_fiber + error = assert_raises SolidQueue::ExecutionPools::AsyncPool::UnsupportedIsolationLevelError do + SolidQueue::ExecutionPools::AsyncPool.new(3) + end - entries = 2.times.map { Timeout.timeout(1.second) { results.pop } } + assert_match /isolation_level = :fiber/, error.message + end - assert_equal 1, entries.map(&:first).uniq.count - assert_equal 2, entries.map(&:last).uniq.count - assert_equal 2, pool.available_capacity - assert_equal 0, pool.metadata[:inflight] - ensure - pool&.shutdown - pool&.wait_for_termination(1.second) + def test_executes_jobs_as_fibers_on_a_single_reactor_thread + with_execution_isolation(:fiber) do + pool = SolidQueue::ExecutionPools::AsyncPool.new(2) + results = Thread::Queue.new + + pool.post Execution.new(nil, results, 0.05) + pool.post Execution.new(nil, results, 0.05) + + entries = 2.times.map { Timeout.timeout(1.second) { results.pop } } + + assert_equal 1, entries.map(&:first).uniq.count + assert_equal 2, entries.map(&:last).uniq.count + assert_equal 2, pool.available_capacity + assert_equal 0, pool.metadata[:inflight] + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end end def test_waits_for_in_flight_executions_during_shutdown - pool = SolidQueue::ExecutionPools::AsyncPool.new(1) - started = Thread::Queue.new + with_execution_isolation(:fiber) do + pool = SolidQueue::ExecutionPools::AsyncPool.new(1) + started = Thread::Queue.new - pool.post Execution.new(started, nil, 0.1) - Timeout.timeout(1.second) { started.pop } + pool.post Execution.new(started, nil, 0.1) + Timeout.timeout(1.second) { started.pop } - pool.shutdown + pool.shutdown - assert_nil pool.wait_for_termination(0.01) - assert pool.wait_for_termination(1.second) + assert_nil pool.wait_for_termination(0.01) + assert pool.wait_for_termination(1.second) + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end end def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled - notifications = Thread::Queue.new - started = Thread::Queue.new - reported_errors = [] - original_on_thread_error = SolidQueue.on_thread_error - SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name } - - pool = SolidQueue::ExecutionPools::AsyncPool.new(1, on_state_change: -> { notifications << :changed }) - - pool.post CancelledExecution.new(started) - Timeout.timeout(1.second) { started.pop } - Timeout.timeout(1.second) { notifications.pop } - - error = assert_raises(Async::Cancel) { pool.available_capacity } - assert_equal "Task was cancelled", error.message - assert_equal [ "Async::Cancel" ], reported_errors - assert_raises(Async::Cancel) { pool.metadata } - ensure - SolidQueue.on_thread_error = original_on_thread_error - pool&.shutdown - pool&.wait_for_termination(1.second) + with_execution_isolation(:fiber) do + notifications = Thread::Queue.new + started = Thread::Queue.new + reported_errors = [] + original_on_thread_error = SolidQueue.on_thread_error + SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name } + + pool = SolidQueue::ExecutionPools::AsyncPool.new(1, on_state_change: -> { notifications << :changed }) + + pool.post CancelledExecution.new(started) + Timeout.timeout(1.second) { started.pop } + Timeout.timeout(1.second) { notifications.pop } + + error = assert_raises(Async::Cancel) { pool.available_capacity } + assert_equal "Task was cancelled", error.message + assert_equal [ "Async::Cancel" ], reported_errors + assert_raises(Async::Cancel) { pool.metadata } + ensure + SolidQueue.on_thread_error = original_on_thread_error + pool&.shutdown + pool&.wait_for_termination(1.second) + end end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index fe0b84587..e0fb18a92 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -5,6 +5,26 @@ class WorkerTest < ActiveSupport::TestCase include ActiveSupport::Testing::MethodCallAssertions self.use_transactional_tests = false + EXECUTION_MODES = [ + { + name: "thread", + options: { threads: 3 }, + expected_metadata: { + execution_mode: "thread", + capacity: 3, + thread_pool_size: 3 + } + }, + { + name: "async", + options: { execution_mode: :async, capacity: 3 }, + expected_metadata: { + execution_mode: "async", + capacity: 3 + } + } + ].freeze + setup do @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2) end @@ -14,20 +34,73 @@ class WorkerTest < ActiveSupport::TestCase JobBuffer.clear end - test "worker is registered as process" do - @worker.start - wait_for_registered_processes(1, timeout: 1.second) + EXECUTION_MODES.each do |mode| + test "worker is registered as process in #{mode[:name]} mode" do + with_worker_execution_support(mode[:options]) do + worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2, **mode[:options]) + + worker.start + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Worker", process.kind + assert_metadata process, { + queues: "background", + polling_interval: 0.2, + inflight: 0 + }.merge(mode[:expected_metadata]) + ensure + worker&.stop + wait_for_registered_processes(0, timeout: 1.second) + end + end - process = SolidQueue::Process.first - assert_equal "Worker", process.kind - assert_metadata process, { - queues: "background", - polling_interval: 0.2, - execution_mode: "thread", - capacity: 3, - inflight: 0, - thread_pool_size: 3 - } + test "claim and process more enqueued jobs than the pool size allows to process at once in #{mode[:name]} mode" do + 5.times do + StoreResultJob.perform_later(:paused, pause: 0.1.second) + end + + 3.times do + StoreResultJob.perform_later(:immediate) + end + + with_worker_execution_support(mode[:options]) do + worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2, **mode[:options]) + + worker.start + + wait_for_jobs_to_finish_for(2.second) + worker.wake_up + + assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :paused).count + assert_equal 3, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count + ensure + worker&.stop + wait_for_registered_processes(0, timeout: 1.second) + end + end + + test "updates inflight metadata after jobs finish in #{mode[:name]} mode" do + StoreResultJob.perform_later(:slow, pause: 0.1.second) + + with_worker_execution_support(mode[:options]) do + worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.05, **mode[:options]) + + worker.start + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + + wait_for(timeout: 2.seconds) { process.reload.metadata["inflight"] == 1 } + wait_for(timeout: 2.seconds) do + process.reload.metadata["inflight"] == 0 && + JobResult.where(queue_name: :background, status: "completed", value: :slow).count == 1 + end + ensure + worker&.stop + wait_for_registered_processes(0, timeout: 1.second) + end + end end test "builds the execution pool on boot instead of initialize" do @@ -113,24 +186,6 @@ class WorkerTest < ActiveSupport::TestCase Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe) end - test "claim and process more enqueued jobs than the pool size allows to process at once" do - 5.times do |i| - StoreResultJob.perform_later(:paused, pause: 0.1.second) - end - - 3.times do |i| - StoreResultJob.perform_later(:immediate) - end - - @worker.start - - wait_for_jobs_to_finish_for(2.second) - @worker.wake_up - - assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :paused).count - assert_equal 3, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count - end - test "polling queries are logged" do log = StringIO.new with_active_record_logger(ActiveSupport::Logger.new(log)) do @@ -223,6 +278,14 @@ class WorkerTest < ActiveSupport::TestCase end private + def with_worker_execution_support(options, &block) + if options[:execution_mode] == :async + with_execution_isolation(:fiber, &block) + else + yield + end + end + def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence yield From 74e443eeca33067a210fab0029f0697f72be6137 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 15:32:58 +0200 Subject: [PATCH 06/14] Fail fast for async worker config errors --- lib/solid_queue/configuration.rb | 22 +++++++++++++++++- lib/solid_queue/execution_pools/async_pool.rb | 18 +++++++-------- test/unit/configuration_test.rb | 23 +++++++++++++++++++ test/unit/execution_pools/async_pool_test.rb | 2 +- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 134979682..0e2c63ce8 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -8,6 +8,8 @@ class Configuration validate :ensure_valid_recurring_tasks validate :ensure_correctly_sized_thread_pool validate :ensure_valid_worker_execution_modes + validate :ensure_async_workers_use_capacity_aliases + validate :ensure_async_workers_have_required_dependency validate :ensure_async_workers_use_supported_isolation_level class Process < Struct.new(:kind, :attributes) @@ -106,6 +108,22 @@ def ensure_valid_worker_execution_modes end end + def ensure_async_workers_use_capacity_aliases + workers_options.each do |options| + if async_worker?(options) && options.key?(:threads) + errors.add(:base, "Async workers do not accept `threads`. Use `capacity` or `fibers` instead.") + end + end + end + + def ensure_async_workers_have_required_dependency + return unless workers_options.any? { |options| async_worker?(options) } + + SolidQueue::ExecutionPools::AsyncPool.ensure_dependency! + rescue LoadError => error + errors.add(:base, error.message) + end + def ensure_async_workers_use_supported_isolation_level return unless workers_options.any? { |options| async_worker?(options) } @@ -281,7 +299,9 @@ def async_worker?(options) def worker_defaults_for(options) if async_worker?(options) - WORKER_DEFAULTS.except(:threads).merge(capacity: WORKER_DEFAULTS[:threads]) + WORKER_DEFAULTS.except(:threads).tap do |defaults| + defaults[:capacity] = WORKER_DEFAULTS[:threads] unless options.key?(:threads) + end else WORKER_DEFAULTS end diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index 0d912561f..f28b72b2b 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -27,6 +27,14 @@ def initialize(level) end class << self + def ensure_dependency! + require "async" + require "async/queue" + require "async/semaphore" + rescue LoadError => error + raise MissingDependencyError.new(error) + end + def ensure_supported_isolation_level! return if supported_isolation_level? @@ -50,7 +58,7 @@ def initialize(size, on_state_change: nil) @fatal_error = nil @boot_queue = Thread::Queue.new - load_dependency! + self.class.ensure_dependency! self.class.ensure_supported_isolation_level! @queue = Async::Queue.new @@ -115,14 +123,6 @@ def name @name ||= "solid_queue-async-pool-#{object_id}" end - def load_dependency! - require "async" - require "async/queue" - require "async/semaphore" - rescue LoadError => error - raise MissingDependencyError.new(error) - end - def start_reactor create_thread do Async do |task| diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 1a0f20f3c..624b0939d 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -77,6 +77,19 @@ class ConfigurationTest < ActiveSupport::TestCase end end + test "async workers reject threads in favor of capacity aliases" do + with_execution_isolation(:fiber) do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "llm*", execution_mode: :async, threads: 10 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_match /Async workers do not accept `threads`/, configuration.errors.full_messages.first + end + end + test "async worker capacity does not inflate required database pool size" do with_execution_isolation(:fiber) do configuration = SolidQueue::Configuration.new( @@ -201,6 +214,16 @@ class ConfigurationTest < ActiveSupport::TestCase assert_not configuration.valid? assert_match /requires fiber-scoped isolated execution state/, configuration.errors.full_messages.first + with_execution_isolation(:fiber) do + load_error = LoadError.new("cannot load such file -- async") + missing_dependency_error = SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError.new(load_error) + SolidQueue::ExecutionPools::AsyncPool.expects(:ensure_dependency!).raises(missing_dependency_error) + + configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :async } ]) + assert_not configuration.valid? + assert_match /gem "async"/, configuration.errors.full_messages.first + end + # Not enough DB connections configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ]) assert_not configuration.valid? diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb index 93b06850c..44faeb05d 100644 --- a/test/unit/execution_pools/async_pool_test.rb +++ b/test/unit/execution_pools/async_pool_test.rb @@ -19,7 +19,7 @@ def perform def test_raises_a_clear_error_when_the_async_gem_is_unavailable load_error = LoadError.new("cannot load such file -- async") - SolidQueue::ExecutionPools::AsyncPool.any_instance.expects(:require).with("async").raises(load_error) + SolidQueue::ExecutionPools::AsyncPool.expects(:require).with("async").raises(load_error) error = assert_raises SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError do SolidQueue::ExecutionPools::AsyncPool.new(3) From a0e944491e7baabe7fa1107fdd3e5c31865db0e0 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 15:39:09 +0200 Subject: [PATCH 07/14] Make async DB pool guidance Rails-version aware --- README.md | 5 +++++ lib/solid_queue/configuration.rb | 36 +++++++++++++++++++++----------- test/unit/configuration_test.rb | 25 ++++++++++++++++++++-- 3 files changed, 52 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 31dc06235..bc65fafa3 100644 --- a/README.md +++ b/README.md @@ -282,6 +282,7 @@ Here's an overview of the different options: - `threads`: this is the execution capacity for a worker in `thread` mode. It is the max size of the thread pool. By default, this is `3`. Only workers have this setting. It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker uses connections for polling and heartbeat and thread mode may use additional connections for job execution. - `capacity`: an alias for worker execution capacity. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads. + Async workers reject `threads`; use `capacity` or `fibers` instead. On Rails 7.2 and later, a practical starting point is usually `3-5` queue database connections per worker process rather than `capacity`, because ordinary Active Record query paths can release connections between async waits. On Rails 7.1, size the queue database pool more conservatively, as in-flight async jobs may still retain connections roughly in proportion to `capacity`. - `fibers`: an alias for `capacity` when `execution_mode: async`. - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. This works with both `execution_mode: thread` and `execution_mode: async` as long as the supervisor is running in the default `fork` mode. **Note**: this option is ignored only when the supervisor itself is [running in `async` mode](#fork-vs-async-mode). - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. @@ -380,6 +381,10 @@ Async worker execution is best suited for cooperative, mostly I/O-bound jobs. Bl Because async workers run multiple fibers on a single thread, Rails must also isolate execution state per fiber rather than per thread. If your app keeps the default thread-scoped isolation level, Solid Queue will raise a boot-time error instead of running async workers with shared Active Record state. +On Rails 7.2 and later, async workers can often use a much smaller queue database pool than an equivalent thread pool. A practical starting point is `3-5` queue database connections per worker process: one for job execution, one for polling, one for heartbeats, plus some headroom. In the default `fork` supervisor mode, that guidance applies per worker process. In supervisor `async` mode, all workers share one process, so add together the requirements for the workers running there. + +That lower-pool guidance depends on job code not holding connections open across async waits. APIs such as `ActiveRecord::Base.connection`, `lease_connection`, `connection_pool.checkout`, or long-lived `with_connection` / transaction blocks can pin connections and push async workers back toward thread-like pool usage. On Rails 7.1, plan conservatively and assume async capacity can still grow queue database connection usage. + The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode: - `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit. - `QUIT`: starts immediate termination. The supervisor will send a `QUIT` signal to its supervised processes, causing them to exit immediately. diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 0e2c63ce8..605b3884d 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -6,7 +6,7 @@ class Configuration validate :ensure_configured_processes validate :ensure_valid_recurring_tasks - validate :ensure_correctly_sized_thread_pool + validate :ensure_correctly_sized_database_pool validate :ensure_valid_worker_execution_modes validate :ensure_async_workers_use_capacity_aliases validate :ensure_async_workers_have_required_dependency @@ -40,6 +40,7 @@ def instantiate DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" + ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION = Gem::Version.new("7.2.0") def initialize(**options) @options = options.with_defaults(default_options) @@ -93,10 +94,11 @@ def ensure_valid_recurring_tasks end end - def ensure_correctly_sized_thread_pool - if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads - errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " + - "database connection pool is #{db_pool_size}. Increase it in `config/database.yml`") + def ensure_correctly_sized_database_pool + if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_database_pool_size + errors.add(:base, "Solid Queue requires at least #{estimated_database_pool_size} database connections " + + "for the configured workers, but the queue database connection pool is #{db_pool_size}. " + + "Increase it in `config/database.yml`") end end @@ -264,11 +266,13 @@ def load_config_from_file(file) end end - def estimated_number_of_threads - # At most one execution thread for async workers, or "threads" for thread workers, - # plus 1 thread for the worker loop and 1 thread for the heartbeat task. - thread_count = workers_options.map { |options| execution_threads_for_pool(options) }.max - (thread_count || 1) + 2 + def estimated_database_pool_size + worker_pool_size = workers_options.map { |options| estimated_database_pool_size_for_worker(options) }.max + worker_pool_size || 1 + end + + def estimated_database_pool_size_for_worker(options) + estimated_execution_connections_for_worker(options) + 2 end def normalize_worker_options(options) @@ -289,8 +293,16 @@ def normalized_worker_execution_mode(options) options[:execution_mode] || WORKER_DEFAULTS[:execution_mode] end - def execution_threads_for_pool(options) - async_worker?(options) ? 1 : worker_capacity(options) + def estimated_execution_connections_for_worker(options) + async_worker?(options) ? async_execution_connections_for_worker(options) : worker_capacity(options) + end + + def async_execution_connections_for_worker(options) + async_jobs_release_connections_between_queries? ? 1 : worker_capacity(options) + end + + def async_jobs_release_connections_between_queries? + ActiveRecord.gem_version >= ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION end def async_worker?(options) diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 624b0939d..10aaed6e6 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -90,7 +90,24 @@ class ConfigurationTest < ActiveSupport::TestCase end end - test "async worker capacity does not inflate required database pool size" do + test "async worker capacity inflates required database pool size on Rails 7.1" do + skip if async_workers_release_connections_between_queries? + + with_execution_isolation(:fiber) do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_match /requires at least 1002 database connections/, configuration.errors.full_messages.first + end + end + + test "async worker capacity does not inflate required database pool size on Rails 7.2+" do + skip unless async_workers_release_connections_between_queries? + with_execution_isolation(:fiber) do configuration = SolidQueue::Configuration.new( workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ], @@ -227,11 +244,15 @@ class ConfigurationTest < ActiveSupport::TestCase # Not enough DB connections configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ]) assert_not configuration.valid? - assert_match /Solid Queue is configured to use \d+ threads but the database connection pool is \d+. Increase it in `config\/database.yml`/, + assert_match /Solid Queue requires at least \d+ database connections for the configured workers, but the queue database connection pool is \d+. Increase it in `config\/database.yml`/, configuration.errors.full_messages.first end private + def async_workers_release_connections_between_queries? + ActiveRecord.gem_version >= SolidQueue::Configuration::ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION + end + def assert_processes(configuration, kind, count, **attributes) processes = configuration.configured_processes.select { |p| p.kind == kind } assert_equal count, processes.size From cac54bf2cd7f2741cf3753f4678c5d3d5908f7c4 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 16:15:11 +0200 Subject: [PATCH 08/14] Fix worker pool defaults and async shutdown --- lib/solid_queue/execution_pools/async_pool.rb | 8 ++++++-- lib/solid_queue/worker.rb | 2 +- test/unit/execution_pools/async_pool_test.rb | 15 +++++++++++++++ test/unit/worker_test.rb | 13 +++++++++++++ 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index f28b72b2b..f49869cab 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -5,6 +5,8 @@ module ExecutionPools class AsyncPool include AppExecutor + SHUTDOWN_SENTINEL = Object.new + class MissingDependencyError < LoadError def initialize(error) super( @@ -91,13 +93,13 @@ def idle? end def shutdown - should_close = state_mutex.synchronize do + should_enqueue_shutdown = state_mutex.synchronize do next false if @shutdown @shutdown = true end - queue.close if should_close + queue.enqueue(SHUTDOWN_SENTINEL) if should_enqueue_shutdown end def shutdown? @@ -141,6 +143,8 @@ def start_reactor def drain_queue(task, semaphore) task.async do while execution = queue.dequeue + break if execution.equal?(SHUTDOWN_SENTINEL) + semaphore.async(execution) do |_execution_task, scheduled_execution| perform_execution(scheduled_execution) end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 4b6e185f3..0c7507866 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -12,7 +12,7 @@ class Worker < Processes::Poller def initialize(**options) options = options.dup - options[:threads] = options[:capacity] || options[:fibers] || options[:threads] + options[:threads] = options[:capacity] || options[:fibers] if options.key?(:capacity) || options.key?(:fibers) options = options.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) # Ensure that the queues array is deep frozen to prevent accidental modification diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb index 44faeb05d..b76208d41 100644 --- a/test/unit/execution_pools/async_pool_test.rb +++ b/test/unit/execution_pools/async_pool_test.rb @@ -82,6 +82,21 @@ def test_waits_for_in_flight_executions_during_shutdown end end + def test_shutdown_does_not_depend_on_async_queue_close + with_execution_isolation(:fiber) do + pool = SolidQueue::ExecutionPools::AsyncPool.new(1) + queue = pool.instance_variable_get(:@queue) + queue.define_singleton_method(:close) { raise "should not be called" } + + pool.shutdown + + assert pool.wait_for_termination(1.second) + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + end + def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled with_execution_isolation(:fiber) do notifications = Thread::Queue.new diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index e0fb18a92..e76d27e74 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -123,6 +123,19 @@ class WorkerTest < ActiveSupport::TestCase wait_for_registered_processes(0, timeout: 1.second) end + test "defaults thread workers to the configured thread pool size" do + worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2) + + worker.start + wait_for_registered_processes(1, timeout: 1.second) + + assert_equal 3, worker.pool.size + assert_metadata SolidQueue::Process.first, thread_pool_size: 3, capacity: 3, execution_mode: "thread" + ensure + worker&.stop + wait_for_registered_processes(0, timeout: 1.second) + end + test "errors on polling are passed to on_thread_error and re-raised" do errors = Concurrent::Array.new From 9ee9f75248d7574ee541234059a019f5e516bb8c Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 16:38:11 +0200 Subject: [PATCH 09/14] Fix async pool compatibility and plugin tests --- lib/solid_queue/execution_pools/async_pool.rb | 80 ++++++++++++++----- test/integration/puma/plugin_testing.rb | 9 ++- test/unit/execution_pools/async_pool_test.rb | 13 ++- 3 files changed, 75 insertions(+), 27 deletions(-) diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index f49869cab..7e4188658 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -5,7 +5,7 @@ module ExecutionPools class AsyncPool include AppExecutor - SHUTDOWN_SENTINEL = Object.new + WAKEUP_SIGNAL = ".".b class MissingDependencyError < LoadError def initialize(error) @@ -31,7 +31,6 @@ def initialize(level) class << self def ensure_dependency! require "async" - require "async/queue" require "async/semaphore" rescue LoadError => error raise MissingDependencyError.new(error) @@ -59,11 +58,12 @@ def initialize(size, on_state_change: nil) @shutdown = false @fatal_error = nil @boot_queue = Thread::Queue.new + @pending_executions = Thread::Queue.new + @wakeup_reader, @wakeup_writer = IO.pipe self.class.ensure_dependency! self.class.ensure_supported_isolation_level! - @queue = Async::Queue.new @reactor_thread = start_reactor boot_result = @boot_queue.pop @@ -77,7 +77,8 @@ def post(execution) reserve_capacity! reserved = true - queue.enqueue(execution) + pending_executions << execution + signal_reactor rescue Exception restore_capacity if reserved raise @@ -93,13 +94,13 @@ def idle? end def shutdown - should_enqueue_shutdown = state_mutex.synchronize do + should_shutdown = state_mutex.synchronize do next false if @shutdown @shutdown = true end - queue.enqueue(SHUTDOWN_SENTINEL) if should_enqueue_shutdown + signal_reactor if should_shutdown end def shutdown? @@ -119,7 +120,7 @@ def metadata end private - attr_reader :boot_queue, :mutex, :on_state_change, :queue, :reactor_thread, :state_mutex + attr_reader :boot_queue, :mutex, :on_state_change, :pending_executions, :reactor_thread, :state_mutex, :wakeup_reader, :wakeup_writer def name @name ||= "solid_queue-async-pool-#{object_id}" @@ -131,30 +132,52 @@ def start_reactor semaphore = Async::Semaphore.new(size, parent: task) boot_queue << :ready - drain_queue(task, semaphore) - task.wait_all + wait_for_executions(semaphore) + wait_for_child_tasks(task) end rescue Exception => error register_fatal_error(error) raise + ensure + close_wakeup_pipe end end - def drain_queue(task, semaphore) - task.async do - while execution = queue.dequeue - break if execution.equal?(SHUTDOWN_SENTINEL) + def wait_for_executions(semaphore) + loop do + wakeup_reader.wait_readable + clear_wakeup_signal + schedule_pending_executions(semaphore) - semaphore.async(execution) do |_execution_task, scheduled_execution| - perform_execution(scheduled_execution) - end + break if shutdown? && pending_executions.empty? + end + end + + def clear_wakeup_signal + loop do + wakeup_reader.read_nonblock(1024) + rescue IO::WaitReadable, EOFError + break + end + end + + def schedule_pending_executions(semaphore) + while execution = next_pending_execution + semaphore.async(execution) do |_execution_task, scheduled_execution| + perform_execution(scheduled_execution) end - end.wait + end + end + + def next_pending_execution + pending_executions.pop(true) + rescue ThreadError + nil end def perform_execution(execution) wrap_in_app_executor { execution.perform } - rescue Async::Cancel => error + rescue Async::Stop => error handle_thread_error(error) register_fatal_error(error) rescue Exception => error @@ -193,6 +216,27 @@ def raise_if_fatal_error! error = state_mutex.synchronize { @fatal_error } raise error if error end + + def signal_reactor + wakeup_writer.write_nonblock(WAKEUP_SIGNAL) + rescue IO::WaitWritable, Errno::EPIPE, IOError + nil + end + + def wait_for_child_tasks(task) + if task.respond_to?(:wait_all) + task.wait_all + else + task.children&.each(&:wait) + end + end + + def close_wakeup_pipe + wakeup_reader.close unless wakeup_reader.closed? + wakeup_writer.close unless wakeup_writer.closed? + rescue IOError + nil + end end end end diff --git a/test/integration/puma/plugin_testing.rb b/test/integration/puma/plugin_testing.rb index 14165c9b8..2ae30c0c0 100644 --- a/test/integration/puma/plugin_testing.rb +++ b/test/integration/puma/plugin_testing.rb @@ -11,11 +11,12 @@ module PluginTesting setup do FileUtils.mkdir_p Rails.root.join("tmp", "pids") + @port = TCPServer.open("127.0.0.1", 0) { |server| server.addr[1] } Dir.chdir("test/dummy") do cmd = %W[ bundle exec puma - -b tcp://127.0.0.1:9222 + -b tcp://127.0.0.1:#{@port} -C config/puma_#{solid_queue_mode}.rb -s config.ru @@ -27,6 +28,7 @@ module PluginTesting end wait_for_registered_processes(5, timeout: 5.second) + wait_for(timeout: 5.seconds) { find_processes_registered_as(supervisor_kind).exists? } end teardown do @@ -47,6 +49,7 @@ module PluginTesting # Ensure the restart finishes before we try to continue with the test wait_for_registered_processes(0, timeout: 5.second) wait_for_registered_processes(5, timeout: 5.second) + wait_for(timeout: 5.seconds) { find_processes_registered_as(supervisor_kind).exists? } StoreResultJob.perform_later(:puma_plugin) wait_for_jobs_to_finish_for(2.seconds) @@ -54,6 +57,10 @@ module PluginTesting end private + def supervisor_kind + "Supervisor(#{solid_queue_mode})" + end + def solid_queue_mode raise NotImplementedError end diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb index b76208d41..7723d1272 100644 --- a/test/unit/execution_pools/async_pool_test.rb +++ b/test/unit/execution_pools/async_pool_test.rb @@ -12,7 +12,7 @@ def perform CancelledExecution = Struct.new(:started) do def perform started << true if started - raise Async::Cancel.new + raise Async::Stop.new end end @@ -82,11 +82,9 @@ def test_waits_for_in_flight_executions_during_shutdown end end - def test_shutdown_does_not_depend_on_async_queue_close + def test_shutdown_wakes_the_reactor_when_idle with_execution_isolation(:fiber) do pool = SolidQueue::ExecutionPools::AsyncPool.new(1) - queue = pool.instance_variable_get(:@queue) - queue.define_singleton_method(:close) { raise "should not be called" } pool.shutdown @@ -111,10 +109,9 @@ def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled Timeout.timeout(1.second) { started.pop } Timeout.timeout(1.second) { notifications.pop } - error = assert_raises(Async::Cancel) { pool.available_capacity } - assert_equal "Task was cancelled", error.message - assert_equal [ "Async::Cancel" ], reported_errors - assert_raises(Async::Cancel) { pool.metadata } + error = assert_raises(Async::Stop) { pool.available_capacity } + assert_equal [ error.class.name ], reported_errors + assert_raises(Async::Stop) { pool.metadata } ensure SolidQueue.on_thread_error = original_on_thread_error pool&.shutdown From 92a9ccf8848b86f84e31b3fa21d0d9cd6b59b229 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 16:54:35 +0200 Subject: [PATCH 10/14] Fix async pool compatibility on Ruby 3.1 --- lib/solid_queue/execution_pools/async_pool.rb | 47 +++++-------------- test/unit/worker_test.rb | 34 ++++++++++---- 2 files changed, 35 insertions(+), 46 deletions(-) diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index 7e4188658..582c5de91 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -5,7 +5,7 @@ module ExecutionPools class AsyncPool include AppExecutor - WAKEUP_SIGNAL = ".".b + IDLE_WAIT_INTERVAL = 0.01 class MissingDependencyError < LoadError def initialize(error) @@ -59,7 +59,6 @@ def initialize(size, on_state_change: nil) @fatal_error = nil @boot_queue = Thread::Queue.new @pending_executions = Thread::Queue.new - @wakeup_reader, @wakeup_writer = IO.pipe self.class.ensure_dependency! self.class.ensure_supported_isolation_level! @@ -78,7 +77,6 @@ def post(execution) reserve_capacity! reserved = true pending_executions << execution - signal_reactor rescue Exception restore_capacity if reserved raise @@ -94,13 +92,11 @@ def idle? end def shutdown - should_shutdown = state_mutex.synchronize do + state_mutex.synchronize do next false if @shutdown @shutdown = true end - - signal_reactor if should_shutdown end def shutdown? @@ -120,7 +116,7 @@ def metadata end private - attr_reader :boot_queue, :mutex, :on_state_change, :pending_executions, :reactor_thread, :state_mutex, :wakeup_reader, :wakeup_writer + attr_reader :boot_queue, :mutex, :on_state_change, :pending_executions, :reactor_thread, :state_mutex def name @name ||= "solid_queue-async-pool-#{object_id}" @@ -133,31 +129,23 @@ def start_reactor boot_queue << :ready wait_for_executions(semaphore) - wait_for_child_tasks(task) + wait_for_inflight_executions end rescue Exception => error register_fatal_error(error) raise - ensure - close_wakeup_pipe end end def wait_for_executions(semaphore) loop do - wakeup_reader.wait_readable - clear_wakeup_signal schedule_pending_executions(semaphore) break if shutdown? && pending_executions.empty? - end - end - def clear_wakeup_signal - loop do - wakeup_reader.read_nonblock(1024) - rescue IO::WaitReadable, EOFError - break + # Older async releases don't support waking the reactor from another + # thread reliably, so we cooperatively poll for newly posted work. + sleep(IDLE_WAIT_INTERVAL) if pending_executions.empty? end end @@ -217,25 +205,12 @@ def raise_if_fatal_error! raise error if error end - def signal_reactor - wakeup_writer.write_nonblock(WAKEUP_SIGNAL) - rescue IO::WaitWritable, Errno::EPIPE, IOError - nil + def wait_for_inflight_executions + sleep(IDLE_WAIT_INTERVAL) while executions_in_flight? end - def wait_for_child_tasks(task) - if task.respond_to?(:wait_all) - task.wait_all - else - task.children&.each(&:wait) - end - end - - def close_wakeup_pipe - wakeup_reader.close unless wakeup_reader.closed? - wakeup_writer.close unless wakeup_writer.closed? - rescue IOError - nil + def executions_in_flight? + mutex.synchronize { @available_capacity < size } end end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index e76d27e74..5548f70b6 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -269,28 +269,42 @@ class WorkerTest < ActiveSupport::TestCase end test "sleeps `10.minutes` if at capacity" do - 3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + 3.times { |i| StoreResultJob.perform_later(i, pause: 5.seconds) } - @worker.expects(:interruptible_sleep).with(10.minutes).at_least_once - @worker.expects(:interruptible_sleep).with(@worker.polling_interval).never - @worker.expects(:handle_thread_error).never + delays = stub_interruptible_sleep(@worker) @worker.start - sleep 1.second + + first_delay = Timeout.timeout(1.second) { delays.pop } + + assert_equal 10.minutes, first_delay end test "sleeps `polling_interval` if worker not at capacity" do - 2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + 2.times { |i| StoreResultJob.perform_later(i, pause: 5.seconds) } - @worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once - @worker.expects(:interruptible_sleep).with(10.minutes).never - @worker.expects(:handle_thread_error).never + delays = stub_interruptible_sleep(@worker) @worker.start - sleep 1.second + + first_delay = Timeout.timeout(1.second) { delays.pop } + + assert_equal @worker.polling_interval, first_delay end private + def stub_interruptible_sleep(worker) + delays = Thread::Queue.new + + worker.stubs(:handle_thread_error) + worker.define_singleton_method(:interruptible_sleep) do |delay| + delays << delay + sleep 0.01 + end + + delays + end + def with_worker_execution_support(options, &block) if options[:execution_mode] == :async with_execution_isolation(:fiber, &block) From c6fca1847cc9070fef6cba522da2f3ec06e09b2c Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 17:01:46 +0200 Subject: [PATCH 11/14] Add Ruby 3.1 async IO timeout compatibility --- lib/solid_queue/execution_pools/async_pool.rb | 23 +++++++++++++++++++ test/unit/execution_pools/async_pool_test.rb | 14 +++++++++++ 2 files changed, 37 insertions(+) diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb index 582c5de91..4b0ed6958 100644 --- a/lib/solid_queue/execution_pools/async_pool.rb +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -30,6 +30,8 @@ def initialize(level) class << self def ensure_dependency! + ensure_io_timeout_compatibility! + require "async" require "async/semaphore" rescue LoadError => error @@ -45,6 +47,27 @@ def ensure_supported_isolation_level! def supported_isolation_level? ActiveSupport::IsolatedExecutionState.isolation_level == :fiber end + + def ensure_io_timeout_compatibility!(io_class = IO) + unless io_class.method_defined?(:timeout) && io_class.method_defined?(:timeout=) + # Async 2.24, which Ruby 3.1 resolves to, expects Ruby's newer IO + # timeout API to exist on any socket it waits on. Older Rubies don't + # provide it, so give async the minimal accessor interface it needs. + io_class.class_eval do + def timeout + @timeout + end + + def timeout=(value) + @timeout = value + end + end + end + + return if io_class.const_defined?(:TimeoutError, false) + + io_class.const_set(:TimeoutError, Class.new(StandardError)) + end end attr_reader :size diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb index 7723d1272..52b3c36d6 100644 --- a/test/unit/execution_pools/async_pool_test.rb +++ b/test/unit/execution_pools/async_pool_test.rb @@ -44,6 +44,20 @@ def test_raises_a_clear_error_when_isolation_level_is_not_fiber assert_match /isolation_level = :fiber/, error.message end + def test_adds_io_timeout_compatibility_for_older_rubies + io_class = Class.new + + SolidQueue::ExecutionPools::AsyncPool.ensure_io_timeout_compatibility!(io_class) + + io = io_class.new + assert_nil io.timeout + + io.timeout = 1.second + + assert_equal 1.second, io.timeout + assert io_class.const_defined?(:TimeoutError, false) + end + def test_executes_jobs_as_fibers_on_a_single_reactor_thread with_execution_isolation(:fiber) do pool = SolidQueue::ExecutionPools::AsyncPool.new(2) From 302fc68cdfaaae08b56ce3b0a6325e6bb1f9401c Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 17:17:57 +0200 Subject: [PATCH 12/14] Harden async shutdown lifecycle test --- test/integration/async_processes_lifecycle_test.rb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/integration/async_processes_lifecycle_test.rb b/test/integration/async_processes_lifecycle_test.rb index fd284210e..c01e8cf19 100644 --- a/test/integration/async_processes_lifecycle_test.rb +++ b/test/integration/async_processes_lifecycle_test.rb @@ -125,10 +125,15 @@ class AsyncProcessesLifecycleTest < ActiveSupport::TestCase no_pause = enqueue_store_result_job("no pause") pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second) - # Wait for the "no pause" job to complete and the pause job to be claimed. - # This ensures the pause job is actively being processed. + # Wait for the "no pause" job to complete and the pause job to start. + # A claimed execution alone is not enough here because the worker may have + # claimed the job but not yet entered `perform`. wait_for_jobs_to_finish_for(3.seconds, except: pause) - wait_for(timeout: 2.seconds) { SolidQueue::ClaimedExecution.exists?(job_id: SolidQueue::Job.find_by(active_job_id: pause.job_id)&.id) } + wait_for(timeout: 2.seconds) do + skip_active_record_query_cache do + JobResult.where(queue_name: :background, status: "started", value: "pause").exists? + end + end signal_process(@pid, :TERM, wait: 0.2.second) wait_for_jobs_to_finish_for(2.seconds, except: pause) From 98d13dce34ae9d34587cda8bc8eccc2b62379258 Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 17:26:40 +0200 Subject: [PATCH 13/14] Stabilize concurrency controls assertions --- test/integration/concurrency_controls_test.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index a12c48e42..4c281aa90 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -256,6 +256,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase private def assert_stored_sequence(result, sequence) expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join + wait_for(timeout: 1.second) do + skip_active_record_query_cache do + result.reload.status.split(" + ").sort.join == expected + end + end + skip_active_record_query_cache do assert_equal expected, result.reload.status.split(" + ").sort.join end From 305bf4018352e099019f9f24502a18ee4794e64e Mon Sep 17 00:00:00 2001 From: Carmine Paolino Date: Sat, 4 Apr 2026 17:31:59 +0200 Subject: [PATCH 14/14] Relax async lifecycle start wait --- test/integration/async_processes_lifecycle_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/async_processes_lifecycle_test.rb b/test/integration/async_processes_lifecycle_test.rb index c01e8cf19..5eeb172c4 100644 --- a/test/integration/async_processes_lifecycle_test.rb +++ b/test/integration/async_processes_lifecycle_test.rb @@ -129,7 +129,7 @@ class AsyncProcessesLifecycleTest < ActiveSupport::TestCase # A claimed execution alone is not enough here because the worker may have # claimed the job but not yet entered `perform`. wait_for_jobs_to_finish_for(3.seconds, except: pause) - wait_for(timeout: 2.seconds) do + wait_for(timeout: 5.seconds) do skip_active_record_query_cache do JobResult.where(queue_name: :background, status: "started", value: "pause").exists? end