4 changes: 3 additions & 1 deletion .devcontainer/devcontainer.json
@@ -8,7 +8,9 @@
},
// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
"ghcr.io/devcontainers/features/docker-in-docker": {},
"ghcr.io/devcontainers/features/docker-in-docker": {
"moby": false
},
"ghcr.io/devcontainers/features/github-cli:1": {
"version": "latest"
},
18 changes: 18 additions & 0 deletions Gemfile.lock
@@ -4,6 +4,7 @@ PATH
solid_queue (1.4.0)
activejob (>= 7.1)
activerecord (>= 7.1)
async (>= 2.24, < 3)
concurrent-ruby (>= 1.3.1)
fugit (~> 1.11)
railties (>= 7.1)
@@ -55,12 +56,22 @@ GEM
rake
thor (>= 0.14.0)
ast (2.4.2)
async (2.38.1)
console (~> 1.29)
fiber-annotation
io-event (~> 1.11)
metrics (~> 0.12)
traces (~> 0.18)
base64 (0.3.0)
benchmark (0.4.1)
bigdecimal (3.3.1)
builder (3.3.0)
concurrent-ruby (1.3.5)
connection_pool (2.5.4)
console (1.34.3)
fiber-annotation
fiber-local (~> 1.1)
json
crass (1.0.6)
date (3.4.1)
debug (1.9.2)
@@ -70,6 +81,10 @@ GEM
erubi (1.13.1)
et-orbi (1.2.11)
tzinfo
fiber-annotation (0.2.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (1.0.1)
fugit (1.11.1)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
@@ -78,6 +93,7 @@ GEM
i18n (1.14.7)
concurrent-ruby (~> 1.0)
io-console (0.8.0)
io-event (1.14.5)
irb (1.14.3)
rdoc (>= 4.0.0)
reline (>= 0.4.2)
@@ -87,6 +103,7 @@ GEM
loofah (2.23.1)
crass (~> 1.0.2)
nokogiri (>= 1.12.0)
metrics (0.15.0)
minitest (5.26.0)
mocha (2.1.0)
ruby2_keywords (>= 0.0.5)
@@ -181,6 +198,7 @@ GEM
stringio (3.1.2)
thor (1.3.2)
timeout (0.4.3)
traces (0.18.2)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (3.1.3)
34 changes: 27 additions & 7 deletions README.md
@@ -20,7 +20,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Optional scheduler configuration](#optional-scheduler-configuration)
- [Queue order and priorities](#queue-order-and-priorities)
- [Queues specification and performance](#queues-specification-and-performance)
- [Threads, processes, and signals](#threads-processes-and-signals)
- [Threads, fibers, processes, and signals](#threads-fibers-processes-and-signals)
- [Database configuration](#database-configuration)
- [Other configuration settings](#other-configuration-settings)
- [Lifecycle hooks](#lifecycle-hooks)
@@ -229,6 +229,11 @@ production:
threads: 5
polling_interval: 0.1
processes: 3
- queues: llm
threads: 2
concurrency_model: fiber
fibers: 50
polling_interval: 0.1
scheduler:
dynamic_tasks_enabled: true
polling_interval: 5
@@ -264,15 +269,18 @@ Here's an overview of the different options:
```

This will create a worker fetching jobs from all queues starting with `staging`. The wildcard `*` is only allowed on its own or at the end of a queue name; queue names such as `*_some_queue` aren't supported and will be ignored.

Also, if a wildcard (`*`) is included alongside explicit queue names, for example `queues: [default, backend, *]`, it behaves like `queues: *`.

Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.
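As a rough illustration, the matching rules above could be sketched like this (a toy method, not Solid Queue's actual implementation):

```ruby
# Toy sketch of the queue-specification semantics described above; this is
# NOT Solid Queue's real matching code, just an illustration of the rules.
def matches_queue_spec?(spec, queue_name)
  specs = Array(spec).map(&:to_s)
  return true if specs.include?("*") # a bare wildcard matches every queue

  specs.any? do |s|
    if s.end_with?("*")
      queue_name.start_with?(s.delete_suffix("*")) # prefix match, e.g. "staging*"
    else
      queue_name == s # exact match
    end
  end
end

puts matches_queue_spec?(%w[staging* background], "staging_mailers") # => true
puts matches_queue_spec?(%w[default backend *], "anything")          # => true
```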

Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance).

- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat.
- `threads`: this is the number of worker threads for each worker. In the default `thread` concurrency model, it is also the worker's maximum execution capacity, as each worker will fetch up to this many jobs and post them to the thread pool. In the `fiber` concurrency model, these are the long-lived runner threads that host Async reactors, and total execution capacity becomes `threads * fibers`. By default, this is `3`. Only workers have this setting.
- `concurrency_model`: the execution strategy for each worker. It can be `thread` or `fiber`, and defaults to `thread`. `thread` preserves the existing thread-pool behaviour. `fiber` runs jobs on Async fibers inside each worker thread and is intended for scheduler-friendly, IO-bound workloads.
- `fibers`: the maximum number of concurrent fibers per worker thread when `concurrency_model: fiber`. This setting is required when using the `fiber` concurrency model; a worker configured with `threads: 2` and `fibers: 50` can execute up to `100` jobs concurrently. Be aware that if you configure a large capacity (`threads * fibers`), you should also check your database server's maximum client connections setting; for example, a default MySQL installation allows around 150 concurrent client connections.
- When you use `concurrency_model: fiber`, Solid Queue switches `ActiveSupport::IsolatedExecutionState.isolation_level` to `:fiber` before the worker runs jobs so Rails execution state remains fiber-local. If your application sets this value manually, keep it at `:fiber` anywhere fiber workers run.
- Fiber workers can drive much higher job concurrency than thread workers, so review the connection pools for any databases those jobs use. In particular, size the queue and application database pools for the amount of concurrent job work your fiber workers can create.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode).
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
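To make the capacity arithmetic described above concrete, here is a small sketch; the worker values and the connection limit are illustrative, not defaults you should rely on:

```ruby
# Illustrative capacity math for the settings above; values are examples only.
worker = { threads: 2, fibers: 50, concurrency_model: :fiber }

capacity =
  if worker[:concurrency_model] == :fiber
    worker[:threads] * worker[:fibers] # fiber mode multiplies per-thread capacity
  else
    worker[:threads]                   # thread mode: one job per thread
  end

max_db_connections = 150 # example server-side limit; check your own database
puts capacity                      # => 100
puts capacity > max_db_connections # => false
```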

@@ -362,9 +370,21 @@ queues: back*
```


### Threads, processes, and signals
### Threads, fibers, processes, and signals

Solid Queue now has three concurrency layers for workers:

- Processes: configured with `processes`, or by running multiple `bin/jobs` instances across machines.
- Threads: configured with `threads` for each worker.
- Fibers: enabled per worker with `concurrency_model: fiber` and sized with `fibers`.

In the default `thread` concurrency model, a worker runs jobs directly on its thread pool, so total execution capacity equals `threads`. In the `fiber` concurrency model, each worker thread hosts an Async reactor and can run up to `fibers` jobs concurrently, so total execution capacity equals `threads * fibers`.

Fiber mode is best suited to IO-bound jobs that spend most of their time waiting on network or other scheduler-friendly operations. CPU-bound jobs will not benefit from fiber mode and should continue using the default thread execution model.
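The cooperative style that fiber mode relies on can be illustrated with plain stdlib fibers; the real implementation uses the Async gem's reactor, not this toy loop:

```ruby
# Toy round-robin "reactor" over plain Fibers, stdlib only. Each fiber yields
# once, standing in for an IO wait, then finishes. Illustration only.
jobs = (1..3).map do |i|
  Fiber.new do
    Fiber.yield        # pretend to wait on IO, handing control back
    "job #{i} done"
  end
end

results = []
until jobs.empty?
  fiber = jobs.shift
  value = fiber.resume
  fiber.alive? ? jobs.push(fiber) : results.push(value)
end

puts results.inspect # => ["job 1 done", "job 2 done", "job 3 done"]
```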

This worker-level concurrency model is independent from the supervisor `fork` vs `async` mode described below. `fork` vs `async` decides how Solid Queue process instances are supervised. `thread` vs `fiber` decides how each worker executes jobs locally.

Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling.
Fiber workers require Rails execution state isolation to be fiber-local. Solid Queue sets `ActiveSupport::IsolatedExecutionState.isolation_level = :fiber` automatically when a fiber worker boots. If your application configures this manually, do not switch it back to `:thread` in processes that run fiber workers.
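The fiber-local requirement can be seen with stdlib storage APIs: `Thread#[]` is fiber-local in Ruby, while `thread_variable_get`/`thread_variable_set` is truly thread-local (illustration only):

```ruby
# Stdlib demonstration of fiber-local vs thread-local state, the distinction
# behind the `:fiber` isolation level discussed above.
Thread.current[:request_id] = "outer"

Fiber.new { Thread.current[:request_id] = "inner" }.resume # fiber-local write

fiber_local = Thread.current[:request_id] # untouched by the other fiber

Thread.current.thread_variable_set(:shared, "visible")     # thread-local write
thread_local = Fiber.new { Thread.current.thread_variable_get(:shared) }.resume

puts fiber_local  # => "outer"
puts thread_local # => "visible"
```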

The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode:
- `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit.
@@ -781,7 +801,7 @@ SolidQueue.unschedule_recurring_task("my_dynamic_task")

Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error.

Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them.

## Inspiration

33 changes: 33 additions & 0 deletions lib/solid_queue/configuration.rb
@@ -6,6 +6,7 @@ class Configuration

validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_valid_worker_concurrency_settings
validate :ensure_correctly_sized_thread_pool

class Process < Struct.new(:kind, :attributes)
@@ -18,9 +19,12 @@ def instantiate
queues: "*",
threads: 3,
processes: 1,
concurrency_model: :thread,
polling_interval: 0.1
}

WORKER_CONCURRENCY_MODELS = %w[ thread fiber ].freeze

DISPATCHER_DEFAULTS = {
batch_size: 500,
polling_interval: 1,
@@ -88,6 +92,31 @@ def ensure_valid_recurring_tasks
end
end

def ensure_valid_worker_concurrency_settings
workers_options.each_with_index do |worker_options, index|
worker_label = "Worker #{index + 1}"
concurrency_model = worker_concurrency_model_for(worker_options)

unless WORKER_CONCURRENCY_MODELS.include?(concurrency_model)
errors.add(:base, "#{worker_label} has unsupported `concurrency_model: #{concurrency_model}`. Valid options are: thread, fiber")
next
end

if concurrency_model == "thread"
if worker_options.key?(:fibers) && !worker_options[:fibers].nil?
errors.add(:base, "#{worker_label} cannot set `fibers` unless `concurrency_model` is `fiber`")
end

next
end

fibers = Integer(worker_options[:fibers], exception: false)
unless fibers&.positive?
errors.add(:base, "#{worker_label} with `concurrency_model: fiber` must set a positive integer `fibers` value")
end
end
end

def ensure_correctly_sized_thread_pool
if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads
errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " +
@@ -156,6 +185,10 @@ def workers_options
.map { |options| options.dup.symbolize_keys }
end

def worker_concurrency_model_for(worker_options)
worker_options.fetch(:concurrency_model, WORKER_DEFAULTS[:concurrency_model]).to_s
end

def dispatchers_options
@dispatchers_options ||= processes_config.fetch(:dispatchers, [])
.map { |options| options.dup.symbolize_keys }
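The validation added above can be summarised in a standalone sketch; the method name and messages here are illustrative, not Solid Queue's API:

```ruby
# Standalone sketch of the worker concurrency validation rules shown above:
# `fibers` must be a positive integer under `concurrency_model: fiber`, and
# must not be set under the default `thread` model. Illustration only.
def worker_concurrency_errors(options)
  model = options.fetch(:concurrency_model, :thread).to_s
  return ["unsupported concurrency_model: #{model}"] unless %w[thread fiber].include?(model)

  if model == "thread"
    options[:fibers].nil? ? [] : ["cannot set `fibers` unless `concurrency_model` is `fiber`"]
  else
    fibers = Integer(options[:fibers], exception: false)
    fibers&.positive? ? [] : ["`concurrency_model: fiber` requires a positive integer `fibers` value"]
  end
end

puts worker_concurrency_errors(concurrency_model: :fiber, fibers: 50).inspect # => []
puts worker_concurrency_errors(fibers: 50).inspect # flags fibers without the fiber model
```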
147 changes: 147 additions & 0 deletions lib/solid_queue/fiber_pool.rb
@@ -0,0 +1,147 @@
# frozen_string_literal: true

require "async"
require "async/barrier"
require "async/queue"
require "async/semaphore"

module SolidQueue
class FiberPool < Worker::ExecutionBackend
ISOLATION_MUTEX = Mutex.new

class Runner
def initialize(index, fibers, name:, on_execution:)
@queue = Async::Queue.new
@booted = Thread::Queue.new

@thread = Thread.new do
Thread.current.name = "#{name}-fiber-#{index}"

Sync do |task|
begin
barrier = Async::Barrier.new(parent: task)
semaphore = Async::Semaphore.new(fibers, parent: task)
execution_state = ActiveSupport::IsolatedExecutionState.context

@booted << true

while (execution = queue.dequeue)
barrier.async(parent: semaphore) do
ActiveSupport::IsolatedExecutionState.share_with(execution_state)
on_execution.call(execution)
end
end

barrier.wait
ensure
barrier&.cancel
end
end
end

@booted.pop
end

def enqueue(execution)
queue.enqueue(execution)
end

def shutdown
queue.close
end

def wait(timeout)
thread.join(timeout)
end

private
attr_reader :queue, :thread
end

attr_reader :threads, :fibers

def initialize(threads, fibers, on_available: nil, on_idle: nil, name: "worker")
ensure_fiber_isolation!

@threads = threads
@fibers = fibers

super(threads * fibers, on_available: on_available || on_idle)

@available_slots = Concurrent::AtomicFixnum.new(capacity)
@next_runner = Concurrent::AtomicFixnum.new(0)
@mutex = Mutex.new
@shutdown = false

@runners = Array.new(threads) do |index|
Runner.new(index, fibers, name: name, on_execution: method(:run_execution))
end
end

def post(execution)
raise Concurrent::RejectedExecutionError, "fiber pool is shut down" if shutdown?

available_slots.decrement
next_runner.enqueue(execution)
rescue Exception
available_slots.increment if available_slots.value < capacity
raise
end

def available_capacity
available_slots.value
end

def shutdown
mutex.synchronize do
return if @shutdown

@shutdown = true
runners.each(&:shutdown)
end
end

def shutdown?
@shutdown
end

def wait_for_termination(timeout)
deadline = Concurrent.monotonic_time + timeout.to_f

runners.all? do |runner|
remaining = deadline - Concurrent.monotonic_time
break false if remaining <= 0

runner.wait(remaining)
end
end

private
attr_reader :available_slots, :mutex, :runners

def ensure_fiber_isolation!
ISOLATION_MUTEX.synchronize do
return if ActiveSupport::IsolatedExecutionState.isolation_level == :fiber

ActiveSupport::IsolatedExecutionState.isolation_level = :fiber
end
end

def next_runner
runners[(next_runner_index.increment - 1) % runners.size]
end

def next_runner_index
@next_runner
end

def run_execution(execution)
perform(execution)
rescue Exception => error
handle_thread_error(error)
ensure
available_slots.increment
mutex.synchronize { notify_available }
end
end
end
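The pool's round-robin dispatch across runner threads can be illustrated with a stdlib-only toy; the real pool uses `Concurrent::AtomicFixnum` and Async runners:

```ruby
# Toy round-robin dispatcher: a counter picks the next runner for each job,
# mirroring the `next_runner` method above. Stdlib only; illustration, not
# the real pool.
runners = Array.new(3) { |i| { name: "runner-#{i}", jobs: [] } }
counter = 0
mutex = Mutex.new

enqueue = lambda do |job|
  index = mutex.synchronize { counter += 1 } # atomic-style increment
  runners[(index - 1) % runners.size][:jobs] << job
end

10.times { |job| enqueue.call(job) }

puts runners.map { |r| r[:jobs].size }.inspect # => [4, 3, 3]
```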