Skip to content

Replace queues with promises, clean up naming#10

Open
henrikbjorn wants to merge 8 commits intomasterfrom
promises-and-cleanup
Open

Replace queues with promises, clean up naming#10
henrikbjorn wants to merge 8 commits intomasterfrom
promises-and-cleanup

Conversation

@henrikbjorn
Copy link
Copy Markdown
Member

Why

The previous implementation used Async::Queue for reply and app-complete channels, plus Async::Semaphore for command serialization. Each command only ever has one pending response, so a multi-item queue and mutex were heavier than needed.

What changed

Async::Promise arrays replace Queue + Semaphore

Each send_message and execute_app creates its own Async::Promise and pushes it onto an array. The reader fiber resolves them in FIFO order (matching FreeSWITCH reply ordering). No mutex needed — Ruby's cooperative fiber scheduling guarantees the array push happens before the I/O yield point, so interleaving from concurrent event-hook fibers is safe.

connection_closed uses promise.reject(ConnectionError) instead of pushing nil sentinels — the error is raised directly at the promise.wait call site.

EM-style naming

Protocol::Connection methods renamed to send_data/receive_data for consistency. Listener::Base uses receive_data as the entry point (replacing receive_message). The response accessor and handle_response indirection are removed — the response is passed as a parameter through the call chain.

execute_app moved to Base

Both Inbound and Outbound can now execute applications via execute_app(app, uuid, args, **params). Outbound's application method wraps it with session[:unique_id] and session tracking. Inbound can call execute_app directly with an explicit UUID.

CommandDelegate extracted to its own file

Autoloaded via librevox.rb.

Error propagation

Removed the silent rescue in Outbound#run_session — errors now propagate to Client/Server which already handle them. Added ResponseError to Client's reconnect loop so auth/handshake errors trigger reconnection instead of crashing.

Removed update_session

The session is already updated automatically from every CHANNEL_EXECUTE_COMPLETE and CHANNEL_DATA event.

Test cleanup

  • Extracted setup_outbound/teardown_outbound helpers (eliminates duplicated 7-line setup across 8+ test classes)
  • Renamed test methods to read as prose
  • Renamed files and classes to reflect current behavior (e.g. outbound_non_nested_testoutbound_data_passing_test)
  • Fixed stale comments referencing queues

The previous implementation used Async::Queue for reply/app-complete
channels and Async::Semaphore for command serialization. This was
heavier than needed — each command only ever has one pending response.

Replace with arrays of Async::Promise. Each send_message/execute_app
creates its own promise and pushes it onto an array. The reader fiber
resolves them in FIFO order. No mutex needed — cooperative fiber
scheduling guarantees the array push happens before the yield point.

Also:
- Rename send_message/read_message to send_data/receive_data on
  Protocol::Connection for consistent EM-style naming
- Move execute_app to Base so both Inbound and Outbound can execute
  applications (Outbound's application method passes session UUID)
- Extract CommandDelegate to its own file
- Use promise.reject in connection_closed instead of nil sentinel
- Remove update_session (session is updated automatically)
- Remove silent rescue in Outbound#run_session — errors propagate
  to Client/Server which already handle them
- Add ResponseError to Client reconnect loop
- Extract setup_outbound/teardown_outbound helpers to eliminate
  duplicated 7-line setup across 8+ test classes
- Rename test methods to read as prose (e.g. test_dispatches_matching_event_hooks)
- Rename files/classes to reflect current behavior:
  outbound_non_nested_test → outbound_data_passing_test
  outbound_session_test → outbound_handshake_test
  OutboundListenerWithNestedApps → OutboundListenerWithSequentialApps
- Remove dead code (attr_reader :queue, update_session tests)
- Fix stale comments referencing queues instead of promises
@henrikbjorn henrikbjorn requested review from c960657 and mkyed March 30, 2026 08:46
@henrikbjorn henrikbjorn self-assigned this Mar 30, 2026
The "Two fibers per connection" section still referenced Async::Queue
and Async::Semaphore. Updated to describe the Async::Promise array
pattern and promise rejection on disconnect. Also updated the sendmsg
example to show the UUID parameter.
ConnectionError is expected when either side hangs up — the read loop
closes, connection_closed rejects pending promises, and the error
surfaces in run_session or event hook fibers. Previously this was
silently swallowed by a blanket rescue in Outbound#run_session (removed
in the promise refactor), causing it to propagate to Server#accept's
catch-all and log noisy "Session error" messages.

Fix by rescuing ConnectionError in three targeted places:
- Server#handle_session and Client#handle_session — suppresses the
  error from run_session so it doesn't reach the generic error handler
- Base#connection_closed — suppresses the error from event_barrier.wait
  when event hooks were mid-command at disconnect

Consumers no longer need to rescue ConnectionError in session_initiated.
Listeners no longer need to know about IO::Endpoint or default ports.
Client.start and Server.start handle endpoint creation, and listeners
just delegate: Client.start(self, **options). Runner uses barrier.async
to wrap the blocking start call.
@henrikbjorn henrikbjorn force-pushed the promises-and-cleanup branch from 785fdac to 0775e3a Compare March 30, 2026 09:34
Comment on lines +19 to +20
def self.start(...)
Client.start(self, ...)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Den syntaks kendte jeg ikke, men nice

Copy link
Copy Markdown
Member

@mkyed mkyed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems legit

execute_app now stores promises in a hash keyed by UUID instead of an
array. CHANNEL_EXECUTE_COMPLETE events resolve the promise matching
their unique_id. This is more correct and enables concurrent app
execution on different channels (e.g. inbound mode).
execute_app now generates a SecureRandom.uuid and sends it as the
Event-UUID header on sendmsg. FreeSWITCH echoes this back as
Application-UUID in CHANNEL_EXECUTE_COMPLETE, which is used to
resolve the correct promise. This replaces matching by channel UUID
and properly correlates each individual app execution.

Also:
- Defaults go first in header merge so params can override (e.g.
  event_lock: false)
- Use IO::Stream flush: true instead of separate flush call
- Remove redundant closed? guard in Connection#close (IO::Stream
  already guards)
CommandSocket called the old send_message/read_message methods on
Protocol::Connection which were renamed to send_data/receive_data.
Also reorder application(app, uuid, args) to match execute_app.

Add functional tests that verify auth handshake, API commands, and
sendmsg application execution against a real TCP server.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants