diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 96f8bcfed..d1372bb0a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -8,7 +8,9 @@ }, // Features to add to the dev container. More info: https://containers.dev/features. "features": { - "ghcr.io/devcontainers/features/docker-in-docker": {}, + "ghcr.io/devcontainers/features/docker-in-docker": { + "moby": false + }, "ghcr.io/devcontainers/features/github-cli:1": { "version": "latest" }, diff --git a/Gemfile.lock b/Gemfile.lock index 7c4662de5..1660d7320 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH solid_queue (1.4.0) activejob (>= 7.1) activerecord (>= 7.1) + async (>= 2.24, < 3) concurrent-ruby (>= 1.3.1) fugit (~> 1.11) railties (>= 7.1) @@ -55,12 +56,22 @@ GEM rake thor (>= 0.14.0) ast (2.4.2) + async (2.38.1) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) base64 (0.3.0) benchmark (0.4.1) bigdecimal (3.3.1) builder (3.3.0) concurrent-ruby (1.3.5) connection_pool (2.5.4) + console (1.34.3) + fiber-annotation + fiber-local (~> 1.1) + json crass (1.0.6) date (3.4.1) debug (1.9.2) @@ -70,6 +81,10 @@ GEM erubi (1.13.1) et-orbi (1.2.11) tzinfo + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) fugit (1.11.1) et-orbi (~> 1, >= 1.2.11) raabro (~> 1.4) @@ -78,6 +93,7 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-event (1.14.5) irb (1.14.3) rdoc (>= 4.0.0) reline (>= 0.4.2) @@ -87,6 +103,7 @@ GEM loofah (2.23.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) + metrics (0.15.0) minitest (5.26.0) mocha (2.1.0) ruby2_keywords (>= 0.0.5) @@ -181,6 +198,7 @@ GEM stringio (3.1.2) thor (1.3.2) timeout (0.4.3) + traces (0.18.2) tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (3.1.3) diff --git a/README.md b/README.md index f77c4d5a1..69e8fff69 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Solid Queue can be used 
with SQL databases such as MySQL, PostgreSQL, or SQLite, - [Optional scheduler configuration](#optional-scheduler-configuration) - [Queue order and priorities](#queue-order-and-priorities) - [Queues specification and performance](#queues-specification-and-performance) - - [Threads, processes, and signals](#threads-processes-and-signals) + - [Threads, fibers, processes, and signals](#threads-fibers-processes-and-signals) - [Database configuration](#database-configuration) - [Other configuration settings](#other-configuration-settings) - [Lifecycle hooks](#lifecycle-hooks) @@ -229,6 +229,11 @@ production: threads: 5 polling_interval: 0.1 processes: 3 + - queues: llm + threads: 2 + concurrency_model: fiber + fibers: 50 + polling_interval: 0.1 scheduler: dynamic_tasks_enabled: true polling_interval: 5 @@ -264,15 +269,18 @@ Here's an overview of the different options: ``` This will create a worker fetching jobs from all queues starting with `staging`. The wildcard `*` is only allowed on its own or at the end of a queue name; you can't specify queue names such as `*_some_queue`. These will be ignored. - + Also, if a wildcard (*) is included alongside explicit queue names, for example: `queues: [default, backend, *]`, then it would behave like `queues: *` Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names. Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). -- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting. 
-It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat. +- `threads`: this is the number of worker threads for each worker. In the default `thread` concurrency model, it is also the worker's maximum execution capacity, as each worker will fetch up to this many jobs and post them to the thread pool. In the `fiber` concurrency model, these are the long-lived runner threads that host Async reactors, and total execution capacity becomes `threads * fibers`. By default, this is `3`. Only workers have this setting. +- `concurrency_model`: the execution strategy for each worker. It can be `thread` or `fiber`, and defaults to `thread`. `thread` preserves the existing thread-pool behaviour. `fiber` runs jobs on Async fibers inside each worker thread and is intended for scheduler-friendly, IO-bound workloads. +- `fibers`: the maximum number of concurrent fibers per worker thread when `concurrency_model: fiber`. This setting is required when using the `fiber` concurrency model. For example, a worker configured with `threads: 2` and `fibers: 50` can execute up to `100` jobs concurrently. Please be aware that if you configure a large `capacity = threads * fibers`, you should also check your database server's maximum client connections setting. For example, a default MySQL installation limits this to 151 concurrent client connections (`max_connections`). +- When you use `concurrency_model: fiber`, Solid Queue switches `ActiveSupport::IsolatedExecutionState.isolation_level` to `:fiber` before the worker runs jobs so Rails execution state remains fiber-local. If your application sets this value manually, keep it at `:fiber` anywhere fiber workers run. +- Fiber workers can drive much higher job concurrency than thread workers, so review the connection pools for any databases those jobs use. 
In particular, size the queue and application database pools for the amount of concurrent job work your fiber workers can create. - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode). - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. @@ -362,9 +370,21 @@ queues: back* ``` -### Threads, processes, and signals +### Threads, fibers, processes, and signals + +Solid Queue now has three concurrency layers for workers: + +- Processes: configured with `processes`, or by running multiple `bin/jobs` instances across machines. +- Threads: configured with `threads` for each worker. +- Fibers: enabled per worker with `concurrency_model: fiber` and sized with `fibers`. + +In the default `thread` concurrency model, a worker runs jobs directly on its thread pool, so total execution capacity equals `threads`. In the `fiber` concurrency model, each worker thread hosts an Async reactor and can run up to `fibers` jobs concurrently, so total execution capacity equals `threads * fibers`. + +Fiber mode is best suited to IO-bound jobs that spend most of their time waiting on network or other scheduler-friendly operations. CPU-bound jobs will not benefit from fiber mode and should continue using the default thread execution model. + +This worker-level concurrency model is independent from the supervisor `fork` vs `async` mode described below. 
`fork` vs `async` decides how Solid Queue process instances are supervised. `thread` vs `fiber` decides how each worker executes jobs locally. -Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling. +Fiber workers require Rails execution state isolation to be fiber-local. Solid Queue sets `ActiveSupport::IsolatedExecutionState.isolation_level = :fiber` automatically when a fiber worker boots. If your application configures this manually, do not switch it back to `:thread` in processes that run fiber workers. The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode: - `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit. @@ -781,7 +801,7 @@ SolidQueue.unschedule_recurring_task("my_dynamic_task") Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error. -Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them. +Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them. 
## Inspiration diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index e63a000ca..03649af2a 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -6,6 +6,7 @@ class Configuration validate :ensure_configured_processes validate :ensure_valid_recurring_tasks + validate :ensure_valid_worker_concurrency_settings validate :ensure_correctly_sized_thread_pool class Process < Struct.new(:kind, :attributes) @@ -18,9 +19,12 @@ def instantiate queues: "*", threads: 3, processes: 1, + concurrency_model: :thread, polling_interval: 0.1 } + WORKER_CONCURRENCY_MODELS = %w[ thread fiber ].freeze + DISPATCHER_DEFAULTS = { batch_size: 500, polling_interval: 1, @@ -88,6 +92,31 @@ def ensure_valid_recurring_tasks end end + def ensure_valid_worker_concurrency_settings + workers_options.each_with_index do |worker_options, index| + worker_label = "Worker #{index + 1}" + concurrency_model = worker_concurrency_model_for(worker_options) + + unless WORKER_CONCURRENCY_MODELS.include?(concurrency_model) + errors.add(:base, "#{worker_label} has unsupported `concurrency_model: #{concurrency_model}`. Valid options are: thread, fiber") + next + end + + if concurrency_model == "thread" + if worker_options.key?(:fibers) && !worker_options[:fibers].nil? + errors.add(:base, "#{worker_label} cannot set `fibers` unless `concurrency_model` is `fiber`") + end + + next + end + + fibers = Integer(worker_options[:fibers], exception: false) + unless fibers&.positive? 
+ errors.add(:base, "#{worker_label} with `concurrency_model: fiber` must set a positive integer `fibers` value") + end + end + end + def ensure_correctly_sized_thread_pool if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " + @@ -156,6 +185,10 @@ def workers_options .map { |options| options.dup.symbolize_keys } end + def worker_concurrency_model_for(worker_options) + worker_options.fetch(:concurrency_model, WORKER_DEFAULTS[:concurrency_model]).to_s + end + def dispatchers_options @dispatchers_options ||= processes_config.fetch(:dispatchers, []) .map { |options| options.dup.symbolize_keys } diff --git a/lib/solid_queue/fiber_pool.rb b/lib/solid_queue/fiber_pool.rb new file mode 100644 index 000000000..afd81b20d --- /dev/null +++ b/lib/solid_queue/fiber_pool.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +require "async" +require "async/barrier" +require "async/queue" +require "async/semaphore" + +module SolidQueue + class FiberPool < Worker::ExecutionBackend + ISOLATION_MUTEX = Mutex.new + + class Runner + def initialize(index, fibers, name:, on_execution:) + @queue = Async::Queue.new + @booted = Thread::Queue.new + + @thread = Thread.new do + Thread.current.name = "#{name}-fiber-#{index}" + + Sync do |task| + begin + barrier = Async::Barrier.new(parent: task) + semaphore = Async::Semaphore.new(fibers, parent: task) + execution_state = ActiveSupport::IsolatedExecutionState.context + + @booted << true + + while (execution = queue.dequeue) + barrier.async(parent: semaphore) do + ActiveSupport::IsolatedExecutionState.share_with(execution_state) + on_execution.call(execution) + end + end + + barrier.wait + ensure + barrier&.cancel + end + end + end + + @booted.pop + end + + def enqueue(execution) + queue.enqueue(execution) + end + + def shutdown + queue.close + end + + def wait(timeout) + thread.join(timeout) 
+      end
+
+      private
+        attr_reader :queue, :thread
+    end
+
+    attr_reader :threads, :fibers
+
+    # Builds a pool of +threads+ runner threads, each able to run up to
+    # +fibers+ executions concurrently (total capacity: threads * fibers).
+    #
+    # threads      - number of Runner threads to start.
+    # fibers       - per-runner concurrency limit.
+    # on_available - callable invoked when a slot is released and capacity is
+    #                available; +on_idle+ is used when it is not given.
+    # name         - prefix for the runner thread names.
+    def initialize(threads, fibers, on_available: nil, on_idle: nil, name: "worker")
+      ensure_fiber_isolation!
+
+      @threads = threads
+      @fibers = fibers
+
+      super(threads * fibers, on_available: on_available || on_idle)
+
+      @available_slots = Concurrent::AtomicFixnum.new(capacity)
+      @next_runner = Concurrent::AtomicFixnum.new(0)
+      @mutex = Mutex.new
+      @shutdown = false
+
+      @runners = Array.new(threads) do |index|
+        Runner.new(index, fibers, name: name, on_execution: method(:run_execution))
+      end
+    end
+
+    # Reserves one slot and hands +execution+ to the next runner (round-robin).
+    #
+    # Raises Concurrent::RejectedExecutionError when the pool is shut down.
+    # That check runs before any slot is reserved, so a rejected post must
+    # leave the slot counter untouched; only a failed enqueue gives its slot
+    # back. (Previously a method-level rescue also caught the rejection and
+    # could hand back a slot that was never taken while work was in flight.)
+    def post(execution)
+      raise Concurrent::RejectedExecutionError, "fiber pool is shut down" if shutdown?
+
+      available_slots.decrement
+
+      begin
+        next_runner.enqueue(execution)
+      rescue Exception
+        # Rescuing Exception is deliberate: the error is always re-raised, we
+        # only undo the reservation made just above.
+        available_slots.increment
+        raise
+      end
+    end
+
+    # Number of executions that can still be posted without exceeding capacity.
+    def available_capacity
+      available_slots.value
+    end
+
+    # Marks the pool as shut down and closes every runner's queue. Safe to
+    # call more than once: later calls return without doing anything.
+    def shutdown
+      mutex.synchronize do
+        return if @shutdown
+
+        @shutdown = true
+        runners.each(&:shutdown)
+      end
+    end
+
+    def shutdown?
+      @shutdown
+    end
+
+    # Waits up to +timeout+ seconds in total (not per runner) for the runner
+    # threads to finish. Returns true when every runner thread was joined
+    # before the deadline, false otherwise. +timeout+ is coerced with +to_f+,
+    # so nil behaves like 0 and yields false without waiting on any runner.
+    def wait_for_termination(timeout)
+      deadline = Concurrent.monotonic_time + timeout.to_f
+
+      runners.all? do |runner|
+        remaining = deadline - Concurrent.monotonic_time
+        break false if remaining <= 0
+
+        runner.wait(remaining)
+      end
+    end
+
+    private
+      attr_reader :available_slots, :mutex, :runners
+
+      def ensure_fiber_isolation!
+ ISOLATION_MUTEX.synchronize do + return if ActiveSupport::IsolatedExecutionState.isolation_level == :fiber + + ActiveSupport::IsolatedExecutionState.isolation_level = :fiber + end + end + + def next_runner + runners[(next_runner_index.increment - 1) % runners.size] + end + + def next_runner_index + @next_runner + end + + def run_execution(execution) + perform(execution) + rescue Exception => error + handle_thread_error(error) + ensure + available_slots.increment + mutex.synchronize { notify_available } + end + end +end diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index 9c3d2a298..a62394a95 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -1,17 +1,15 @@ # frozen_string_literal: true module SolidQueue - class Pool - include AppExecutor - - attr_reader :size + class Pool < Worker::ExecutionBackend + alias size capacity delegate :shutdown, :shutdown?, :wait_for_termination, to: :executor - def initialize(size, on_idle: nil) - @size = size - @on_idle = on_idle - @available_threads = Concurrent::AtomicFixnum.new(size) + def initialize(capacity, on_available: nil, on_idle: nil) + super(capacity, on_available: on_available || on_idle) + + @available_threads = Concurrent::AtomicFixnum.new(capacity) @mutex = Mutex.new end @@ -19,27 +17,24 @@ def post(execution) available_threads.decrement Concurrent::Promises.future_on(executor, execution) do |thread_execution| - wrap_in_app_executor do - thread_execution.perform - ensure + perform(thread_execution) + ensure available_threads.increment - mutex.synchronize { on_idle.try(:call) if idle? } - end + mutex.synchronize { notify_available } end.on_rejection! do |e| handle_thread_error(e) end end - def idle_threads + def available_capacity available_threads.value end - def idle? - idle_threads > 0 - end + alias idle_threads available_capacity + alias idle? available? 
private - attr_reader :available_threads, :on_idle, :mutex + attr_reader :available_threads, :mutex DEFAULT_OPTIONS = { min_threads: 0, @@ -48,7 +43,7 @@ def idle? } def executor - @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: size, max_queue: size) + @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: capacity, max_queue: capacity) end end end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd9..d83ca3c12 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -8,43 +8,67 @@ class Worker < Processes::Poller before_shutdown :run_stop_hooks after_shutdown :run_exit_hooks - attr_reader :queues, :pool + attr_reader :queues, :execution_backend, :concurrency_model, :fibers, :threads + + alias pool execution_backend def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze + @threads = options[:threads] + @concurrency_model = options[:concurrency_model].to_s.inquiry + @fibers = options[:fibers] + + ensure_supported_concurrency_model! - @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) + @execution_backend = build_execution_backend(options) super(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: pool.size) + super.merge(queues: queues.join(","), thread_pool_size: threads, concurrency_model: concurrency_model.to_s).tap do |metadata| + metadata[:fiber_pool_size] = fibers if fibers.present? + metadata[:execution_capacity] = execution_backend.capacity if concurrency_model.fiber? + end end private + def ensure_supported_concurrency_model! + return if concurrency_model.thread? || concurrency_model.fiber? 
+ + raise ArgumentError, "Unsupported worker concurrency model: #{concurrency_model}" + end + + def build_execution_backend(options) + if concurrency_model.fiber? + FiberPool.new(options[:threads], options[:fibers], on_available: -> { wake_up }, name: name) + else + Pool.new(options[:threads], on_available: -> { wake_up }) + end + end + def poll claim_executions.then do |executions| executions.each do |execution| - pool.post(execution) + execution_backend.post(execution) end - pool.idle? ? polling_interval : 10.minutes + execution_backend.available? ? polling_interval : 10.minutes end end def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, execution_backend.available_capacity, process_id) end end def shutdown - pool.shutdown - pool.wait_for_termination(SolidQueue.shutdown_timeout) + execution_backend.shutdown + execution_backend.wait_for_termination(SolidQueue.shutdown_timeout) super end diff --git a/lib/solid_queue/worker/execution_backend.rb b/lib/solid_queue/worker/execution_backend.rb new file mode 100644 index 000000000..ce3056ca6 --- /dev/null +++ b/lib/solid_queue/worker/execution_backend.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +module SolidQueue + class Worker < Processes::Poller + class ExecutionBackend + include AppExecutor + + attr_reader :capacity + + def initialize(capacity, on_available: nil) + @capacity = capacity + @on_available = on_available + end + + def post(_execution) + raise NotImplementedError + end + + def available_capacity + raise NotImplementedError + end + + def available? + available_capacity.positive? + end + + def shutdown + raise NotImplementedError + end + + def shutdown? 
+ raise NotImplementedError + end + + def wait_for_termination(_timeout) + raise NotImplementedError + end + + private + attr_reader :on_available + + def perform(execution) + wrap_in_app_executor do + execution.perform + end + end + + def notify_available + on_available.try(:call) if available? + end + end + end +end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 3a5c9693e..f1c494367 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -28,6 +28,7 @@ Gem::Specification.new do |spec| spec.add_dependency "activerecord", rails_version spec.add_dependency "activejob", rails_version spec.add_dependency "railties", rails_version + spec.add_dependency "async", ">= 2.24", "< 3" spec.add_dependency "concurrent-ruby", ">= 1.3.1" spec.add_dependency "fugit", "~> 1.11" spec.add_dependency "thor", ">= 1.3.1" diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 34f69658b..ac320de53 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -58,7 +58,58 @@ class ConfigurationTest < ActiveSupport::TestCase configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ], skip_recurring: true) assert_processes configuration, :dispatcher, 0 - assert_processes configuration, :worker, 2 + assert_processes configuration, :worker, 2, concurrency_model: :thread + end + + test "workers default to thread concurrency model" do + configuration = SolidQueue::Configuration.new(workers: [ { queues: "background" } ], dispatchers: [], skip_recurring: true) + + assert configuration.valid? + assert_processes configuration, :worker, 1, concurrency_model: :thread + end + + test "thread workers reject fibers setting" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :thread, fibers: 10 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? 
+ assert_includes configuration.errors.full_messages, "Worker 1 cannot set `fibers` unless `concurrency_model` is `fiber`" + end + + test "fiber workers require a positive fibers value" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :fiber, fibers: 0 } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_includes configuration.errors.full_messages, "Worker 1 with `concurrency_model: fiber` must set a positive integer `fibers` value" + end + + test "fiber workers are valid with a positive fibers value" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :fiber, fibers: 10 } ], + dispatchers: [], + skip_recurring: true + ) + + assert configuration.valid? + assert_processes configuration, :worker, 1, concurrency_model: :fiber, fibers: 10 + end + + test "workers require a supported concurrency model" do + configuration = SolidQueue::Configuration.new( + workers: [ { queues: "background", concurrency_model: :green_threads } ], + dispatchers: [], + skip_recurring: true + ) + + assert_not configuration.valid? + assert_includes configuration.errors.full_messages, "Worker 1 has unsupported `concurrency_model: green_threads`. 
Valid options are: thread, fiber" end test "mulitple workers with the same configuration" do diff --git a/test/unit/fiber_pool_test.rb b/test/unit/fiber_pool_test.rb new file mode 100644 index 000000000..5ad9cfcf0 --- /dev/null +++ b/test/unit/fiber_pool_test.rb @@ -0,0 +1,133 @@ +require "test_helper" + +class FiberPoolTest < ActiveSupport::TestCase + setup do + @original_on_thread_error = SolidQueue.on_thread_error + end + + teardown do + SolidQueue.on_thread_error = @original_on_thread_error + end + + test "tracks available capacity across fibers" do + started = Queue.new + release = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end + + pool = SolidQueue::FiberPool.new(1, 2) + + 2.times { pool.post(execution.new(started, release)) } + 2.times { started.pop } + + wait_for(timeout: 1.second) { pool.available_capacity.zero? } + + assert_equal 2, pool.capacity + assert_equal 0, pool.available_capacity + assert_not pool.available? + + 2.times { release << true } + + wait_for(timeout: 1.second) { pool.available_capacity == 2 } + + assert_equal 2, pool.available_capacity + assert pool.available? 
+ ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "runs sleeping work concurrently within a single thread" do + finished = Queue.new + + execution = Struct.new(:finished) do + def perform + sleep 0.2 + finished << true + end + end + + pool = SolidQueue::FiberPool.new(1, 2) + started_at = Concurrent.monotonic_time + + 2.times { pool.post(execution.new(finished)) } + + 2.times { Timeout.timeout(1.second) { finished.pop } } + elapsed = Concurrent.monotonic_time - started_at + + assert_operator elapsed, :<, 0.35 + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "calls on_available when fiber capacity is restored" do + started = Queue.new + release = Queue.new + available = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end + + pool = SolidQueue::FiberPool.new(1, 1, on_available: -> { available << true }) + + pool.post(execution.new(started, release)) + started.pop + wait_for(timeout: 1.second) { pool.available_capacity.zero? 
} + + release << true + + Timeout.timeout(1.second) { available.pop } + wait_for(timeout: 1.second) { pool.available_capacity == 1 } + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "returns false when in-flight work exceeds the shutdown timeout" do + started = Queue.new + release = Queue.new + finished = Queue.new + errors = Queue.new + + SolidQueue.on_thread_error = ->(error) { errors << error } + + execution = Struct.new(:started, :release, :finished) do + def perform + started << true + release.pop + finished << true + end + end + + pool = SolidQueue::FiberPool.new(1, 1) + + pool.post(execution.new(started, release, finished)) + started.pop + + pool.shutdown + + assert_not pool.wait_for_termination(0.05) + assert_nil finished.pop(true) rescue ThreadError + assert_equal 0, pool.available_capacity + + release << true + + Timeout.timeout(1.second) { finished.pop } + assert pool.wait_for_termination(1.second) + assert_equal 1, pool.available_capacity + assert_nil errors.pop(true) rescue ThreadError + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end +end diff --git a/test/unit/pool_test.rb b/test/unit/pool_test.rb new file mode 100644 index 000000000..b25095ff4 --- /dev/null +++ b/test/unit/pool_test.rb @@ -0,0 +1,72 @@ +require "test_helper" + +class PoolTest < ActiveSupport::TestCase + test "tracks available capacity while work is in flight" do + started = Queue.new + release = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end.new(started, release) + + pool = SolidQueue::Pool.new(1) + + assert_equal 1, pool.capacity + assert_equal 1, pool.available_capacity + assert_equal 1, pool.idle_threads + assert pool.available? + assert pool.idle? + + pool.post(execution) + started.pop + + wait_for(timeout: 1.second) { pool.available_capacity.zero? } + + assert_equal 0, pool.available_capacity + assert_equal 0, pool.idle_threads + assert_not pool.available? 
+ assert_not pool.idle? + + release << true + + wait_for(timeout: 1.second) { pool.available_capacity == 1 } + + assert_equal 1, pool.available_capacity + assert_equal 1, pool.idle_threads + assert pool.available? + assert pool.idle? + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end + + test "calls on_available when capacity is restored" do + started = Queue.new + release = Queue.new + available = Queue.new + + execution = Struct.new(:started, :release) do + def perform + started << true + release.pop + end + end.new(started, release) + + pool = SolidQueue::Pool.new(1, on_available: -> { available << true }) + + pool.post(execution) + started.pop + wait_for(timeout: 1.second) { pool.available_capacity.zero? } + + release << true + + Timeout.timeout(1.second) { available.pop } + wait_for(timeout: 1.second) { pool.available_capacity == 1 } + ensure + pool&.shutdown + pool&.wait_for_termination(1.second) + end +end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 3d692404b..dc9ecf4ed 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -19,7 +19,32 @@ class WorkerTest < ActiveSupport::TestCase process = SolidQueue::Process.first assert_equal "Worker", process.kind - assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3 } + assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3, concurrency_model: "thread" } + end + + test "worker exposes a generic execution backend" do + assert_equal @worker.pool, @worker.execution_backend + assert_equal 3, @worker.execution_backend.capacity + assert_equal 3, @worker.execution_backend.available_capacity + assert @worker.execution_backend.available? 
+ end + + test "fiber workers use a fiber-backed execution backend" do + worker = SolidQueue::Worker.new( + queues: "background", + threads: 2, + fibers: 4, + concurrency_model: :fiber, + polling_interval: 0.2 + ) + + assert_instance_of SolidQueue::FiberPool, worker.execution_backend + assert_equal 8, worker.execution_backend.capacity + assert_equal 2, worker.metadata[:thread_pool_size] + assert_equal 4, worker.metadata[:fiber_pool_size] + assert_equal 8, worker.metadata[:execution_capacity] + ensure + worker&.stop end test "errors on polling are passed to on_thread_error and re-raised" do @@ -151,6 +176,52 @@ class WorkerTest < ActiveSupport::TestCase assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count end + test "fiber workers claim one slot per thread-fiber combination" do + worker = SolidQueue::Worker.new( + queues: "background", + threads: 2, + fibers: 3, + concurrency_model: :fiber, + polling_interval: 0.2 + ) + + SolidQueue::ReadyExecution.expects(:claim).with(worker.queues, 6, nil).returns([]) + + worker.send(:claim_executions) + ensure + worker&.stop + end + + test "fiber workers can start more than one sleeping job per thread" do + worker = SolidQueue::Worker.new( + queues: "background", + threads: 1, + fibers: 2, + concurrency_model: :fiber, + polling_interval: 0.01 + ) + + 2.times { StoreResultJob.perform_later("fiber-overlap", pause: 0.3.seconds) } + + worker.start + + wait_for(timeout: 0.25.second) do + skip_active_record_query_cache do + JobResult.where(queue_name: :background, status: "started", value: "fiber-overlap").count == 2 + end + end + + skip_active_record_query_cache do + assert_operator JobResult.where(queue_name: :background, status: "completed", value: "fiber-overlap").count, :<, 2 + end + + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal 2, JobResult.where(queue_name: :background, status: "completed", value: "fiber-overlap").count + ensure + worker&.stop + end + test "terminate 
on heartbeat when unregistered" do old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second