diff --git a/Gemfile.lock b/Gemfile.lock index 7c4662de5..f1cda3c92 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -55,12 +55,22 @@ GEM rake thor (>= 0.14.0) ast (2.4.2) + async (2.38.1) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) base64 (0.3.0) benchmark (0.4.1) bigdecimal (3.3.1) builder (3.3.0) concurrent-ruby (1.3.5) connection_pool (2.5.4) + console (1.34.3) + fiber-annotation + fiber-local (~> 1.1) + json crass (1.0.6) date (3.4.1) debug (1.9.2) @@ -70,6 +80,10 @@ GEM erubi (1.13.1) et-orbi (1.2.11) tzinfo + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) fugit (1.11.1) et-orbi (~> 1, >= 1.2.11) raabro (~> 1.4) @@ -78,6 +92,7 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-event (1.14.5) irb (1.14.3) rdoc (>= 4.0.0) reline (>= 0.4.2) @@ -87,6 +102,7 @@ GEM loofah (2.23.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) + metrics (0.15.0) minitest (5.26.0) mocha (2.1.0) ruby2_keywords (>= 0.0.5) @@ -181,6 +197,7 @@ GEM stringio (3.1.2) thor (1.3.2) timeout (0.4.3) + traces (0.18.2) tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (3.1.3) @@ -199,6 +216,7 @@ PLATFORMS DEPENDENCIES appraisal + async (>= 2.24) debug (~> 1.9) logger minitest (~> 5.0) diff --git a/README.md b/README.md index f77c4d5a1..bc65fafa3 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,10 @@ Or you can also set the environment variable `SOLID_QUEUE_SUPERVISOR_MODE` to `a **The recommended and default mode is `fork`. Only use `async` if you know what you're doing and have strong reasons to** +This supervisor mode is separate from a worker's `execution_mode`. Supervisor mode decides whether supervised processes live in forks or threads. Worker execution mode decides whether a worker runs claimed jobs in a thread pool or as fibers on a single async reactor thread. 
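The thread-versus-fiber distinction described above can be sketched in plain Ruby (a minimal illustration of the scheduling difference, not Solid Queue's implementation — the job count and bodies are made up):

```ruby
# Minimal illustration, not Solid Queue's implementation: thread mode gives
# each in-flight job its own OS thread, while async/fiber mode multiplexes
# all jobs as fibers on a single reactor thread.
jobs = 3

# Thread mode: each job runs on a distinct thread.
thread_ids = Array.new(jobs) { Thread.new { Thread.current.object_id } }.map(&:value)

# Async mode (sketched with bare fibers): every job shares the caller's thread.
fiber_thread_ids = []
Array.new(jobs) { Fiber.new { fiber_thread_ids << Thread.current.object_id } }.each(&:resume)

puts thread_ids.uniq.size       # distinct threads, one per job
puts fiber_thread_ids.uniq.size # a single shared thread
```

Because fibers only yield at cooperative points, this is why the README below warns that CPU-heavy work still blocks the reactor thread.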
+ +Because these are separate concerns, you can combine the default `fork` supervisor mode with `execution_mode: async` on workers. In that setup, each worker process gets its own async reactor and bounded fiber capacity. + ## Configuration By default, Solid Queue will try to find your configuration under `config/queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this: @@ -222,10 +226,12 @@ production: batch_size: 500 concurrency_maintenance_interval: 300 workers: - - queues: "*" - threads: 3 - polling_interval: 2 + - queues: "llm*" + execution_mode: async + capacity: 100 + polling_interval: 0.05 - queues: [ real_time, background ] + execution_mode: thread threads: 5 polling_interval: 0.1 processes: 3 @@ -271,9 +277,14 @@ Here's an overview of the different options: Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). -- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting. -It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat. -- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. 
**Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode). +- `execution_mode`: controls how a worker executes claimed jobs. `thread` is the default and uses the existing thread pool behavior. `async` executes jobs as fibers on a single reactor thread. `fiber` is accepted as an alias for `async`. + Async worker execution requires fiber-scoped isolated execution state. In Rails apps, set `config.active_support.isolation_level = :fiber` before using `execution_mode: async`. Solid Queue refuses to boot async workers when isolation remains thread-scoped. +- `threads`: this is the execution capacity for a worker in `thread` mode. It is the max size of the thread pool. By default, this is `3`. Only workers have this setting. +It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker uses connections for polling and heartbeat, and thread mode may use additional connections for job execution. +- `capacity`: the worker's execution capacity, equivalent to `threads` in `thread` mode. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads. + Async workers reject `threads`; use `capacity` or `fibers` instead. On Rails 7.2 and later, a practical starting point is usually `3-5` queue database connections per worker process rather than one connection per unit of `capacity`, because ordinary Active Record query paths can release connections between async waits. On Rails 7.1, size the queue database pool more conservatively, as in-flight async jobs may still retain connections roughly in proportion to `capacity`. +- `fibers`: an alias for `capacity` when `execution_mode: async`. +- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. 
Only workers have this setting. This works with both `execution_mode: thread` and `execution_mode: async` as long as the supervisor is running in the default `fork` mode. **Note**: this option is ignored only when the supervisor itself is [running in `async` mode](#fork-vs-async-mode). - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. @@ -364,7 +375,15 @@ queues: back* ### Threads, processes, and signals -Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling. +By default, workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Workers can also be configured with `execution_mode: async`, in which case claimed jobs are executed as fibers on a single reactor thread and bounded by the worker's execution capacity. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling. + +Async worker execution is best suited for cooperative, mostly I/O-bound jobs. Blocking or CPU-heavy work still blocks the single reactor thread, so it should not be expected to outperform thread mode for every workload. + +Because async workers run multiple fibers on a single thread, Rails must also isolate execution state per fiber rather than per thread. 
If your app keeps the default thread-scoped isolation level, Solid Queue will raise a boot-time error instead of running async workers with shared Active Record state. + +On Rails 7.2 and later, async workers can often use a much smaller queue database pool than an equivalent thread pool. A practical starting point is `3-5` queue database connections per worker process: one for job execution, one for polling, one for heartbeats, plus some headroom. In the default `fork` supervisor mode, that guidance applies per worker process. In supervisor `async` mode, all workers share one process, so add together the requirements for the workers running there. + +That lower-pool guidance depends on job code not holding connections open across async waits. APIs such as `ActiveRecord::Base.connection`, `lease_connection`, `connection_pool.checkout`, or long-lived `with_connection` / transaction blocks can pin connections and push async workers back toward thread-like pool usage. On Rails 7.1, plan conservatively and assume async capacity can still grow queue database connection usage. The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode: - `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit. @@ -374,6 +393,10 @@ When receiving a `QUIT` signal, if workers still have jobs in-flight, these will If processes have no chance of cleaning up before exiting (e.g. if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats. 
Jobs that were claimed by processes with an expired heartbeat will be marked as failed with a `SolidQueue::Processes::ProcessPrunedError`. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this. +Worker heartbeats are driven by a separate timer task, not by the worker execution backend itself. This means async workers do not rely on the reactor loop to prove liveness. However, liveness is still tracked at the worker-process level, not at the individual thread or fiber level. + +As a result, finished and failed jobs still follow the normal Solid Queue lifecycle, but a single stuck job can remain claimed if the worker process itself is still alive. Stronger stuck-job detection requires an explicit timeout or watchdog mechanism on top of process heartbeats. + In a similar way, if a worker is terminated in any other way not initiated by the above signals (e.g. a worker is sent a `KILL` signal), jobs in progress will be marked as failed so that they can be inspected, with a `SolidQueue::Processes::ProcessExitError`. Sometimes a job in particular is responsible for this, for example, if it has a memory leak and you have a mechanism to kill processes over a certain memory threshold, so this will help identifying this kind of situation. 
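The explicit per-job timeout mentioned in the stuck-job note above can be sketched with Ruby's stdlib `Timeout` (a hypothetical helper, not a Solid Queue API; `Timeout` has well-known caveats around non-interruptible C calls, so treat this as a starting point only):

```ruby
require "timeout"

# Hypothetical watchdog wrapper: interrupt a job body that runs longer than
# its budget so it can be reported as failed, instead of staying claimed
# while the (otherwise healthy) worker process keeps heartbeating.
def run_with_watchdog(seconds, &job)
  Timeout.timeout(seconds, &job)
end

run_with_watchdog(1) { :done }          # a fast job completes normally

begin
  run_with_watchdog(0.05) { sleep 1 }   # a stuck job is interrupted...
rescue Timeout::Error
  # ...and here it would flow into the normal failed-job lifecycle
end
```

In a real app this logic would live in something like an Active Job `around_perform` hook rather than a bare method.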
diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index e63a000ca..605b3884d 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -6,7 +6,11 @@ class Configuration validate :ensure_configured_processes validate :ensure_valid_recurring_tasks - validate :ensure_correctly_sized_thread_pool + validate :ensure_correctly_sized_database_pool + validate :ensure_valid_worker_execution_modes + validate :ensure_async_workers_use_capacity_aliases + validate :ensure_async_workers_have_required_dependency + validate :ensure_async_workers_use_supported_isolation_level class Process < Struct.new(:kind, :attributes) def instantiate @@ -18,7 +22,8 @@ def instantiate queues: "*", threads: 3, processes: 1, - polling_interval: 0.1 + polling_interval: 0.1, + execution_mode: :thread } DISPATCHER_DEFAULTS = { @@ -35,6 +40,7 @@ def instantiate DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" + ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION = Gem::Version.new("7.2.0") def initialize(**options) @options = options.with_defaults(default_options) @@ -88,13 +94,46 @@ def ensure_valid_recurring_tasks end end - def ensure_correctly_sized_thread_pool - if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads - errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " + - "database connection pool is #{db_pool_size}. Increase it in `config/database.yml`") + def ensure_correctly_sized_database_pool + if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_database_pool_size + errors.add(:base, "Solid Queue requires at least #{estimated_database_pool_size} database connections " + + "for the configured workers, but the queue database connection pool is #{db_pool_size}. 
" + + "Increase it in `config/database.yml`") end end + def ensure_valid_worker_execution_modes + workers_options.each do |options| + SolidQueue::ExecutionPools.normalize_mode(options[:execution_mode] || WORKER_DEFAULTS[:execution_mode]) + rescue ArgumentError => error + errors.add(:base, error.message) + end + end + + def ensure_async_workers_use_capacity_aliases + workers_options.each do |options| + if async_worker?(options) && options.key?(:threads) + errors.add(:base, "Async workers do not accept `threads`. Use `capacity` or `fibers` instead.") + end + end + end + + def ensure_async_workers_have_required_dependency + return unless workers_options.any? { |options| async_worker?(options) } + + SolidQueue::ExecutionPools::AsyncPool.ensure_dependency! + rescue LoadError => error + errors.add(:base, error.message) + end + + def ensure_async_workers_use_supported_isolation_level + return unless workers_options.any? { |options| async_worker?(options) } + + SolidQueue::ExecutionPools::AsyncPool.ensure_supported_isolation_level! 
+ rescue ArgumentError => error + errors.add(:base, error.message) + end + def default_options { mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork, @@ -131,7 +170,8 @@ def workers 1 end - processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) } + defaults = worker_defaults_for(worker_options) + processes.times.map { Process.new(:worker, worker_options.with_defaults(defaults)) } end end @@ -153,7 +193,7 @@ def schedulers def workers_options @workers_options ||= processes_config.fetch(:workers, []) - .map { |options| options.dup.symbolize_keys } + .map { |options| normalize_worker_options(options) } end def dispatchers_options @@ -226,10 +266,57 @@ def load_config_from_file(file) end end - def estimated_number_of_threads - # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task - thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max - (thread_count || 1) + 2 + def estimated_database_pool_size + worker_pool_size = workers_options.map { |options| estimated_database_pool_size_for_worker(options) }.max + worker_pool_size || 1 + end + + def estimated_database_pool_size_for_worker(options) + estimated_execution_connections_for_worker(options) + 2 + end + + def normalize_worker_options(options) + options = options.dup.symbolize_keys + options[:execution_mode] = normalized_worker_execution_mode(options) + options[:capacity] = worker_capacity(options) if options.key?(:capacity) || options.key?(:fibers) + options[:threads] = worker_capacity(options) unless async_worker?(options) && !options.key?(:threads) + options + end + + def worker_capacity(options) + options[:capacity] || options[:fibers] || options[:threads] || WORKER_DEFAULTS[:threads] + end + + def normalized_worker_execution_mode(options) + SolidQueue::ExecutionPools.normalize_mode(options[:execution_mode] || WORKER_DEFAULTS[:execution_mode]) + rescue ArgumentError + options[:execution_mode] || 
WORKER_DEFAULTS[:execution_mode] + end + + def estimated_execution_connections_for_worker(options) + async_worker?(options) ? async_execution_connections_for_worker(options) : worker_capacity(options) + end + + def async_execution_connections_for_worker(options) + async_jobs_release_connections_between_queries? ? 1 : worker_capacity(options) + end + + def async_jobs_release_connections_between_queries? + ActiveRecord.gem_version >= ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION + end + + def async_worker?(options) + normalized_worker_execution_mode(options) == :async + end + + def worker_defaults_for(options) + if async_worker?(options) + WORKER_DEFAULTS.except(:threads).tap do |defaults| + defaults[:capacity] = WORKER_DEFAULTS[:threads] unless options.key?(:threads) + end + else + WORKER_DEFAULTS + end end end end diff --git a/lib/solid_queue/execution_pools.rb b/lib/solid_queue/execution_pools.rb new file mode 100644 index 000000000..262a0b9be --- /dev/null +++ b/lib/solid_queue/execution_pools.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module SolidQueue + module ExecutionPools + class << self + def build(mode:, size:, on_state_change: nil) + case normalize_mode(mode) + when :thread + ThreadPool.new(size, on_state_change: on_state_change) + when :async + AsyncPool.new(size, on_state_change: on_state_change) + end + end + + def normalize_mode(mode) + case mode.to_s + when "", "thread" + :thread + when "async", "fiber" + :async + else + raise ArgumentError, "Unknown execution mode #{mode.inspect}. 
Expected one of: :thread, :async, :fiber" + end + end + end + end +end diff --git a/lib/solid_queue/execution_pools/async_pool.rb b/lib/solid_queue/execution_pools/async_pool.rb new file mode 100644 index 000000000..4b0ed6958 --- /dev/null +++ b/lib/solid_queue/execution_pools/async_pool.rb @@ -0,0 +1,240 @@ +# frozen_string_literal: true + +module SolidQueue + module ExecutionPools + class AsyncPool + include AppExecutor + + IDLE_WAIT_INTERVAL = 0.01 + + class MissingDependencyError < LoadError + def initialize(error) + super( + "Async execution mode requires the `async` gem. " \ + "Add `gem \"async\"` to your Gemfile to use `execution_mode: async`. " \ + "Original error: #{error.message}" + ) + end + end + + class UnsupportedIsolationLevelError < ArgumentError + def initialize(level) + super( + "Async execution mode requires fiber-scoped isolated execution state. " \ + "Set `ActiveSupport::IsolatedExecutionState.isolation_level = :fiber` " \ + "(or `config.active_support.isolation_level = :fiber` in Rails). " \ + "Current isolation level: #{level.inspect}" + ) + end + end + + class << self + def ensure_dependency! + ensure_io_timeout_compatibility! + + require "async" + require "async/semaphore" + rescue LoadError => error + raise MissingDependencyError.new(error) + end + + def ensure_supported_isolation_level! + return if supported_isolation_level? + + raise UnsupportedIsolationLevelError.new(ActiveSupport::IsolatedExecutionState.isolation_level) + end + + def supported_isolation_level? + ActiveSupport::IsolatedExecutionState.isolation_level == :fiber + end + + def ensure_io_timeout_compatibility!(io_class = IO) + unless io_class.method_defined?(:timeout) && io_class.method_defined?(:timeout=) + # Async 2.24, which Ruby 3.1 resolves to, expects Ruby's newer IO + # timeout API to exist on any socket it waits on. Older Rubies don't + # provide it, so give async the minimal accessor interface it needs. 
+ io_class.class_eval do + def timeout + @timeout + end + + def timeout=(value) + @timeout = value + end + end + end + + return if io_class.const_defined?(:TimeoutError, false) + + io_class.const_set(:TimeoutError, Class.new(StandardError)) + end + end + + attr_reader :size + + def initialize(size, on_state_change: nil) + @size = size + @on_state_change = on_state_change + @available_capacity = size + @mutex = Mutex.new + @state_mutex = Mutex.new + @shutdown = false + @fatal_error = nil + @boot_queue = Thread::Queue.new + @pending_executions = Thread::Queue.new + + self.class.ensure_dependency! + self.class.ensure_supported_isolation_level! + + @reactor_thread = start_reactor + + boot_result = @boot_queue.pop + raise boot_result if boot_result.is_a?(Exception) + end + + def post(execution) + reserved = false + raise_if_fatal_error! + raise RuntimeError, "Execution pool is shutting down" if shutdown? + + reserve_capacity! + reserved = true + pending_executions << execution + rescue Exception + restore_capacity if reserved + raise + end + + def available_capacity + raise_if_fatal_error! + mutex.synchronize { @available_capacity } + end + + def idle? + available_capacity.positive? + end + + def shutdown + state_mutex.synchronize do + next false if @shutdown + + @shutdown = true + end + end + + def shutdown? 
+ state_mutex.synchronize { @shutdown } + end + + def wait_for_termination(timeout) + reactor_thread.join(timeout) + end + + def metadata + { + execution_mode: "async", + capacity: size, + inflight: size - available_capacity + } + end + + private + attr_reader :boot_queue, :mutex, :on_state_change, :pending_executions, :reactor_thread, :state_mutex + + def name + @name ||= "solid_queue-async-pool-#{object_id}" + end + + def start_reactor + create_thread do + Async do |task| + semaphore = Async::Semaphore.new(size, parent: task) + boot_queue << :ready + + wait_for_executions(semaphore) + wait_for_inflight_executions + end + rescue Exception => error + register_fatal_error(error) + raise + end + end + + def wait_for_executions(semaphore) + loop do + schedule_pending_executions(semaphore) + + break if shutdown? && pending_executions.empty? + + # Older async releases don't support waking the reactor from another + # thread reliably, so we cooperatively poll for newly posted work. + sleep(IDLE_WAIT_INTERVAL) if pending_executions.empty? + end + end + + def schedule_pending_executions(semaphore) + while execution = next_pending_execution + semaphore.async(execution) do |_execution_task, scheduled_execution| + perform_execution(scheduled_execution) + end + end + end + + def next_pending_execution + pending_executions.pop(true) + rescue ThreadError + nil + end + + def perform_execution(execution) + wrap_in_app_executor { execution.perform } + rescue Async::Stop => error + handle_thread_error(error) + register_fatal_error(error) + rescue Exception => error + handle_thread_error(error) + ensure + restore_capacity + end + + def reserve_capacity! + mutex.synchronize do + raise RuntimeError, "Execution pool is at capacity" if @available_capacity <= 0 + + @available_capacity -= 1 + end + end + + def restore_capacity + should_notify = mutex.synchronize do + @available_capacity += 1 + @available_capacity.positive? 
+ end + + on_state_change&.call if should_notify + end + + def register_fatal_error(error) + state_mutex.synchronize do + @fatal_error ||= error + end + + boot_queue << error if boot_queue.empty? + on_state_change&.call + end + + def raise_if_fatal_error! + error = state_mutex.synchronize { @fatal_error } + raise error if error + end + + def wait_for_inflight_executions + sleep(IDLE_WAIT_INTERVAL) while executions_in_flight? + end + + def executions_in_flight? + mutex.synchronize { @available_capacity < size } + end + end + end +end diff --git a/lib/solid_queue/execution_pools/thread_pool.rb b/lib/solid_queue/execution_pools/thread_pool.rb new file mode 100644 index 000000000..404e1c533 --- /dev/null +++ b/lib/solid_queue/execution_pools/thread_pool.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +module SolidQueue + module ExecutionPools + class ThreadPool + include AppExecutor + + attr_reader :size + + delegate :shutdown, :shutdown?, :wait_for_termination, to: :executor + + def initialize(size, on_state_change: nil) + @size = size + @on_state_change = on_state_change + @available_capacity = size + @mutex = Mutex.new + end + + def post(execution) + reserved = false + reserve_capacity! + reserved = true + + Concurrent::Promises.future_on(executor, execution) do |thread_execution| + wrap_in_app_executor { thread_execution.perform } + rescue Exception => error + handle_thread_error(error) + ensure + restore_capacity + end + rescue Exception + restore_capacity if reserved + raise + end + + def available_capacity + mutex.synchronize { @available_capacity } + end + + def idle? + available_capacity.positive? 
+ end + + def metadata + { + execution_mode: "thread", + capacity: size, + inflight: size - available_capacity, + thread_pool_size: size + } + end + + private + attr_reader :mutex, :on_state_change + + DEFAULT_OPTIONS = { + min_threads: 0, + idletime: 60, + fallback_policy: :abort + } + + def executor + @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: size, max_queue: size) + end + + def reserve_capacity! + mutex.synchronize do + raise RuntimeError, "Execution pool is at capacity" if @available_capacity <= 0 + + @available_capacity -= 1 + end + end + + def restore_capacity + should_notify = mutex.synchronize do + @available_capacity += 1 + @available_capacity.positive? + end + + on_state_change&.call if should_notify + end + end + end +end diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index 9c3d2a298..356bbaf35 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -1,54 +1,5 @@ # frozen_string_literal: true module SolidQueue - class Pool - include AppExecutor - - attr_reader :size - - delegate :shutdown, :shutdown?, :wait_for_termination, to: :executor - - def initialize(size, on_idle: nil) - @size = size - @on_idle = on_idle - @available_threads = Concurrent::AtomicFixnum.new(size) - @mutex = Mutex.new - end - - def post(execution) - available_threads.decrement - - Concurrent::Promises.future_on(executor, execution) do |thread_execution| - wrap_in_app_executor do - thread_execution.perform - ensure - available_threads.increment - mutex.synchronize { on_idle.try(:call) if idle? } - end - end.on_rejection! do |e| - handle_thread_error(e) - end - end - - def idle_threads - available_threads.value - end - - def idle? 
- idle_threads > 0 - end - - private - attr_reader :available_threads, :on_idle, :mutex - - DEFAULT_OPTIONS = { - min_threads: 0, - idletime: 60, - fallback_policy: :abort - } - - def executor - @executor ||= Concurrent::ThreadPoolExecutor.new DEFAULT_OPTIONS.merge(max_threads: size, max_queue: size) - end - end + Pool = ExecutionPools::ThreadPool end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd9..0c7507866 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -11,18 +11,26 @@ class Worker < Processes::Poller attr_reader :queues, :pool def initialize(**options) - options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) + options = options.dup + options[:threads] = options[:capacity] || options[:fibers] if options.key?(:capacity) || options.key?(:fibers) + options = options.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) # Ensure that the queues array is deep frozen to prevent accidental modification @queues = Array(options[:queues]).map(&:freeze).freeze + @metadata_state_mutex = Mutex.new + @metadata_dirty = false - @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) + @pool_options = { + mode: options[:execution_mode], + size: options[:threads], + on_state_change: -> { mark_metadata_dirty; wake_up } + } super(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: pool.size) + super.merge(queues: queues.join(",")).merge(pool.metadata) end private @@ -32,16 +40,23 @@ def poll pool.post(execution) end + reload_metadata_if_needed(executions.any?) + pool.idle? ? 
polling_interval : 10.minutes end end def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, pool.available_capacity, process_id) end end + def boot + build_pool + super + end + def shutdown pool.shutdown pool.wait_for_termination(SolidQueue.shutdown_timeout) @@ -53,8 +68,35 @@ def all_work_completed? SolidQueue::ReadyExecution.aggregated_count_across(queues).zero? end + def heartbeat + super + reload_metadata + end + def set_procline procline "waiting for jobs in #{queues.join(",")}" end + + def build_pool + @pool ||= ExecutionPools.build(**@pool_options) + end + + def mark_metadata_dirty + metadata_state_mutex.synchronize { @metadata_dirty = true } + end + + def metadata_state_mutex + @metadata_state_mutex + end + + def reload_metadata_if_needed(executions_claimed) + needs_reload = metadata_state_mutex.synchronize do + claimed_or_dirty = executions_claimed || @metadata_dirty + @metadata_dirty = false + claimed_or_dirty + end + + reload_metadata if needs_reload + end end end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 3a5c9693e..e444a7212 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -34,6 +34,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "appraisal" spec.add_development_dependency "debug", "~> 1.9" + spec.add_development_dependency "async", ">= 2.24" spec.add_development_dependency "minitest", "~> 5.0" spec.add_development_dependency "mocha" spec.add_development_dependency "puma", "~> 7.0" diff --git a/test/integration/async_processes_lifecycle_test.rb b/test/integration/async_processes_lifecycle_test.rb index fd284210e..5eeb172c4 100644 --- a/test/integration/async_processes_lifecycle_test.rb +++ b/test/integration/async_processes_lifecycle_test.rb @@ -125,10 +125,15 @@ class AsyncProcessesLifecycleTest < ActiveSupport::TestCase no_pause = enqueue_store_result_job("no pause") pause = 
enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second) - # Wait for the "no pause" job to complete and the pause job to be claimed. - # This ensures the pause job is actively being processed. + # Wait for the "no pause" job to complete and the pause job to start. + # A claimed execution alone is not enough here because the worker may have + # claimed the job but not yet entered `perform`. wait_for_jobs_to_finish_for(3.seconds, except: pause) - wait_for(timeout: 2.seconds) { SolidQueue::ClaimedExecution.exists?(job_id: SolidQueue::Job.find_by(active_job_id: pause.job_id)&.id) } + wait_for(timeout: 5.seconds) do + skip_active_record_query_cache do + JobResult.where(queue_name: :background, status: "started", value: "pause").exists? + end + end signal_process(@pid, :TERM, wait: 0.2.second) wait_for_jobs_to_finish_for(2.seconds, except: pause) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index a12c48e42..4c281aa90 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -256,6 +256,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase private def assert_stored_sequence(result, sequence) expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join + wait_for(timeout: 1.second) do + skip_active_record_query_cache do + result.reload.status.split(" + ").sort.join == expected + end + end + skip_active_record_query_cache do assert_equal expected, result.reload.status.split(" + ").sort.join end diff --git a/test/integration/puma/plugin_testing.rb b/test/integration/puma/plugin_testing.rb index 14165c9b8..2ae30c0c0 100644 --- a/test/integration/puma/plugin_testing.rb +++ b/test/integration/puma/plugin_testing.rb @@ -11,11 +11,12 @@ module PluginTesting setup do FileUtils.mkdir_p Rails.root.join("tmp", "pids") + @port = TCPServer.open("127.0.0.1", 0) { |server| server.addr[1] } Dir.chdir("test/dummy") do cmd = %W[ bundle 
exec puma - -b tcp://127.0.0.1:9222 + -b tcp://127.0.0.1:#{@port} -C config/puma_#{solid_queue_mode}.rb -s config.ru @@ -27,6 +28,7 @@ module PluginTesting end wait_for_registered_processes(5, timeout: 5.second) + wait_for(timeout: 5.seconds) { find_processes_registered_as(supervisor_kind).exists? } end teardown do @@ -47,6 +49,7 @@ module PluginTesting # Ensure the restart finishes before we try to continue with the test wait_for_registered_processes(0, timeout: 5.second) wait_for_registered_processes(5, timeout: 5.second) + wait_for(timeout: 5.seconds) { find_processes_registered_as(supervisor_kind).exists? } StoreResultJob.perform_later(:puma_plugin) wait_for_jobs_to_finish_for(2.seconds) @@ -54,6 +57,10 @@ module PluginTesting end private + def supervisor_kind + "Supervisor(#{solid_queue_mode})" + end + def solid_queue_mode raise NotImplementedError end diff --git a/test/test_helper.rb b/test/test_helper.rb index db5bd5c34..cd60eb7af 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -29,6 +29,19 @@ def write(...) 
 Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)

 class ExpectedTestError < RuntimeError; end

+module ExecutionIsolationTestHelper
+  def with_execution_isolation(level)
+    previous_level = ActiveSupport::IsolatedExecutionState.isolation_level
+    ActiveSupport::IsolatedExecutionState.isolation_level = level
+    yield
+  ensure
+    ActiveSupport::IsolatedExecutionState.isolation_level = previous_level
+  end
+end
+
+class Minitest::Test
+  include ExecutionIsolationTestHelper
+end

 class ActiveSupport::TestCase
   include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper
diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb
index 34f69658b..10aaed6e6 100644
--- a/test/unit/configuration_test.rb
+++ b/test/unit/configuration_test.rb
@@ -61,6 +61,64 @@ class ConfigurationTest < ActiveSupport::TestCase
     assert_processes configuration, :worker, 2
   end

+  test "normalize worker execution modes and capacity aliases" do
+    with_execution_isolation(:fiber) do
+      configuration = SolidQueue::Configuration.new(
+        workers: [
+          { queues: "llm*", execution_mode: :async, capacity: 10 },
+          { queues: "*", execution_mode: :fiber, fibers: 3 }
+        ],
+        dispatchers: [],
+        skip_recurring: true
+      )
+
+      assert configuration.valid?
+      assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], capacity: [ 10, 3 ], threads: [ nil, nil ]
+    end
+  end
+
+  test "async workers reject threads in favor of capacity aliases" do
+    with_execution_isolation(:fiber) do
+      configuration = SolidQueue::Configuration.new(
+        workers: [ { queues: "llm*", execution_mode: :async, threads: 10 } ],
+        dispatchers: [],
+        skip_recurring: true
+      )
+
+      assert_not configuration.valid?
+      assert_match /Async workers do not accept `threads`/, configuration.errors.full_messages.first
+    end
+  end
+
+  test "async worker capacity inflates required database pool size on Rails 7.1" do
+    skip if async_workers_release_connections_between_queries?
+
+    with_execution_isolation(:fiber) do
+      configuration = SolidQueue::Configuration.new(
+        workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ],
+        dispatchers: [],
+        skip_recurring: true
+      )
+
+      assert_not configuration.valid?
+      assert_match /requires at least 1002 database connections/, configuration.errors.full_messages.first
+    end
+  end
+
+  test "async worker capacity does not inflate required database pool size on Rails 7.2+" do
+    skip unless async_workers_release_connections_between_queries?
+
+    with_execution_isolation(:fiber) do
+      configuration = SolidQueue::Configuration.new(
+        workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ],
+        dispatchers: [],
+        skip_recurring: true
+      )
+
+      assert configuration.valid?
+    end
+  end
+
   test "mulitple workers with the same configuration" do
     background_worker = { queues: "background", polling_interval: 10, processes: 3 }
     configuration = SolidQueue::Configuration.new(workers: [ background_worker ])
@@ -165,14 +223,36 @@ class ConfigurationTest < ActiveSupport::TestCase
     assert_not configuration.valid?
     assert_equal [ "No processes configured" ], configuration.errors.full_messages

+    configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :invalid } ])
+    assert_not configuration.valid?
+    assert_match /Unknown execution mode/, configuration.errors.full_messages.first
+
+    configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :async } ])
+    assert_not configuration.valid?
+    assert_match /requires fiber-scoped isolated execution state/, configuration.errors.full_messages.first
+
+    with_execution_isolation(:fiber) do
+      load_error = LoadError.new("cannot load such file -- async")
+      missing_dependency_error = SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError.new(load_error)
+      SolidQueue::ExecutionPools::AsyncPool.expects(:ensure_dependency!).raises(missing_dependency_error)
+
+      configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :async } ])
+      assert_not configuration.valid?
+      assert_match /gem "async"/, configuration.errors.full_messages.first
+    end
+
     # Not enough DB connections
     configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ])
     assert_not configuration.valid?
-    assert_match /Solid Queue is configured to use \d+ threads but the database connection pool is \d+. Increase it in `config\/database.yml`/,
+    assert_match /Solid Queue requires at least \d+ database connections for the configured workers, but the queue database connection pool is \d+. Increase it in `config\/database.yml`/,
       configuration.errors.full_messages.first
   end

   private
+    def async_workers_release_connections_between_queries?
+      ActiveRecord.gem_version >= SolidQueue::Configuration::ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION
+    end
+
     def assert_processes(configuration, kind, count, **attributes)
       processes = configuration.configured_processes.select { |p| p.kind == kind }
       assert_equal count, processes.size
diff --git a/test/unit/execution_pools/async_pool_test.rb b/test/unit/execution_pools/async_pool_test.rb
new file mode 100644
index 000000000..52b3c36d6
--- /dev/null
+++ b/test/unit/execution_pools/async_pool_test.rb
@@ -0,0 +1,135 @@
+require "test_helper"
+
+class AsyncPoolTest < Minitest::Test
+  Execution = Struct.new(:started, :results, :pause) do
+    def perform
+      started << true if started
+      sleep(pause) if pause
+      results << [ Thread.current.object_id, Fiber.current.object_id ] if results
+    end
+  end
+
+  CancelledExecution = Struct.new(:started) do
+    def perform
+      started << true if started
+      raise Async::Stop.new
+    end
+  end
+
+  def test_raises_a_clear_error_when_the_async_gem_is_unavailable
+    load_error = LoadError.new("cannot load such file -- async")
+
+    SolidQueue::ExecutionPools::AsyncPool.expects(:require).with("async").raises(load_error)
+
+    error = assert_raises SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError do
+      SolidQueue::ExecutionPools::AsyncPool.new(3)
+    end
+
+    assert_match /gem "async"/, error.message
+  end
+
+  def test_build_treats_fiber_as_an_alias_for_async
+    pool = mock
+
+    SolidQueue::ExecutionPools::AsyncPool.expects(:new).with(5, on_state_change: nil).returns(pool)
+
+    assert_equal pool, SolidQueue::ExecutionPools.build(mode: :fiber, size: 5)
+  end
+
+  def test_raises_a_clear_error_when_isolation_level_is_not_fiber
+    error = assert_raises SolidQueue::ExecutionPools::AsyncPool::UnsupportedIsolationLevelError do
+      SolidQueue::ExecutionPools::AsyncPool.new(3)
+    end

+    assert_match /isolation_level = :fiber/, error.message
+  end
+
+  def test_adds_io_timeout_compatibility_for_older_rubies
+    io_class = Class.new
+
+    SolidQueue::ExecutionPools::AsyncPool.ensure_io_timeout_compatibility!(io_class)
+
+    io = io_class.new
+    assert_nil io.timeout
+
+    io.timeout = 1.second
+
+    assert_equal 1.second, io.timeout
+    assert io_class.const_defined?(:TimeoutError, false)
+  end
+
+  def test_executes_jobs_as_fibers_on_a_single_reactor_thread
+    with_execution_isolation(:fiber) do
+      pool = SolidQueue::ExecutionPools::AsyncPool.new(2)
+      results = Thread::Queue.new
+
+      pool.post Execution.new(nil, results, 0.05)
+      pool.post Execution.new(nil, results, 0.05)
+
+      entries = 2.times.map { Timeout.timeout(1.second) { results.pop } }
+
+      assert_equal 1, entries.map(&:first).uniq.count
+      assert_equal 2, entries.map(&:last).uniq.count
+      assert_equal 2, pool.available_capacity
+      assert_equal 0, pool.metadata[:inflight]
+    ensure
+      pool&.shutdown
+      pool&.wait_for_termination(1.second)
+    end
+  end
+
+  def test_waits_for_in_flight_executions_during_shutdown
+    with_execution_isolation(:fiber) do
+      pool = SolidQueue::ExecutionPools::AsyncPool.new(1)
+      started = Thread::Queue.new
+
+      pool.post Execution.new(started, nil, 0.1)
+      Timeout.timeout(1.second) { started.pop }
+
+      pool.shutdown
+
+      assert_nil pool.wait_for_termination(0.01)
+      assert pool.wait_for_termination(1.second)
+    ensure
+      pool&.shutdown
+      pool&.wait_for_termination(1.second)
+    end
+  end
+
+  def test_shutdown_wakes_the_reactor_when_idle
+    with_execution_isolation(:fiber) do
+      pool = SolidQueue::ExecutionPools::AsyncPool.new(1)
+
+      pool.shutdown
+
+      assert pool.wait_for_termination(1.second)
+    ensure
+      pool&.shutdown
+      pool&.wait_for_termination(1.second)
+    end
+  end
+
+  def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled
+    with_execution_isolation(:fiber) do
+      notifications = Thread::Queue.new
+      started = Thread::Queue.new
+      reported_errors = []
+      original_on_thread_error = SolidQueue.on_thread_error
+      SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name }
+
+      pool = SolidQueue::ExecutionPools::AsyncPool.new(1,
+        on_state_change: -> { notifications << :changed })
+
+      pool.post CancelledExecution.new(started)
+      Timeout.timeout(1.second) { started.pop }
+      Timeout.timeout(1.second) { notifications.pop }
+
+      error = assert_raises(Async::Stop) { pool.available_capacity }
+      assert_equal [ error.class.name ], reported_errors
+      assert_raises(Async::Stop) { pool.metadata }
+    ensure
+      SolidQueue.on_thread_error = original_on_thread_error
+      pool&.shutdown
+      pool&.wait_for_termination(1.second)
+    end
+  end
+end
diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb
index 3d692404b..5548f70b6 100644
--- a/test/unit/worker_test.rb
+++ b/test/unit/worker_test.rb
@@ -3,6 +3,27 @@
 class WorkerTest < ActiveSupport::TestCase
   include ActiveSupport::Testing::MethodCallAssertions

+  self.use_transactional_tests = false
+
+  EXECUTION_MODES = [
+    {
+      name: "thread",
+      options: { threads: 3 },
+      expected_metadata: {
+        execution_mode: "thread",
+        capacity: 3,
+        thread_pool_size: 3
+      }
+    },
+    {
+      name: "async",
+      options: { execution_mode: :async, capacity: 3 },
+      expected_metadata: {
+        execution_mode: "async",
+        capacity: 3
+      }
+    }
+  ].freeze

   setup do
     @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2)
@@ -13,13 +34,106 @@ class WorkerTest < ActiveSupport::TestCase
     JobBuffer.clear
   end

-  test "worker is registered as process" do
-    @worker.start
+  EXECUTION_MODES.each do |mode|
+    test "worker is registered as process in #{mode[:name]} mode" do
+      with_worker_execution_support(mode[:options]) do
+        worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2, **mode[:options])
+
+        worker.start
+        wait_for_registered_processes(1, timeout: 1.second)
+
+        process = SolidQueue::Process.first
+        assert_equal "Worker", process.kind
+        assert_metadata process, {
+          queues: "background",
+          polling_interval: 0.2,
+          inflight: 0
+        }.merge(mode[:expected_metadata])
+      ensure
+        worker&.stop
+        wait_for_registered_processes(0, timeout: 1.second)
+      end
+    end
+
+    test "claim and process more enqueued jobs than the pool size allows to process at once in #{mode[:name]} mode" do
+      5.times do
+        StoreResultJob.perform_later(:paused, pause: 0.1.second)
+      end
+
+      3.times do
+        StoreResultJob.perform_later(:immediate)
+      end
+
+      with_worker_execution_support(mode[:options]) do
+        worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2, **mode[:options])
+
+        worker.start
+
+        wait_for_jobs_to_finish_for(2.second)
+        worker.wake_up
+
+        assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :paused).count
+        assert_equal 3, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count
+      ensure
+        worker&.stop
+        wait_for_registered_processes(0, timeout: 1.second)
+      end
+    end
+
+    test "updates inflight metadata after jobs finish in #{mode[:name]} mode" do
+      StoreResultJob.perform_later(:slow, pause: 0.1.second)
+
+      with_worker_execution_support(mode[:options]) do
+        worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.05, **mode[:options])
+
+        worker.start
+        wait_for_registered_processes(1, timeout: 1.second)
+
+        process = SolidQueue::Process.first
+
+        wait_for(timeout: 2.seconds) { process.reload.metadata["inflight"] == 1 }
+        wait_for(timeout: 2.seconds) do
+          process.reload.metadata["inflight"] == 0 &&
+            JobResult.where(queue_name: :background, status: "completed", value: :slow).count == 1
+        end
+      ensure
+        worker&.stop
+        wait_for_registered_processes(0, timeout: 1.second)
+      end
+    end
+  end
+
+  test "builds the execution pool on boot instead of initialize" do
+    pool = SolidQueue::ExecutionPools::ThreadPool.new(3)
+
+    SolidQueue::ExecutionPools.expects(:build).once.with do |**options|
+      options[:mode] == :thread && options[:size] == 3 && options[:on_state_change].respond_to?(:call)
+    end.returns(pool)
+
+    worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2)
+
+    assert_nil worker.pool
+
+    worker.start
     wait_for_registered_processes(1, timeout: 1.second)

-    process = SolidQueue::Process.first
-    assert_equal "Worker", process.kind
-    assert_metadata process, { queues: "background", polling_interval: 0.2, thread_pool_size: 3 }
+    assert_equal pool, worker.pool
+  ensure
+    worker&.stop
+    wait_for_registered_processes(0, timeout: 1.second)
+  end
+
+  test "defaults thread workers to the configured thread pool size" do
+    worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2)
+
+    worker.start
+    wait_for_registered_processes(1, timeout: 1.second)
+
+    assert_equal 3, worker.pool.size
+    assert_metadata SolidQueue::Process.first, thread_pool_size: 3, capacity: 3, execution_mode: "thread"
+  ensure
+    worker&.stop
+    wait_for_registered_processes(0, timeout: 1.second)
   end

   test "errors on polling are passed to on_thread_error and re-raised" do
@@ -85,24 +199,6 @@ class WorkerTest < ActiveSupport::TestCase
     Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe)
   end

-  test "claim and process more enqueued jobs than the pool size allows to process at once" do
-    5.times do |i|
-      StoreResultJob.perform_later(:paused, pause: 0.1.second)
-    end
-
-    3.times do |i|
-      StoreResultJob.perform_later(:immediate)
-    end
-
-    @worker.start
-
-    wait_for_jobs_to_finish_for(2.second)
-    @worker.wake_up
-
-    assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :paused).count
-    assert_equal 3, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count
-  end
-
   test "polling queries are logged" do
     log = StringIO.new
     with_active_record_logger(ActiveSupport::Logger.new(log)) do
@@ -173,28 +269,50 @@ class WorkerTest < ActiveSupport::TestCase
   end

   test "sleeps `10.minutes` if at capacity" do
-    3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }
+    3.times { |i| StoreResultJob.perform_later(i, pause: 5.seconds) }

-    @worker.expects(:interruptible_sleep).with(10.minutes).at_least_once
-    @worker.expects(:interruptible_sleep).with(@worker.polling_interval).never
-    @worker.expects(:handle_thread_error).never
+    delays = stub_interruptible_sleep(@worker)

     @worker.start
-    sleep 1.second
+
+    first_delay = Timeout.timeout(1.second) { delays.pop }
+
+    assert_equal 10.minutes, first_delay
   end

   test "sleeps `polling_interval` if worker not at capacity" do
-    2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }
+    2.times { |i| StoreResultJob.perform_later(i, pause: 5.seconds) }

-    @worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once
-    @worker.expects(:interruptible_sleep).with(10.minutes).never
-    @worker.expects(:handle_thread_error).never
+    delays = stub_interruptible_sleep(@worker)

     @worker.start
-    sleep 1.second
+
+    first_delay = Timeout.timeout(1.second) { delays.pop }
+
+    assert_equal @worker.polling_interval, first_delay
   end

   private
+    def stub_interruptible_sleep(worker)
+      delays = Thread::Queue.new
+
+      worker.stubs(:handle_thread_error)
+      worker.define_singleton_method(:interruptible_sleep) do |delay|
+        delays << delay
+        sleep 0.01
+      end
+
+      delays
+    end
+
+    def with_worker_execution_support(options, &block)
+      if options[:execution_mode] == :async
+        with_execution_isolation(:fiber, &block)
+      else
+        yield
+      end
+    end
+
     def with_polling(silence:)
       old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
       yield
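The `stub_interruptible_sleep` helper in the reworked `worker_test.rb` replaces wall-clock sleeping with a deterministic handshake: instead of mocking `interruptible_sleep` with Mocha expectations and then sleeping a fixed second, it records each requested delay on a `Thread::Queue` the test can `pop` with a timeout. A minimal standalone sketch of the same pattern, assuming a hypothetical `Poller` class as a stand-in for `SolidQueue::Worker` (and `600` for `10.minutes`):

```ruby
require "timeout"

# Hypothetical stand-in for the worker: sleeps a long back-off when its
# pool is at capacity, or for its polling interval otherwise.
class Poller
  attr_reader :polling_interval

  def initialize(polling_interval:, at_capacity:)
    @polling_interval = polling_interval
    @at_capacity = at_capacity
  end

  def run_once
    interruptible_sleep(@at_capacity ? 600 : polling_interval)
  end

  def interruptible_sleep(delay)
    sleep(delay)
  end
end

# The pattern under discussion: redefine `interruptible_sleep` on the
# instance's singleton class so every requested delay is captured on a
# queue, letting the test block on `pop` instead of sleeping.
def stub_interruptible_sleep(poller)
  delays = Thread::Queue.new
  poller.define_singleton_method(:interruptible_sleep) do |delay|
    delays << delay   # record what would have been slept
    sleep 0.01        # yield briefly so the loop can continue
  end
  delays
end

poller = Poller.new(polling_interval: 0.2, at_capacity: true)
delays = stub_interruptible_sleep(poller)

Thread.new { poller.run_once }

# Blocks until the poller reaches its sleep, or fails fast after 1s.
first_delay = Timeout.timeout(1) { delays.pop }
puts first_delay  # 600
```

The queue-based handshake makes the assertion race-free: the test waits exactly as long as the poller takes to reach its sleep call, rather than a fixed `sleep 1.second` that is both slow and flaky under load.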