From e08efd627ef367be07ec62b3c54c3fe1fb8aac76 Mon Sep 17 00:00:00 2001 From: Sean Harmer Date: Sat, 4 Apr 2026 11:51:52 +0000 Subject: [PATCH 1/6] Configure Docker-in-Docker feature in devcontainer Set 'moby' option to false for the Docker-in-Docker feature in the devcontainer configuration as Debian no longer supports it. --- .devcontainer/devcontainer.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 96f8bcfed..d1372bb0a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -8,7 +8,9 @@ }, // Features to add to the dev container. More info: https://containers.dev/features. "features": { - "ghcr.io/devcontainers/features/docker-in-docker": {}, + "ghcr.io/devcontainers/features/docker-in-docker": { + "moby": false + }, "ghcr.io/devcontainers/features/github-cli:1": { "version": "latest" }, From 226b8e8dbafa75e666d408cac485f37326eec60e Mon Sep 17 00:00:00 2001 From: Sean Harmer Date: Sat, 4 Apr 2026 11:57:31 +0000 Subject: [PATCH 2/6] Add worker concurrency model validation and support Implement validation for worker concurrency settings in the configuration, ensuring that the concurrency model is supported and that fibers are only set when using the fiber model. Update tests to cover new concurrency model behaviors and ensure proper error handling for unsupported configurations until such time as they get implemented. 
--- lib/solid_queue/configuration.rb | 35 +++++++++++++++++++++ lib/solid_queue/worker.rb | 16 ++++++++-- test/unit/configuration_test.rb | 53 +++++++++++++++++++++++++++++++- test/unit/worker_test.rb | 10 +++++- 4 files changed, 110 insertions(+), 4 deletions(-) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index e63a000ca..db20a407d 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -6,6 +6,7 @@ class Configuration validate :ensure_configured_processes validate :ensure_valid_recurring_tasks + validate :ensure_valid_worker_concurrency_settings validate :ensure_correctly_sized_thread_pool class Process < Struct.new(:kind, :attributes) @@ -18,9 +19,12 @@ def instantiate queues: "*", threads: 3, processes: 1, + concurrency_model: :thread, polling_interval: 0.1 } + WORKER_CONCURRENCY_MODELS = %w[ thread fiber ].freeze + DISPATCHER_DEFAULTS = { batch_size: 500, polling_interval: 1, @@ -88,6 +92,33 @@ def ensure_valid_recurring_tasks end end + def ensure_valid_worker_concurrency_settings + workers_options.each_with_index do |worker_options, index| + worker_label = "Worker #{index + 1}" + concurrency_model = worker_concurrency_model_for(worker_options) + + unless WORKER_CONCURRENCY_MODELS.include?(concurrency_model) + errors.add(:base, "#{worker_label} has unsupported `concurrency_model: #{concurrency_model}`. Valid options are: thread, fiber") + next + end + + if concurrency_model == "thread" + if worker_options.key?(:fibers) && !worker_options[:fibers].nil? + errors.add(:base, "#{worker_label} cannot set `fibers` unless `concurrency_model` is `fiber`") + end + + next + end + + fibers = Integer(worker_options[:fibers], exception: false) + unless fibers&.positive? 
+ errors.add(:base, "#{worker_label} with `concurrency_model: fiber` must set a positive integer `fibers` value") + end + + errors.add(:base, "#{worker_label} uses `concurrency_model: fiber`, but fiber worker execution is not implemented yet") + end + end + def ensure_correctly_sized_thread_pool if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " + @@ -156,6 +187,10 @@ def workers_options .map { |options| options.dup.symbolize_keys } end + def worker_concurrency_model_for(worker_options) + worker_options.fetch(:concurrency_model, WORKER_DEFAULTS[:concurrency_model]).to_s + end + def dispatchers_options @dispatchers_options ||= processes_config.fetch(:dispatchers, []) .map { |options| options.dup.symbolize_keys } diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd9..de82a06e8 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -4,17 +4,23 @@ module SolidQueue class Worker < Processes::Poller include LifecycleHooks + CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE = "Worker `concurrency_model: fiber` is not implemented yet".freeze + after_boot :run_start_hooks before_shutdown :run_stop_hooks after_shutdown :run_exit_hooks - attr_reader :queues, :pool + attr_reader :queues, :pool, :concurrency_model, :fibers def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze + @concurrency_model = options[:concurrency_model].to_s.inquiry + @fibers = options[:fibers] + + ensure_supported_concurrency_model! 
@pool = Pool.new(options[:threads], on_idle: -> { wake_up }) @@ -22,10 +28,16 @@ def initialize(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: pool.size) + super.merge(queues: queues.join(","), thread_pool_size: pool.size, concurrency_model: concurrency_model.to_s).tap do |metadata| + metadata[:fiber_pool_size] = fibers if fibers.present? + end end private + def ensure_supported_concurrency_model! + raise NotImplementedError, CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE if concurrency_model.fiber? + end + def poll claim_executions.then do |executions| executions.each do |execution| diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 34f69658b..7260f5aba 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -58,7 +58,58 @@ class ConfigurationTest < ActiveSupport::TestCase configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ], skip_recurring: true) assert_processes configuration, :dispatcher, 0 - assert_processes configuration, :worker, 2 + assert_processes configuration, :worker, 2, concurrency_model: :thread + end + + test "workers default to thread concurrency model" do + configuration = SolidQueue::Configuration.new(workers: [ { queues: "background" } ], dispatchers: [], skip_recurring: true) + + assert configuration.valid? + assert_processes configuration, :worker, 1, concurrency_model: :thread + end + + test "thread workers reject fibers setting" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :thread, fibers: 10 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? 
+ assert_includes configuration.errors.full_messages, "Worker 1 cannot set `fibers` unless `concurrency_model` is `fiber`" + end + + test "fiber workers require a positive fibers value" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :fiber, fibers: 0 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_includes configuration.errors.full_messages, "Worker 1 with `concurrency_model: fiber` must set a positive integer `fibers` value" + end + + test "fiber workers are rejected until runtime support is implemented" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :fiber, fibers: 10 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_includes configuration.errors.full_messages, "Worker 1 uses `concurrency_model: fiber`, but fiber worker execution is not implemented yet" + end + + test "workers require a supported concurrency model" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :green_threads } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_includes configuration.errors.full_messages, "Worker 1 has unsupported `concurrency_model: green_threads`. 
Valid options are: thread, fiber" end test "mulitple workers with the same configuration" do diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 3d692404b..f59aae76e 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -19,7 +19,7 @@ class WorkerTest < ActiveSupport::TestCase process = SolidQueue::Process.first assert_equal "Worker", process.kind - assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3 } + assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3, concurrency_model: "thread" } end test "errors on polling are passed to on_thread_error and re-raised" do @@ -151,6 +151,14 @@ class WorkerTest < ActiveSupport::TestCase assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count end + test "reject fiber concurrency model until backend support is implemented" do + error = assert_raises(NotImplementedError) do + SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2, concurrency_model: :fiber, fibers: 10) + end + + assert_equal SolidQueue::Worker::CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE, error.message + end + test "terminate on heartbeat when unregistered" do old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second From 4f9a2ac1fe73fb1e964c7d9a2114cdade2751f54 Mon Sep 17 00:00:00 2001 From: Sean Harmer Date: Sat, 4 Apr 2026 12:29:16 +0000 Subject: [PATCH 3/6] Refactor Pool and Worker classes to use ExecutionBackend This commit introduces the ExecutionBackend class to encapsulate execution logic and capacity management. The Pool class now inherits from Worker::ExecutionBackend. Additionally, tests have been added to ensure the correct behavior of available capacity tracking and the execution backend's functionality. A new fiber-aware execution backend will be added in a follow-up commit. 
--- lib/solid_queue/pool.rb | 33 ++++------ lib/solid_queue/worker.rb | 22 ++++--- lib/solid_queue/worker/execution_backend.rb | 53 +++++++++++++++ test/unit/pool_test.rb | 72 +++++++++++++++++++++ test/unit/worker_test.rb | 7 ++ 5 files changed, 160 insertions(+), 27 deletions(-) create mode 100644 lib/solid_queue/worker/execution_backend.rb create mode 100644 test/unit/pool_test.rb diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index 9c3d2a298..a62394a95 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -1,17 +1,15 @@ # frozen_string_literal: true module SolidQueue - class Pool - include AppExecutor - - attr_reader :size + class Pool < Worker::ExecutionBackend + alias size capacity delegate :shutdown, :shutdown?, :wait_for_termination, to: :executor - def initialize(size, on_idle: nil) - @size = size - @on_idle = on_idle - @available_threads = Concurrent::AtomicFixnum.new(size) + def initialize(capacity, on_available: nil, on_idle: nil) + super(capacity, on_available: on_available || on_idle) + + @available_threads = Concurrent::AtomicFixnum.new(capacity) @mutex = Mutex.new end @@ -19,27 +17,24 @@ def post(execution) available_threads.decrement Concurrent::Promises.future_on(executor, execution) do |thread_execution| - wrap_in_app_executor do - thread_execution.perform - ensure + perform(thread_execution) + ensure available_threads.increment - mutex.synchronize { on_idle.try(:call) if idle? } - end + mutex.synchronize { notify_available } end.on_rejection! do |e| handle_thread_error(e) end end - def idle_threads + def available_capacity available_threads.value end - def idle? - idle_threads > 0 - end + alias idle_threads available_capacity + alias idle? available? private - attr_reader :available_threads, :on_idle, :mutex + attr_reader :available_threads, :mutex DEFAULT_OPTIONS = { min_threads: 0, @@ -48,7 +43,7 @@ def idle? 
} def executor - @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: size, max_queue: size) + @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: capacity, max_queue: capacity) end end end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index de82a06e8..c45a66955 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -10,7 +10,9 @@ class Worker < Processes::Poller before_shutdown :run_stop_hooks after_shutdown :run_exit_hooks - attr_reader :queues, :pool, :concurrency_model, :fibers + attr_reader :queues, :execution_backend, :concurrency_model, :fibers + + alias pool execution_backend def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) @@ -22,13 +24,13 @@ def initialize(**options) ensure_supported_concurrency_model! - @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) + @execution_backend = build_execution_backend(options) super(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: pool.size, concurrency_model: concurrency_model.to_s).tap do |metadata| + super.merge(queues: queues.join(","), thread_pool_size: execution_backend.capacity, concurrency_model: concurrency_model.to_s).tap do |metadata| metadata[:fiber_pool_size] = fibers if fibers.present? end end @@ -38,25 +40,29 @@ def ensure_supported_concurrency_model! raise NotImplementedError, CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE if concurrency_model.fiber? end + def build_execution_backend(options) + Pool.new(options[:threads], on_available: -> { wake_up }) + end + def poll claim_executions.then do |executions| executions.each do |execution| - pool.post(execution) + execution_backend.post(execution) end - pool.idle? ? polling_interval : 10.minutes + execution_backend.available? ? 
polling_interval : 10.minutes end end def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, execution_backend.available_capacity, process_id) end end def shutdown - pool.shutdown - pool.wait_for_termination(SolidQueue.shutdown_timeout) + execution_backend.shutdown + execution_backend.wait_for_termination(SolidQueue.shutdown_timeout) super end diff --git a/lib/solid_queue/worker/execution_backend.rb b/lib/solid_queue/worker/execution_backend.rb new file mode 100644 index 000000000..ce3056ca6 --- /dev/null +++ b/lib/solid_queue/worker/execution_backend.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +module SolidQueue + class Worker < Processes::Poller + class ExecutionBackend + include AppExecutor + + attr_reader :capacity + + def initialize(capacity, on_available: nil) + @capacity = capacity + @on_available = on_available + end + + def post(_execution) + raise NotImplementedError + end + + def available_capacity + raise NotImplementedError + end + + def available? + available_capacity.positive? + end + + def shutdown + raise NotImplementedError + end + + def shutdown? + raise NotImplementedError + end + + def wait_for_termination(_timeout) + raise NotImplementedError + end + + private + attr_reader :on_available + + def perform(execution) + wrap_in_app_executor do + execution.perform + end + end + + def notify_available + on_available.try(:call) if available? 
+ end + end + end +end diff --git a/test/unit/pool_test.rb b/test/unit/pool_test.rb new file mode 100644 index 000000000..b25095ff4 --- /dev/null +++ b/test/unit/pool_test.rb @@ -0,0 +1,72 @@ +require "test_helper" + +class PoolTest < ActiveSupport::TestCase + test "tracks available capacity while work is in flight" do + started = Queue.new + release = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end.new(started, release) + + pool = SolidQueue::Pool.new(1) + + assert_equal 1, pool.capacity + assert_equal 1, pool.available_capacity + assert_equal 1, pool.idle_threads + assert pool.available? + assert pool.idle? + + pool.post(execution) + started.pop + + wait_for(timeout: 1.second) { pool.available_capacity.zero? } + + assert_equal 0, pool.available_capacity + assert_equal 0, pool.idle_threads + assert_not pool.available? + assert_not pool.idle? + + release << true + + wait_for(timeout: 1.second) { pool.available_capacity == 1 } + + assert_equal 1, pool.available_capacity + assert_equal 1, pool.idle_threads + assert pool.available? + assert pool.idle? + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "calls on_available when capacity is restored" do + started = Queue.new + release = Queue.new + available = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end.new(started, release) + + pool = SolidQueue::Pool.new(1, on_available: -> { available << true }) + + pool.post(execution) + started.pop + wait_for(timeout: 1.second) { pool.available_capacity.zero? 
} + + release << true + + Timeout.timeout(1.second) { available.pop } + wait_for(timeout: 1.second) { pool.available_capacity == 1 } + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end +end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index f59aae76e..40b5f432b 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -22,6 +22,13 @@ class WorkerTest < ActiveSupport::TestCase assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3, concurrency_model: "thread" } end + test "worker exposes a generic execution backend" do + assert_equal @worker.pool, @worker.execution_backend + assert_equal 3, @worker.execution_backend.capacity + assert_equal 3, @worker.execution_backend.available_capacity + assert @worker.execution_backend.available? + end + test "errors on polling are passed to on_thread_error and re-raised" do errors = Concurrent::Array.new From c313038c6bf70269876b473206a447b98f6c5317 Mon Sep 17 00:00:00 2001 From: Sean Harmer Date: Sat, 4 Apr 2026 13:12:55 +0000 Subject: [PATCH 4/6] Implement fiber concurrency model with FiberPool execution backend Add support for fiber-based concurrency in the SolidQueue worker by introducing a new FiberPool class. Update the worker and configuration to handle fiber settings, ensuring proper validation and metadata reporting. Modify tests to validate the new functionality and ensure compatibility with existing features. We make no attempt to find the least busy Runner (thread) within a FiberPool yet. Work is simply distributed to the next Runner in sequence.
--- Gemfile.lock | 18 ++++ lib/solid_queue/configuration.rb | 2 - lib/solid_queue/fiber_pool.rb | 147 +++++++++++++++++++++++++++++++ lib/solid_queue/worker.rb | 18 ++-- solid_queue.gemspec | 1 + test/unit/configuration_test.rb | 6 +- test/unit/fiber_pool_test.rb | 87 ++++++++++++++++++ test/unit/worker_test.rb | 64 +++++++++++++- 8 files changed, 328 insertions(+), 15 deletions(-) create mode 100644 lib/solid_queue/fiber_pool.rb create mode 100644 test/unit/fiber_pool_test.rb diff --git a/Gemfile.lock b/Gemfile.lock index 7c4662de5..1660d7320 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH solid_queue (1.4.0) activejob (>= 7.1) activerecord (>= 7.1) + async (>= 2.24, < 3) concurrent-ruby (>= 1.3.1) fugit (~> 1.11) railties (>= 7.1) @@ -55,12 +56,22 @@ GEM rake thor (>= 0.14.0) ast (2.4.2) + async (2.38.1) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) base64 (0.3.0) benchmark (0.4.1) bigdecimal (3.3.1) builder (3.3.0) concurrent-ruby (1.3.5) connection_pool (2.5.4) + console (1.34.3) + fiber-annotation + fiber-local (~> 1.1) + json crass (1.0.6) date (3.4.1) debug (1.9.2) @@ -70,6 +81,10 @@ GEM erubi (1.13.1) et-orbi (1.2.11) tzinfo + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) fugit (1.11.1) et-orbi (~> 1, >= 1.2.11) raabro (~> 1.4) @@ -78,6 +93,7 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-event (1.14.5) irb (1.14.3) rdoc (>= 4.0.0) reline (>= 0.4.2) @@ -87,6 +103,7 @@ GEM loofah (2.23.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) + metrics (0.15.0) minitest (5.26.0) mocha (2.1.0) ruby2_keywords (>= 0.0.5) @@ -181,6 +198,7 @@ GEM stringio (3.1.2) thor (1.3.2) timeout (0.4.3) + traces (0.18.2) tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (3.1.3) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index db20a407d..03649af2a 100644 --- a/lib/solid_queue/configuration.rb +++ 
b/lib/solid_queue/configuration.rb @@ -114,8 +114,6 @@ def ensure_valid_worker_concurrency_settings unless fibers&.positive? errors.add(:base, "#{worker_label} with `concurrency_model: fiber` must set a positive integer `fibers` value") end - - errors.add(:base, "#{worker_label} uses `concurrency_model: fiber`, but fiber worker execution is not implemented yet") end end diff --git a/lib/solid_queue/fiber_pool.rb b/lib/solid_queue/fiber_pool.rb new file mode 100644 index 000000000..afd81b20d --- /dev/null +++ b/lib/solid_queue/fiber_pool.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +require "async" +require "async/barrier" +require "async/queue" +require "async/semaphore" + +module SolidQueue + class FiberPool < Worker::ExecutionBackend + ISOLATION_MUTEX = Mutex.new + + class Runner + def initialize(index, fibers, name:, on_execution:) + @queue = Async::Queue.new + @booted = Thread::Queue.new + + @thread = Thread.new do + Thread.current.name = "#{name}-fiber-#{index}" + + Sync do |task| + begin + barrier = Async::Barrier.new(parent: task) + semaphore = Async::Semaphore.new(fibers, parent: task) + execution_state = ActiveSupport::IsolatedExecutionState.context + + @booted << true + + while (execution = queue.dequeue) + barrier.async(parent: semaphore) do + ActiveSupport::IsolatedExecutionState.share_with(execution_state) + on_execution.call(execution) + end + end + + barrier.wait + ensure + barrier&.cancel + end + end + end + + @booted.pop + end + + def enqueue(execution) + queue.enqueue(execution) + end + + def shutdown + queue.close + end + + def wait(timeout) + thread.join(timeout) + end + + private + attr_reader :queue, :thread + end + + attr_reader :threads, :fibers + + def initialize(threads, fibers, on_available: nil, on_idle: nil, name: "worker") + ensure_fiber_isolation! 
+ + @threads = threads + @fibers = fibers + + super(threads * fibers, on_available: on_available || on_idle) + + @available_slots = Concurrent::AtomicFixnum.new(capacity) + @next_runner = Concurrent::AtomicFixnum.new(0) + @mutex = Mutex.new + @shutdown = false + + @runners = Array.new(threads) do |index| + Runner.new(index, fibers, name: name, on_execution: method(:run_execution)) + end + end + + def post(execution) + raise Concurrent::RejectedExecutionError, "fiber pool is shut down" if shutdown? + + available_slots.decrement + next_runner.enqueue(execution) + rescue Exception + available_slots.increment if available_slots.value < capacity + raise + end + + def available_capacity + available_slots.value + end + + def shutdown + mutex.synchronize do + return if @shutdown + + @shutdown = true + runners.each(&:shutdown) + end + end + + def shutdown? + @shutdown + end + + def wait_for_termination(timeout) + deadline = Concurrent.monotonic_time + timeout.to_f + + runners.all? do |runner| + remaining = deadline - Concurrent.monotonic_time + break false if remaining <= 0 + + runner.wait(remaining) + end + end + + private + attr_reader :available_slots, :mutex, :runners + + def ensure_fiber_isolation! 
+ ISOLATION_MUTEX.synchronize do + return if ActiveSupport::IsolatedExecutionState.isolation_level == :fiber + + ActiveSupport::IsolatedExecutionState.isolation_level = :fiber + end + end + + def next_runner + runners[(next_runner_index.increment - 1) % runners.size] + end + + def next_runner_index + @next_runner + end + + def run_execution(execution) + perform(execution) + rescue Exception => error + handle_thread_error(error) + ensure + available_slots.increment + mutex.synchronize { notify_available } + end + end +end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index c45a66955..d83ca3c12 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -4,13 +4,11 @@ module SolidQueue class Worker < Processes::Poller include LifecycleHooks - CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE = "Worker `concurrency_model: fiber` is not implemented yet".freeze - after_boot :run_start_hooks before_shutdown :run_stop_hooks after_shutdown :run_exit_hooks - attr_reader :queues, :execution_backend, :concurrency_model, :fibers + attr_reader :queues, :execution_backend, :concurrency_model, :fibers, :threads alias pool execution_backend @@ -19,6 +17,7 @@ def initialize(**options) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze + @threads = options[:threads] @concurrency_model = options[:concurrency_model].to_s.inquiry @fibers = options[:fibers] @@ -30,18 +29,25 @@ def initialize(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: execution_backend.capacity, concurrency_model: concurrency_model.to_s).tap do |metadata| + super.merge(queues: queues.join(","), thread_pool_size: threads, concurrency_model: concurrency_model.to_s).tap do |metadata| metadata[:fiber_pool_size] = fibers if fibers.present? + metadata[:execution_capacity] = execution_backend.capacity if concurrency_model.fiber? 
end end private def ensure_supported_concurrency_model! - raise NotImplementedError, CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE if concurrency_model.fiber? + return if concurrency_model.thread? || concurrency_model.fiber? + + raise ArgumentError, "Unsupported worker concurrency model: #{concurrency_model}" end def build_execution_backend(options) - Pool.new(options[:threads], on_available: -> { wake_up }) + if concurrency_model.fiber? + FiberPool.new(options[:threads], options[:fibers], on_available: -> { wake_up }, name: name) + else + Pool.new(options[:threads], on_available: -> { wake_up }) + end end def poll diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 3a5c9693e..f1c494367 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -28,6 +28,7 @@ Gem::Specification.new do |spec| spec.add_dependency "activerecord", rails_version spec.add_dependency "activejob", rails_version spec.add_dependency "railties", rails_version + spec.add_dependency "async", ">= 2.24", "< 3" spec.add_dependency "concurrent-ruby", ">= 1.3.1" spec.add_dependency "fugit", "~> 1.11" spec.add_dependency "thor", ">= 1.3.1" diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 7260f5aba..ac320de53 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -90,15 +90,15 @@ class ConfigurationTest < ActiveSupport::TestCase assert_includes configuration.errors.full_messages, "Worker 1 with `concurrency_model: fiber` must set a positive integer `fibers` value" end - test "fiber workers are rejected until runtime support is implemented" do + test "fiber workers are valid with a positive fibers value" do configuration = SolidQueue::Configuration.new( workers: [ { queues: "background", concurrency_model: :fiber, fibers: 10 } ], dispatchers: [], skip_recurring: true ) - assert_not configuration.valid? 
- assert_includes configuration.errors.full_messages, "Worker 1 uses `concurrency_model: fiber`, but fiber worker execution is not implemented yet" + assert configuration.valid? + assert_processes configuration, :worker, 1, concurrency_model: :fiber, fibers: 10 end test "workers require a supported concurrency model" do diff --git a/test/unit/fiber_pool_test.rb b/test/unit/fiber_pool_test.rb new file mode 100644 index 000000000..d68749f62 --- /dev/null +++ b/test/unit/fiber_pool_test.rb @@ -0,0 +1,87 @@ +require "test_helper" + +class FiberPoolTest < ActiveSupport::TestCase + test "tracks available capacity across fibers" do + started = Queue.new + release = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end + + pool = SolidQueue::FiberPool.new(1, 2) + + 2.times { pool.post(execution.new(started, release)) } + 2.times { started.pop } + + wait_for(timeout: 1.second) { pool.available_capacity.zero? } + + assert_equal 2, pool.capacity + assert_equal 0, pool.available_capacity + assert_not pool.available? + + 2.times { release << true } + + wait_for(timeout: 1.second) { pool.available_capacity == 2 } + + assert_equal 2, pool.available_capacity + assert pool.available? 
+ ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "runs sleeping work concurrently within a single thread" do + finished = Queue.new + + execution = Struct.new(:finished) do + def perform + sleep 0.2 + finished << true + end + end + + pool = SolidQueue::FiberPool.new(1, 2) + started_at = Concurrent.monotonic_time + + 2.times { pool.post(execution.new(finished)) } + + 2.times { Timeout.timeout(1.second) { finished.pop } } + elapsed = Concurrent.monotonic_time - started_at + + assert_operator elapsed, :<, 0.35 + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "calls on_available when fiber capacity is restored" do + started = Queue.new + release = Queue.new + available = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end + + pool = SolidQueue::FiberPool.new(1, 1, on_available: -> { available << true }) + + pool.post(execution.new(started, release)) + started.pop + wait_for(timeout: 1.second) { pool.available_capacity.zero? } + + release << true + + Timeout.timeout(1.second) { available.pop } + wait_for(timeout: 1.second) { pool.available_capacity == 1 } + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end +end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 40b5f432b..dc9ecf4ed 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -29,6 +29,24 @@ class WorkerTest < ActiveSupport::TestCase assert @worker.execution_backend.available? 
end + test "fiber workers use a fiber-backed execution backend" do + worker = SolidQueue::Worker.new( + queues: "background", + threads: 2, + fibers: 4, + concurrency_model: :fiber, + polling_interval: 0.2 + ) + + assert_instance_of SolidQueue::FiberPool, worker.execution_backend + assert_equal 8, worker.execution_backend.capacity + assert_equal 2, worker.metadata[:thread_pool_size] + assert_equal 4, worker.metadata[:fiber_pool_size] + assert_equal 8, worker.metadata[:execution_capacity] + ensure + worker&.stop + end + test "errors on polling are passed to on_thread_error and re-raised" do errors = Concurrent::Array.new @@ -158,12 +176,50 @@ class WorkerTest < ActiveSupport::TestCase assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count end - test "reject fiber concurrency model until backend support is implemented" do - error = assert_raises(NotImplementedError) do - SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2, concurrency_model: :fiber, fibers: 10) + test "fiber workers claim one slot per thread-fiber combination" do + worker = SolidQueue::Worker.new( + queues: "background", + threads: 2, + fibers: 3, + concurrency_model: :fiber, + polling_interval: 0.2 + ) + + SolidQueue::ReadyExecution.expects(:claim).with(worker.queues, 6, nil).returns([]) + + worker.send(:claim_executions) + ensure + worker&.stop + end + + test "fiber workers can start more than one sleeping job per thread" do + worker = SolidQueue::Worker.new( + queues: "background", + threads: 1, + fibers: 2, + concurrency_model: :fiber, + polling_interval: 0.01 + ) + + 2.times { StoreResultJob.perform_later("fiber-overlap", pause: 0.3.seconds) } + + worker.start + + wait_for(timeout: 0.25.second) do + skip_active_record_query_cache do + JobResult.where(queue_name: :background, status: "started", value: "fiber-overlap").count == 2 + end end - assert_equal SolidQueue::Worker::CONCURRENCY_MODE_NOT_IMPLEMENTED_MESSAGE, 
error.message + skip_active_record_query_cache do + assert_operator JobResult.where(queue_name: :background, status: "completed", value: "fiber-overlap").count, :<, 2 + end + + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal 2, JobResult.where(queue_name: :background, status: "completed", value: "fiber-overlap").count + ensure + worker&.stop end test "terminate on heartbeat when unregistered" do From ba775238244defd594f1f6289418561ebcdc8c6e Mon Sep 17 00:00:00 2001 From: Sean Harmer Date: Sat, 4 Apr 2026 14:03:34 +0000 Subject: [PATCH 5/6] Add tests for shutdown timeout behavior in FiberPool Implement a test to verify that the FiberPool correctly returns false when in-flight work exceeds the shutdown timeout. This ensures that the pool behaves as expected under concurrent conditions and maintains accurate capacity tracking. We do not try to cancel in-flight jobs. This is the same approach as the existing thread-pool based backend. Solid queue already manages orphaned jobs at the process level. 
--- test/unit/fiber_pool_test.rb | 46 ++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/unit/fiber_pool_test.rb b/test/unit/fiber_pool_test.rb index d68749f62..5ad9cfcf0 100644 --- a/test/unit/fiber_pool_test.rb +++ b/test/unit/fiber_pool_test.rb @@ -1,6 +1,14 @@ require "test_helper" class FiberPoolTest < ActiveSupport::TestCase + setup do + @original_on_thread_error = SolidQueue.on_thread_error + end + + teardown do + SolidQueue.on_thread_error = @original_on_thread_error + end + test "tracks available capacity across fibers" do started = Queue.new release = Queue.new @@ -84,4 +92,42 @@ def perform pool&.shutdown pool&.wait_for_termination(1.second) end + + test "returns false when in-flight work exceeds the shutdown timeout" do + started = Queue.new + release = Queue.new + finished = Queue.new + errors = Queue.new + + SolidQueue.on_thread_error = ->(error) { errors << error } + + execution = Struct.new(:started, :release, :finished) do + def perform + started << true + release.pop + finished << true + end + end + + pool = SolidQueue::FiberPool.new(1, 1) + + pool.post(execution.new(started, release, finished)) + started.pop + + pool.shutdown + + assert_not pool.wait_for_termination(0.05) + assert_nil finished.pop(true) rescue ThreadError + assert_equal 0, pool.available_capacity + + release << true + + Timeout.timeout(1.second) { finished.pop } + assert pool.wait_for_termination(1.second) + assert_equal 1, pool.available_capacity + assert_nil errors.pop(true) rescue ThreadError + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end end From b5d4bca217cf2d21f089d7ddbb11574f36bc2bf9 Mon Sep 17 00:00:00 2001 From: Sean Harmer Date: Sat, 4 Apr 2026 14:13:52 +0000 Subject: [PATCH 6/6] Update README to include fiber concurrency model details --- README.md | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f77c4d5a1..69e8fff69 100644 --- 
a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite, - [Optional scheduler configuration](#optional-scheduler-configuration) - [Queue order and priorities](#queue-order-and-priorities) - [Queues specification and performance](#queues-specification-and-performance) - - [Threads, processes, and signals](#threads-processes-and-signals) + - [Threads, fibers, processes, and signals](#threads-fibers-processes-and-signals) - [Database configuration](#database-configuration) - [Other configuration settings](#other-configuration-settings) - [Lifecycle hooks](#lifecycle-hooks) @@ -229,6 +229,11 @@ production: threads: 5 polling_interval: 0.1 processes: 3 + - queues: llm + threads: 2 + concurrency_model: fiber + fibers: 50 + polling_interval: 0.1 scheduler: dynamic_tasks_enabled: true polling_interval: 5 @@ -264,15 +269,18 @@ Here's an overview of the different options: ``` This will create a worker fetching jobs from all queues starting with `staging`. The wildcard `*` is only allowed on its own or at the end of a queue name; you can't specify queue names such as `*_some_queue`. These will be ignored. - + Also, if a wildcard (*) is included alongside explicit queue names, for example: `queues: [default, backend, *]`, then it would behave like `queues: *` Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names. Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). -- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. 
Only workers have this setting.
-It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat.
+- `threads`: this is the number of worker threads for each worker. In the default `thread` concurrency model, it is also the worker's maximum execution capacity, as each worker will fetch up to this many jobs and post them to the thread pool. In the `fiber` concurrency model, these are the long-lived runner threads that host Async reactors, and total execution capacity becomes `threads * fibers`. By default, this is `3`. Only workers have this setting.
+- `concurrency_model`: the execution strategy for each worker. It can be `thread` or `fiber`, and defaults to `thread`. `thread` preserves the existing thread-pool behaviour. `fiber` runs jobs on Async fibers inside each worker thread and is intended for scheduler-friendly, IO-bound workloads.
+- `fibers`: the maximum number of concurrent fibers per worker thread when `concurrency_model: fiber`. This setting is required when using the `fiber` concurrency model, so a worker configured with `threads: 2` and `fibers: 50` can execute up to `100` jobs concurrently. Please be aware that if you configure a large `capacity = threads * fibers`, you should also check your database server's maximum client connections setting. For example, a default MySQL installation sets this to 150 concurrent client connections.
+- When you use `concurrency_model: fiber`, Solid Queue switches `ActiveSupport::IsolatedExecutionState.isolation_level` to `:fiber` before the worker runs jobs so Rails execution state remains fiber-local. If your application sets this value manually, keep it at `:fiber` anywhere fiber workers run.
+- Fiber workers can drive much higher job concurrency than thread workers, so review the connection pools for any databases those jobs use. 
In particular, size the queue and application database pools for the amount of concurrent job work your fiber workers can create. - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode). - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. @@ -362,9 +370,21 @@ queues: back* ``` -### Threads, processes, and signals +### Threads, fibers, processes, and signals + +Solid Queue now has three concurrency layers for workers: + +- Processes: configured with `processes`, or by running multiple `bin/jobs` instances across machines. +- Threads: configured with `threads` for each worker. +- Fibers: enabled per worker with `concurrency_model: fiber` and sized with `fibers`. + +In the default `thread` concurrency model, a worker runs jobs directly on its thread pool, so total execution capacity equals `threads`. In the `fiber` concurrency model, each worker thread hosts an Async reactor and can run up to `fibers` jobs concurrently, so total execution capacity equals `threads * fibers`. + +Fiber mode is best suited to IO-bound jobs that spend most of their time waiting on network or other scheduler-friendly operations. CPU-bound jobs will not benefit from fiber mode and should continue using the default thread execution model. + +This worker-level concurrency model is independent from the supervisor `fork` vs `async` mode described below. 
`fork` vs `async` decides how Solid Queue process instances are supervised. `thread` vs `fiber` decides how each worker executes jobs locally. -Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling. +Fiber workers require Rails execution state isolation to be fiber-local. Solid Queue sets `ActiveSupport::IsolatedExecutionState.isolation_level = :fiber` automatically when a fiber worker boots. If your application configures this manually, do not switch it back to `:thread` in processes that run fiber workers. The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode: - `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit. @@ -781,7 +801,7 @@ SolidQueue.unschedule_recurring_task("my_dynamic_task") Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error. -Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them. +Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them. ## Inspiration