Replace queues with promises, clean up naming#10
Open
henrikbjorn wants to merge 8 commits intomasterfrom
Open
Conversation
The previous implementation used Async::Queue for reply/app-complete channels and Async::Semaphore for command serialization. This was heavier than needed — each command only ever has one pending response. Replace with arrays of Async::Promise. Each send_message/execute_app creates its own promise and pushes it onto an array. The reader fiber resolves them in FIFO order. No mutex needed — cooperative fiber scheduling guarantees the array push happens before the yield point. Also: - Rename send_message/read_message to send_data/receive_data on Protocol::Connection for consistent EM-style naming - Move execute_app to Base so both Inbound and Outbound can execute applications (Outbound's application method passes session UUID) - Extract CommandDelegate to its own file - Use promise.reject in connection_closed instead of nil sentinel - Remove update_session (session is updated automatically) - Remove silent rescue in Outbound#run_session — errors propagate to Client/Server which already handle them - Add ResponseError to Client reconnect loop
- Extract setup_outbound/teardown_outbound helpers to eliminate duplicated 7-line setup across 8+ test classes - Rename test methods to read as prose (e.g. test_dispatches_matching_event_hooks) - Rename files/classes to reflect current behavior: outbound_non_nested_test → outbound_data_passing_test outbound_session_test → outbound_handshake_test OutboundListenerWithNestedApps → OutboundListenerWithSequentialApps - Remove dead code (attr_reader :queue, update_session tests) - Fix stale comments referencing queues instead of promises
The "Two fibers per connection" section still referenced Async::Queue and Async::Semaphore. Updated to describe the Async::Promise array pattern and promise rejection on disconnect. Also updated the sendmsg example to show the UUID parameter.
ConnectionError is expected when either side hangs up — the read loop closes, connection_closed rejects pending promises, and the error surfaces in run_session or event hook fibers. Previously this was silently swallowed by a blanket rescue in Outbound#run_session (removed in the promise refactor), causing it to propagate to Server#accept's catch-all and log noisy "Session error" messages. Fix by rescuing ConnectionError in three targeted places: - Server#handle_session and Client#handle_session — suppresses the error from run_session so it doesn't reach the generic error handler - Base#connection_closed — suppresses the error from event_barrier.wait when event hooks were mid-command at disconnect Consumers no longer need to rescue ConnectionError in session_initiated.
Listeners no longer need to know about IO::Endpoint or default ports. Client.start and Server.start handle endpoint creation, and listeners just delegate: Client.start(self, **options). Runner uses barrier.async to wrap the blocking start call.
785fdac to
0775e3a
Compare
mkyed
reviewed
Mar 30, 2026
Comment on lines
+19
to
+20
| def self.start(...) | ||
| Client.start(self, ...) |
Member
There was a problem hiding this comment.
Den syntaks kendte jeg ikke, men nice
execute_app now stores promises in a hash keyed by UUID instead of an array. CHANNEL_EXECUTE_COMPLETE events resolve the promise matching their unique_id. This is more correct and enables concurrent app execution on different channels (e.g. inbound mode).
execute_app now generates a SecureRandom.uuid and sends it as the Event-UUID header on sendmsg. FreeSWITCH echoes this back as Application-UUID in CHANNEL_EXECUTE_COMPLETE, which is used to resolve the correct promise. This replaces matching by channel UUID and properly correlates each individual app execution. Also: - Defaults go first in header merge so params can override (e.g. event_lock: false) - Use IO::Stream flush: true instead of separate flush call - Remove redundant closed? guard in Connection#close (IO::Stream already guards)
CommandSocket called the old send_message/read_message methods on Protocol::Connection which were renamed to send_data/receive_data. Also reorder application(app, uuid, args) to match execute_app. Add functional tests that verify auth handshake, API commands, and sendmsg application execution against a real TCP server.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Why
The previous implementation used
Async::Queuefor reply and app-complete channels, plusAsync::Semaphorefor command serialization. Each command only ever has one pending response, so a multi-item queue and mutex were heavier than needed.What changed
Async::Promise arrays replace Queue + Semaphore
Each
send_messageandexecute_appcreates its ownAsync::Promiseand pushes it onto an array. The reader fiber resolves them in FIFO order (matching FreeSWITCH reply ordering). No mutex needed — Ruby's cooperative fiber scheduling guarantees the array push happens before the I/O yield point, so interleaving from concurrent event-hook fibers is safe.connection_closedusespromise.reject(ConnectionError)instead of pushing nil sentinels — the error is raised directly at thepromise.waitcall site.EM-style naming
Protocol::Connectionmethods renamed tosend_data/receive_datafor consistency.Listener::Baseusesreceive_dataas the entry point (replacingreceive_message). Theresponseaccessor andhandle_responseindirection are removed — the response is passed as a parameter through the call chain.execute_appmoved to BaseBoth Inbound and Outbound can now execute applications via
execute_app(app, uuid, args, **params). Outbound'sapplicationmethod wraps it withsession[:unique_id]and session tracking. Inbound can callexecute_appdirectly with an explicit UUID.CommandDelegateextracted to its own fileAutoloaded via
librevox.rb.Error propagation
Removed the silent
rescueinOutbound#run_session— errors now propagate toClient/Serverwhich already handle them. AddedResponseErrorto Client's reconnect loop so auth/handshake errors trigger reconnection instead of crashing.Removed
update_sessionThe session is already updated automatically from every
CHANNEL_EXECUTE_COMPLETEandCHANNEL_DATAevent.Test cleanup
setup_outbound/teardown_outboundhelpers (eliminates duplicated 7-line setup across 8+ test classes)outbound_non_nested_test→outbound_data_passing_test)