From fc0742a9387cd398279f61e8d7f26259a8800a4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 10:45:23 +0200 Subject: [PATCH 01/15] Replace Async::Queue and Semaphore with Async::Promise arrays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation used Async::Queue for reply/app-complete channels and Async::Semaphore for command serialization. This was heavier than needed — each command only ever has one pending response. Replace with arrays of Async::Promise. Each send_message/execute_app creates its own promise and pushes it onto an array. The reader fiber resolves them in FIFO order. No mutex needed — cooperative fiber scheduling guarantees the array push happens before the yield point. Also: - Rename send_message/read_message to send_data/receive_data on Protocol::Connection for consistent EM-style naming - Move execute_app to Base so both Inbound and Outbound can execute applications (Outbound's application method passes session UUID) - Extract CommandDelegate to its own file - Use promise.reject in connection_closed instead of nil sentinel - Remove update_session (session is updated automatically) - Remove silent rescue in Outbound#run_session — errors propagate to Client/Server which already handle them - Add ResponseError to Client reconnect loop --- lib/librevox.rb | 1 + lib/librevox/client.rb | 10 +- lib/librevox/command_delegate.rb | 19 ++++ lib/librevox/listener/base.rb | 98 ++++++++++--------- lib/librevox/listener/outbound.rb | 62 +++--------- lib/librevox/protocol/connection.rb | 6 +- lib/librevox/server.rb | 12 +-- ..._test.rb => outbound_data_passing_test.rb} | 0 ...ion_test.rb => outbound_handshake_test.rb} | 0 9 files changed, 100 insertions(+), 108 deletions(-) create mode 100644 lib/librevox/command_delegate.rb rename test/functional/librevox/listener/{outbound_non_nested_test.rb => outbound_data_passing_test.rb} (100%) rename test/functional/librevox/listener/{outbound_session_test.rb => outbound_handshake_test.rb} (100%) diff --git a/lib/librevox.rb b/lib/librevox.rb index eb618b3..83d7642 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -9,6 +9,7 @@ class ConnectionError < StandardError; end autoload :Client, 'librevox/client' autoload :CommandSocket, 'librevox/command_socket' + autoload :CommandDelegate, 'librevox/command_delegate' autoload :Commands, 'librevox/commands' autoload :Applications, 'librevox/applications' autoload :Runner, 'librevox/runner' diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index 3fbb9df..d33670d 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -21,7 +21,7 @@ def connect(socket) def run loop do @endpoint.connect(&method(:connect)) - rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e + rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError, ResponseError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 end @@ -44,17 +44,17 @@ def start_read_loop(connection, listener) Async do read_messages(connection, listener) ensure - # Close queues here (not in handle_session's ensure) so that - # a connection drop unblocks listener.run_session via nil dequeue. + # Reject pending promises here (not in handle_session's ensure) so + # that a connection drop unblocks listener.run_session via rejection. # handle_session's ensure can't run until run_session returns, - # creating a deadlock if queues aren't closed from this fiber. + # creating a deadlock if promises aren't rejected from this fiber. listener.connection_closed end end def read_messages(connection, listener) connection.read_loop do |msg| - listener.receive_message(msg) + listener.receive_data(msg) end end end diff --git a/lib/librevox/command_delegate.rb b/lib/librevox/command_delegate.rb new file mode 100644 index 0000000..71e0670 --- /dev/null +++ b/lib/librevox/command_delegate.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Librevox + # In some cases there are both applications and commands with the same + # name, e.g. fifo. But we can't have two `fifo`-methods, so we include + # commands in CommandDelegate, and expose all commands through the `api` + # method, which wraps a CommandDelegate instance. + class CommandDelegate + include Librevox::Commands + + def initialize(listener) + @listener = listener + end + + def command(*args) + @listener.send_message(super(*args)) + end + end +end diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index b063de9..730fefd 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -1,19 +1,10 @@ # frozen_string_literal: true -require 'async/queue' -require 'async/semaphore' require 'async/barrier' module Librevox module Listener class Base - def initialize(connection) - @connection = connection - @reply_queue = Async::Queue.new - @command_mutex = Async::Semaphore.new(1) - @event_barrier = Async::Barrier.new - end - class << self def hooks @hooks ||= Hash.new {|hash, key| hash[key] = []} @@ -24,20 +15,11 @@ def event(event, &block) end end - # In some cases there are both applications and commands with the same - # name, e.g. fifo. But we can't have two `fifo`-methods, so we include - # commands in CommandDelegate, and expose all commands through the `api` - # method, which wraps a CommandDelegate instance. - class CommandDelegate - include Librevox::Commands - - def initialize(listener) - @listener = listener - end - - def command(*args) - @listener.send_message(super(*args)) - end + def initialize(connection) + @connection = connection + @reply_promises = [] + @app_promises = [] + @event_barrier = Async::Barrier.new end # Exposes an instance of {CommandDelegate}, which includes {Librevox::Commands}. @@ -51,46 +33,62 @@ def api end def send_message(msg) - @command_mutex.acquire do - @connection.send_message(msg) - reply = @reply_queue.dequeue - raise ConnectionError, "Connection closed" if reply.nil? - raise ResponseError, reply.headers[:reply_text] if reply.error? - reply - end - end + promise = Async::Promise.new + + @reply_promises << promise - attr_accessor :response + @connection.send_data(msg) - def receive_message(response) - @response = response - handle_response + reply = promise.wait + raise ResponseError, reply.headers[:reply_text] if reply.error? + + reply end - def handle_response + def execute_app(app, uuid, args = nil, **params) + headers = params + .merge( + event_lock: true, + call_command: "execute", + execute_app_name: app, + execute_app_arg: args, + ) + .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } + + send_message "sendmsg #{uuid}\n#{headers.join("\n")}" + + promise = Async::Promise.new + @app_promises << promise + promise.wait + end + + def receive_data(response) if response.reply? - @reply_queue.push(response) + @reply_promises.shift&.resolve(response) return end if response.event? - resp = response + if response.event == "CHANNEL_EXECUTE_COMPLETE" + @app_promises.shift&.resolve(response) + end + @event_barrier.async do - on_event(resp) - invoke_event_hooks(resp) + on_event(response) + invoke_event_hooks(response) end end end - # override - def on_event(event) - end + def connection_closed + error = ConnectionError.new("Connection closed") - def run_session - end + @reply_promises.each { |p| p.reject(error) } + @app_promises.each { |p| p.reject(error) } + + @reply_promises.clear + @app_promises.clear - def connection_closed - @reply_queue.close @event_barrier.wait end @@ -100,6 +98,12 @@ def disconnect private + def on_event(event) + end + + def run_session + end + def invoke_event_hooks(resp) event_name = resp.event.downcase.to_sym hooks = self.class.hooks[event_name] diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index fa1a57b..f00cda3 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -13,42 +13,11 @@ def self.run(barrier, host: "localhost", port: 8084, **options) barrier.async { server.run } end - def application(app, args = nil, **params) - variable_name = params.delete(:variable) - - headers = params - .merge( - event_lock: true, - call_command: "execute", - execute_app_name: app, - execute_app_arg: args, - ) - .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - - send_message "sendmsg\n#{headers.join("\n")}" - - response = @app_complete_queue.dequeue - - if response.nil? - raise ConnectionError, "Connection closed" - end - - @session = response.content - - variable(variable_name) if variable_name - end - attr_accessor :session - # Called when a new session is initiated. - def session_initiated - end - def initialize(connection, options = {}) super(connection) - @session = nil - @app_complete_queue = Async::Queue.new end def run_session @@ -58,33 +27,32 @@ def run_session send_message "linger" session_initiated - rescue ResponseError, ConnectionError, IOError, Errno::EPIPE => e - Librevox.logger.error "Session error: #{e.message}" end - def connection_closed - super - - @app_complete_queue.close + # Called when a new session is initiated. + def session_initiated end - def handle_response - if response.event? && response.event == "CHANNEL_DATA" - @session = response.content - elsif response.event? && response.event == "CHANNEL_EXECUTE_COMPLETE" - @app_complete_queue.push(response) - end + def application(app, args = nil, **params) + variable_name = params.delete(:variable) - super + response = execute_app(app, session[:unique_id], args, **params) + + @session = response.content + + variable(variable_name) if variable_name end def variable(name) session[:"variable_#{name}"] end - def update_session - response = api.command "uuid_dump", session[:unique_id] - @session = response.content + def receive_data(response) + if response.event? && response.event == "CHANNEL_DATA" + @session = response.content + end + + super end end end diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index 397c4b9..f9ed83e 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -7,7 +7,7 @@ def initialize(stream) @stream = stream end - def read_message + def receive_data loop do headers = @stream.read_until("\n\n") return nil if headers.nil? @@ -25,12 +25,12 @@ def read_message end def read_loop - while (msg = read_message) + while (msg = receive_data) yield msg end end - def send_message(msg) + def send_data(msg) @stream.write("#{msg}\n\n") @stream.flush end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 32fde58..7a80c01 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -4,14 +4,14 @@ module Librevox class Server + attr :endpoint + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint @options = options end - attr :endpoint - def accept(socket, _address) stream = IO::Stream(socket) connection = Protocol::Connection.new(stream) @@ -46,10 +46,10 @@ def start_read_loop(connection, listener) Async do read_messages(connection, listener) ensure - # Close queues here (not in handle_session's ensure) so that - # a connection drop unblocks listener.run_session via nil dequeue. + # Reject pending promises here (not in handle_session's ensure) so + # that a connection drop unblocks listener.run_session via rejection. # handle_session's ensure can't run until run_session returns, - # creating a deadlock if queues aren't closed from this fiber. + # creating a deadlock if promises aren't rejected from this fiber. listener.connection_closed end end @@ -62,7 +62,7 @@ def read_messages(connection, listener) if msg.disconnect_notice? disconnecting = true else - listener.receive_message(msg) + listener.receive_data(msg) hung_up = true if msg.event? && msg.event == "CHANNEL_HANGUP_COMPLETE" end break if disconnecting && hung_up diff --git a/test/functional/librevox/listener/outbound_non_nested_test.rb b/test/functional/librevox/listener/outbound_data_passing_test.rb similarity index 100% rename from test/functional/librevox/listener/outbound_non_nested_test.rb rename to test/functional/librevox/listener/outbound_data_passing_test.rb diff --git a/test/functional/librevox/listener/outbound_session_test.rb b/test/functional/librevox/listener/outbound_handshake_test.rb similarity index 100% rename from test/functional/librevox/listener/outbound_session_test.rb rename to test/functional/librevox/listener/outbound_handshake_test.rb From 99eba6c89104a5ed3d6b7ab63809f0c42190c601 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 10:45:29 +0200 Subject: [PATCH 02/15] Clean up test suite after promise refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract setup_outbound/teardown_outbound helpers to eliminate duplicated 7-line setup across 8+ test classes - Rename test methods to read as prose (e.g. test_dispatches_matching_event_hooks) - Rename files/classes to reflect current behavior: outbound_non_nested_test → outbound_data_passing_test outbound_session_test → outbound_handshake_test OutboundListenerWithNestedApps → OutboundListenerWithSequentialApps - Remove dead code (attr_reader :queue, update_session tests) - Fix stale comments referencing queues instead of promises --- .../functional/librevox/listener/base_test.rb | 2 +- .../librevox/listener/inbound_filter_test.rb | 2 +- .../librevox/listener/inbound_test.rb | 2 +- .../librevox/listener/outbound_api_test.rb | 20 ++---- .../librevox/listener/outbound_apps_test.rb | 66 +++++++------------ .../listener/outbound_data_passing_test.rb | 25 +++---- .../librevox/listener/outbound_error_test.rb | 59 +++++------------ .../listener/outbound_handshake_test.rb | 43 +----------- .../librevox/listener/outbound_reader_test.rb | 25 +++---- .../librevox/listener/outbound_test.rb | 55 ++++++++++------ .../librevox/protocol/connection_test.rb | 26 ++++---- test/integration/outbound_disconnect_test.rb | 6 +- test/support/listener.rb | 37 ++++++----- test/test_helper.rb | 20 ++---- 14 files changed, 147 insertions(+), 241 deletions(-) diff --git a/test/functional/librevox/listener/base_test.rb b/test/functional/librevox/listener/base_test.rb index 36918a7..7892439 100644 --- a/test/functional/librevox/listener/base_test.rb +++ b/test/functional/librevox/listener/base_test.rb @@ -10,7 +10,7 @@ def setup @listener = @class.new(MockConnection.new) end - # Without Async in handle_response, on_event calling api.* deadlocks the + # Without Async in receive_data, on_event calling api.* deadlocks the # calling fiber. A Thread timeout is the only way to detect this — all # Async fibers are stuck so Async-level timeouts can't fire. def test_on_event_with_api_does_not_block_handle_response diff --git a/test/functional/librevox/listener/inbound_filter_test.rb b/test/functional/librevox/listener/inbound_filter_test.rb index e6e4d4a..d41a43d 100644 --- a/test/functional/librevox/listener/inbound_filter_test.rb +++ b/test/functional/librevox/listener/inbound_filter_test.rb @@ -36,7 +36,7 @@ def teardown super end - def test_authorize_and_subscribe_to_events + def test_sends_auth_events_and_filters assert_equal "auth ClueCon", @listener.outgoing_data.shift assert_equal "event plain CUSTOM CHANNEL_EXECUTE", @listener.outgoing_data.shift assert_equal "filter Caller-Context default", @listener.outgoing_data.shift diff --git a/test/functional/librevox/listener/inbound_test.rb b/test/functional/librevox/listener/inbound_test.rb index 24d9d54..407ce77 100644 --- a/test/functional/librevox/listener/inbound_test.rb +++ b/test/functional/librevox/listener/inbound_test.rb @@ -29,7 +29,7 @@ def teardown super end - def test_authorize_and_subscribe_to_events + def test_sends_auth_and_event_subscription assert_equal "auth ClueCon", @listener.outgoing_data.shift assert_equal "event plain ALL", @listener.outgoing_data.shift assert_nil @listener.outgoing_data.shift diff --git a/test/functional/librevox/listener/outbound_api_test.rb b/test/functional/librevox/listener/outbound_api_test.rb index 13659bc..21a56f8 100644 --- a/test/functional/librevox/listener/outbound_api_test.rb +++ b/test/functional/librevox/listener/outbound_api_test.rb @@ -12,33 +12,27 @@ def session_initiated end end -class TestOutboundListenerWithAppsAndApi < Minitest::Test +class TestOutboundAppsAndApi < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithAppsAndApi.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithAppsAndApi end def teardown - @session_task&.stop + teardown_outbound super end - def test_wait_for_execute_complete_before_calling_next_app_or_cmd - assert_send_application @listener, "foo" - execute_complete + def test_waits_for_execute_complete_before_api_and_next_app + assert_execute_app @listener, "foo", "1234" + execute_complete "Unique-ID" => "1234" assert_send_command @listener, "api bar" api_response body: "+OK" - assert_send_application @listener, "baz" + assert_execute_app @listener, "baz", "1234" end end diff --git a/test/functional/librevox/listener/outbound_apps_test.rb b/test/functional/librevox/listener/outbound_apps_test.rb index 7a861c9..4faf8c7 100644 --- a/test/functional/librevox/listener/outbound_apps_test.rb +++ b/test/functional/librevox/listener/outbound_apps_test.rb @@ -4,7 +4,7 @@ require 'librevox/listener/outbound' -class OutboundListenerWithNestedApps < Librevox::Listener::Outbound +class OutboundListenerWithSequentialApps < Librevox::Listener::Outbound def session_initiated sample_app "foo" sample_app "bar" @@ -23,38 +23,32 @@ def session_initiated end end -class TestOutboundListenerWithApps < Minitest::Test +class TestOutboundSequentialApps < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithNestedApps.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithSequentialApps end def teardown - @session_task&.stop + teardown_outbound super end - def test_only_send_one_app_at_a_time - assert_send_application @listener, "foo" + def test_sends_one_app_at_a_time + assert_execute_app @listener, "foo", "1234" assert_send_nothing @listener - execute_complete + execute_complete "Unique-ID" => "1234" - assert_send_application @listener, "bar" + assert_execute_app @listener, "bar", "1234" assert_send_nothing @listener end - def test_not_be_driven_forward_by_events - assert_send_application @listener, "foo" + def test_events_do_not_advance_app + assert_execute_app @listener, "foo", "1234" command_reply body: { "Event-Name" => "CHANNEL_EXECUTE", @@ -64,16 +58,16 @@ def test_not_be_driven_forward_by_events assert_send_nothing @listener end - def test_not_be_driven_forward_by_api_responses - assert_send_application @listener, "foo" + def test_api_responses_do_not_advance_app + assert_execute_app @listener, "foo", "1234" api_response body: "Foo" assert_send_nothing @listener end - def test_not_be_driven_forward_by_disconnect_notifications - assert_send_application @listener, "foo" + def test_disconnect_notices_do_not_advance_app + assert_execute_app @listener, "foo", "1234" response "Content-Type" => "text/disconnect-notice", body: "Lingering" @@ -81,8 +75,8 @@ def test_not_be_driven_forward_by_disconnect_notifications assert_send_nothing @listener end - def test_not_be_driven_forward_by_command_reply - assert_send_application @listener, "foo" + def test_command_replies_do_not_advance_app + assert_execute_app @listener, "foo", "1234" command_reply "Reply-Text" => "+OK" @@ -90,52 +84,40 @@ def test_not_be_driven_forward_by_command_reply end end -class TestOutboundListenerWithCustomHeaders < Minitest::Test +class TestOutboundCustomHeaders < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithCustomHeaders.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithCustomHeaders end def teardown - @session_task&.stop + teardown_outbound super end def test_sends_custom_headers - assert_send_application @listener, "playback", "/tmp/test.wav", loops: 3 + assert_execute_app @listener, "playback", "1234", "/tmp/test.wav", loops: 3 end end -class TestOutboundListenerWithEventLockOverride < Minitest::Test +class TestOutboundEventLockOverride < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithEventLockOverride.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithEventLockOverride end def teardown - @session_task&.stop + teardown_outbound super end def test_overrides_event_lock - assert_send_application @listener, "playback", "/tmp/test.wav", event_lock: false + assert_execute_app @listener, "playback", "1234", "/tmp/test.wav", event_lock: false end end diff --git a/test/functional/librevox/listener/outbound_data_passing_test.rb b/test/functional/librevox/listener/outbound_data_passing_test.rb index f55c628..9c54525 100644 --- a/test/functional/librevox/listener/outbound_data_passing_test.rb +++ b/test/functional/librevox/listener/outbound_data_passing_test.rb @@ -4,8 +4,7 @@ require 'librevox/listener/outbound' -class OutboundListenerWithNonNestedApps < Librevox::Listener::Outbound - attr_reader :queue +class OutboundListenerWithDataPassing < Librevox::Listener::Outbound def session_initiated sample_app "foo" data = reader_app @@ -13,33 +12,27 @@ def session_initiated end end -class TestOutboundListenerWithNonNestedApps < Minitest::Test +class TestOutboundDataPassing < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithNonNestedApps.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithDataPassing end def teardown - @session_task&.stop + teardown_outbound super end - def test_wait_for_execute_complete_before_calling_next_app - assert_send_application @listener, "foo" + def test_passes_data_between_sequential_apps + assert_execute_app @listener, "foo", "1234" execute_complete "Unique-ID" => "1234" - assert_send_application @listener, "reader_app" - execute_complete "variable_app_var" => "Second" + assert_execute_app @listener, "reader_app", "1234" + execute_complete "variable_app_var" => "Second", "Unique-ID" => "1234" - assert_send_application @listener, "send", "the end: Second" + assert_execute_app @listener, "send", "1234", "the end: Second" end end diff --git a/test/functional/librevox/listener/outbound_error_test.rb b/test/functional/librevox/listener/outbound_error_test.rb index fe0ece6..e19aa0c 100644 --- a/test/functional/librevox/listener/outbound_error_test.rb +++ b/test/functional/librevox/listener/outbound_error_test.rb @@ -26,24 +26,17 @@ class TestOutboundApplicationError < Minitest::Test include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithErrorApp.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times { @listener.outgoing_data.shift } + setup_outbound OutboundListenerWithErrorApp end def teardown - @session_task&.stop + teardown_outbound super end def test_application_raises_on_error_reply - assert_send_application @listener, "fail" + assert_execute_app @listener, "fail", "1234" - # sendmsg ack with error — raises instead of blocking on app_complete_queue command_reply "Reply-Text" => "-ERR invalid command" assert_instance_of Librevox::ResponseError, @listener.error @@ -56,36 +49,20 @@ class TestOutboundUnhandledApplicationError < Minitest::Test include OutboundSetupHelpers include Librevox::Test::Matchers - def setup - @listener = OutboundListenerWithUnhandledErrorApp.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times { @listener.outgoing_data.shift } - end - - def teardown - @session_task&.stop - super - end - - def test_unhandled_error_ends_session_cleanly - assert_send_application @listener, "fail" - - log = StringIO.new - original_logger = Librevox.logger - Librevox.logger = Logger.new(log) - - # sendmsg ack with error — run_session rescues and logs, no crash - command_reply "Reply-Text" => "-ERR invalid command" - - # session task completes without raising - @session_task.wait - - assert_match(/-ERR invalid command/, log.string) - ensure - Librevox.logger = original_logger + def test_unhandled_error_propagates + setup_outbound OutboundListenerWithUnhandledErrorApp + + assert_execute_app @listener, "fail", "1234" + + # Send error reply and wait on task without yielding in between, + # so async doesn't log an unhandled exception warning. + error_reply = Librevox::Protocol::Response.new( + "Content-Type: command/reply\nReply-Text: -ERR invalid command", "" + ) + error = assert_raises(Librevox::ResponseError) do + @listener.receive_data(error_reply) + @session_task.wait + end + assert_equal "-ERR invalid command", error.message end end diff --git a/test/functional/librevox/listener/outbound_handshake_test.rb b/test/functional/librevox/listener/outbound_handshake_test.rb index 211c111..8458b8f 100644 --- a/test/functional/librevox/listener/outbound_handshake_test.rb +++ b/test/functional/librevox/listener/outbound_handshake_test.rb @@ -4,48 +4,7 @@ require 'librevox/listener/outbound' -class OutboundListenerWithUpdateSessionCallback < Librevox::Listener::Outbound - def session_initiated - update_session - application "send", "yay, #{session[:session_var]}" - end -end - -class TestOutboundListenerWithUpdateSessionCallback < Minitest::Test - prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers - include Librevox::Test::Matchers - - def setup - @listener = OutboundListenerWithUpdateSessionCallback.new(MockConnection.new) - @session_task = Async { @listener.run_session } - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - - assert_update_session @listener - api_response body: { - "Event-Name" => "CHANNEL_DATA", - "Session-Var" => "Second" - } - end - - def teardown - @session_task&.stop - super - end - - def test_execute_callback - assert_match(/yay,/, @listener.outgoing_data.shift) - end - - def test_update_session_before_calling_callback - assert_send_application @listener, "send", "yay, Second" - end -end - -class TestOutboundReplyQueueOrdering < Minitest::Test +class TestOutboundHandshakeOrdering < Minitest::Test prepend Librevox::Test::AsyncTest include Librevox::Test::ListenerHelpers include Librevox::Test::Matchers diff --git a/test/functional/librevox/listener/outbound_reader_test.rb b/test/functional/librevox/listener/outbound_reader_test.rb index b08d1c4..bb28e96 100644 --- a/test/functional/librevox/listener/outbound_reader_test.rb +++ b/test/functional/librevox/listener/outbound_reader_test.rb @@ -11,41 +11,34 @@ def session_initiated end end -class TestOutboundListenerWithAppReadingData < Minitest::Test +class TestOutboundAppReadingData < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithReader.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - - assert_send_application @listener, "reader_app" + setup_outbound OutboundListenerWithReader + assert_execute_app @listener, "reader_app", "1234" end def teardown - @session_task&.stop + teardown_outbound super end - def test_not_send_anything_while_missing_response + def test_blocks_until_execute_complete assert_send_nothing @listener end - def test_update_session_from_execute_complete + def test_updates_session_from_execute_complete execute_complete "Session-Var" => "Second" assert_equal "Second", @listener.session[:session_var] end - def test_return_value_of_channel_variable - execute_complete "variable_app_var" => "Second" + def test_returns_channel_variable_value + execute_complete "variable_app_var" => "Second", "Unique-ID" => "1234" - assert_send_application @listener, "send", "Second" + assert_execute_app @listener, "send", "1234", "Second" end end diff --git a/test/functional/librevox/listener/outbound_test.rb b/test/functional/librevox/listener/outbound_test.rb index 35babe6..0523f26 100644 --- a/test/functional/librevox/listener/outbound_test.rb +++ b/test/functional/librevox/listener/outbound_test.rb @@ -10,23 +10,20 @@ def session_initiated end end -class TestOutboundListener < Minitest::Test +class TestOutboundHandshake < Minitest::Test prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers + include Librevox::Test::ListenerHelpers include Librevox::Test::Matchers - include EventTests - include ApiCommandTests def setup @listener = OutboundTestListener.new(MockConnection.new) @session_task = Async { @listener.run_session } - command_reply( - "Caller-Caller-Id-Number" => "8675309", - "Unique-ID" => "1234", - "variable_some_var" => "some value" - ) - event_and_linger_replies - super + + command_reply "Caller-Caller-Id-Number" => "8675309", + "Unique-ID" => "1234", + "variable_some_var" => "some value" + command_reply "Reply-Text" => "+OK Events Enabled" + command_reply "Reply-Text" => "+OK will linger" end def teardown @@ -34,25 +31,41 @@ def teardown super end - def test_connect_to_freeswitch_and_subscribe_to_events - assert_send_command @listener, "connect" - assert_send_command @listener, "myevents" - assert_send_command @listener, "linger" + def test_sends_connect_myevents_linger_in_order + assert_equal "connect", @listener.outgoing_data.shift + assert_equal "myevents", @listener.outgoing_data.shift + assert_equal "linger", @listener.outgoing_data.shift + assert_nil @listener.outgoing_data.shift end - def test_establish_a_session + def test_establishes_session_from_connect_reply assert_equal Hash, @listener.session.class + assert_equal "8675309", @listener.session[:caller_caller_id_number] end - def test_call_session_callback_after_establishing_new_session + def test_calls_session_initiated_after_handshake assert_includes @listener.hook_log, "session was initiated" end - def test_make_headers_available_through_session - assert_equal "8675309", @listener.session[:caller_caller_id_number] + def test_exposes_channel_variables + assert_equal "some value", @listener.variable(:some_var) end +end - def test_make_channel_variables_available_through_variable - assert_equal "some value", @listener.variable(:some_var) +class TestOutboundEvents < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + include EventTests + include ApiCommandTests + + def setup + setup_outbound OutboundTestListener + super + end + + def teardown + teardown_outbound + super end end diff --git a/test/functional/librevox/protocol/connection_test.rb b/test/functional/librevox/protocol/connection_test.rb index f9413a1..9024e54 100644 --- a/test/functional/librevox/protocol/connection_test.rb +++ b/test/functional/librevox/protocol/connection_test.rb @@ -16,43 +16,43 @@ def teardown @write_io.close unless @write_io.closed? end - def test_read_headers_only_message + def test_receives_headers_only_message @write_io.write "Content-Type: command/reply\nReply-Text: +OK\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_instance_of Librevox::Protocol::Response, msg assert_equal "command/reply", msg.headers[:content_type] assert_equal "+OK", msg.headers[:reply_text] end - def test_read_message_with_content + def test_receive_data_with_content body = "Event-Name: HEARTBEAT" @write_io.write "Content-Length: #{body.size}\n\n#{body}\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_instance_of Librevox::Protocol::Response, msg assert_equal body.size.to_s, msg.headers[:content_length] assert_equal "HEARTBEAT", msg.content[:event_name] end - def test_read_multiple_messages + def test_receives_multiple_messages @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close - msg1 = @connection.read_message + msg1 = @connection.receive_data assert_equal "command/reply", msg1.headers[:content_type] - msg2 = @connection.read_message + msg2 = @connection.receive_data assert_equal "api/response", msg2.headers[:content_type] end def test_eof_returns_nil @write_io.close - assert_nil @connection.read_message + assert_nil @connection.receive_data end def test_skips_empty_header_blocks_after_content @@ -60,14 +60,14 @@ def test_skips_empty_header_blocks_after_content @write_io.write "Content-Length: #{body.size}\n\n#{body}\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_equal "TEST", msg.content[:event_name] # The trailing \n\n after content creates an empty block which should be skipped - assert_nil @connection.read_message + assert_nil @connection.receive_data end - def test_read_loop_yields_each_message + def test_read_loop_yields_each_response @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close @@ -80,11 +80,11 @@ def test_read_loop_yields_each_message assert_equal "api/response", messages[1].headers[:content_type] end - def test_send_message_writes_with_terminator + def test_send_data_writes_with_terminator write_stream = IO::Stream(@write_io) conn = Librevox::Protocol::Connection.new(write_stream) - conn.send_message("auth ClueCon") + conn.send_data("auth ClueCon") write_stream.close assert_equal "auth ClueCon\n\n", @read_io.read diff --git a/test/integration/outbound_disconnect_test.rb b/test/integration/outbound_disconnect_test.rb index 32c0fbf..5f13dae 100644 --- a/test/integration/outbound_disconnect_test.rb +++ b/test/integration/outbound_disconnect_test.rb @@ -177,15 +177,15 @@ def test_connection_drop_unblocks_blocked_send_message port, server_thread = start_server(BlockedOnAppListener) socket = fake_fs_connect(port) - # session_initiated calls sample_app which sends sendmsg and blocks on app_complete_queue + # session_initiated calls sample_app which sends sendmsg and blocks on app promise msg = socket.gets("\n\n") assert_match(/sendmsg/, msg) # ack the sendmsg socket.write("Content-Type: command/reply\nReply-Text: +OK\n\n") - # Now the listener is blocked on app_complete_queue.dequeue - # Drop the connection — should unblock via queue close + # Now the listener is blocked on app promise + # Drop the connection — should unblock via promise rejection socket.close socket = nil diff --git a/test/support/listener.rb b/test/support/listener.rb index d2f5dbd..d85c502 100644 --- a/test/support/listener.rb +++ b/test/support/listener.rb @@ -9,11 +9,11 @@ def initialize @data = [] end - def send_message(msg) + def send_data(msg) @data << msg end - def read_message + def receive_data nil end @@ -88,9 +88,6 @@ def setup @class.event(:hook_with_arg) {|e| log "got event: #{e.event}"} @listener.on_event_block = proc {|e| log "from on_event: #{e.event}"} - - # Establish session - @listener.receive_message(Librevox::Protocol::Response.new("Content-Length: 0\nTest: Testing", "")) end def test_add_event_hook @@ -100,7 +97,7 @@ def test_add_event_hook end end - def test_execute_callback_for_event + def test_dispatches_matching_event_hooks event "OTHER_EVENT" assert_equal "something else", @listener.read_data @@ -108,26 +105,19 @@ def test_execute_callback_for_event assert_equal "something", @listener.read_data end - def test_pass_event_as_arg_to_hook_block + def test_passes_event_to_hook_block event "HOOK_WITH_ARG" assert_equal "got event: HOOK_WITH_ARG", @listener.read_data end - def test_expose_response - event "OTHER_EVENT" - - assert_equal Librevox::Protocol::Response, @listener.response.class - assert_equal "OTHER_EVENT", @listener.response.content[:event_name] - end - - def test_call_on_event + def test_calls_on_event_for_any_event event "THIRD_EVENT" assert_equal "from on_event: THIRD_EVENT", @listener.read_data end - def test_call_event_hooks_and_on_event_on_channel_data + def test_dispatches_on_event_and_hooks_for_channel_data @listener.hook_log.clear @listener.on_event_block = proc {|e| log "on_event: CHANNEL_DATA test"} @@ -179,8 +169,21 @@ def test_multiple_api_commands module OutboundSetupHelpers include Librevox::Test::ListenerHelpers - def event_and_linger_replies + def setup_outbound(listener_class, **connect_headers) + headers = {"Unique-ID" => "1234"}.merge(connect_headers) + + @listener = listener_class.new(MockConnection.new) + @session_task = Async { @listener.run_session } + + command_reply(headers) command_reply "Reply-Text" => "+OK Events Enabled" command_reply "Reply-Text" => "+OK will linger" + + # Discard connect, myevents, linger commands + 3.times { @listener.outgoing_data.shift } + end + + def teardown_outbound + @session_task&.stop end end diff --git a/test/test_helper.rb b/test/test_helper.rb index b146af4..bbebe5c 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -17,7 +17,7 @@ def assert_send_nothing(obj) assert_nil obj.outgoing_data.shift end - def assert_send_application(obj, app, args = nil, **params) + def assert_execute_app(obj, app, uuid, args = nil, **params) headers = params .merge( event_lock: true, @@ -27,15 +27,7 @@ def assert_send_application(obj, app, args = nil, **params) ) .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - assert_equal "sendmsg\n#{headers.join("\n")}", obj.outgoing_data.shift - end - - def assert_update_session(obj, session_id = nil) - if session_id - assert_equal "api uuid_dump #{session_id}", obj.outgoing_data.shift - else - assert_match(/^api uuid_dump \d+/, obj.outgoing_data.shift) - end + assert_equal "sendmsg #{uuid}\n#{headers.join("\n")}", obj.outgoing_data.shift end end @@ -61,7 +53,7 @@ def response(args = {}) headers["Content-Length"] = body.size if body header_str = headers.map {|k, v| "#{k}: #{v}"}.join("\n") - @listener.receive_message(Librevox::Protocol::Response.new(header_str, body.to_s)) + @listener.receive_data(Librevox::Protocol::Response.new(header_str, body.to_s)) yield_to_fibers end @@ -69,7 +61,7 @@ def event(name) body = "Event-Name: #{name}" headers = "Content-Type: text/event-plain\nContent-Length: #{body.size}" - @listener.receive_message(Librevox::Protocol::Response.new(headers, body)) + @listener.receive_data(Librevox::Protocol::Response.new(headers, body)) yield_to_fibers end @@ -81,7 +73,7 @@ def execute_complete(args = {}) body_str = body.map {|k,v| "#{k}: #{v}"}.join("\n") headers = "Content-Type: text/event-plain\nContent-Length: #{body_str.size}" - @listener.receive_message(Librevox::Protocol::Response.new(headers, body_str)) + @listener.receive_data(Librevox::Protocol::Response.new(headers, body_str)) yield_to_fibers end @@ -92,7 +84,7 @@ def yield_to_fibers end end - # Wraps each test method in Async { } so queue operations work. + # Wraps each test method in Async { } so promise operations work. module AsyncTest def run(...) Sync do From e6c3574e838ed31f2193dd626590e90b3e63a437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 10:48:05 +0200 Subject: [PATCH 03/15] Update README to reflect promise-based architecture The "Two fibers per connection" section still referenced Async::Queue and Async::Semaphore. Updated to describe the Async::Promise array pattern and promise rejection on disconnect. Also updated the sendmsg example to show the UUID parameter. --- README.md | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index c377f6b..05cfa31 100644 --- a/README.md +++ b/README.md @@ -245,7 +245,7 @@ finished. Application completion is signalled by a `CHANNEL_EXECUTE_COMPLETE` event: ``` -Listener → FS: sendmsg +Listener → FS: sendmsg call-command: execute execute-app-name: playback execute-app-arg: welcome.wav @@ -270,18 +270,19 @@ being processed until the current application completes. ### Two fibers per connection -Librevox runs two fibers for each outbound connection: +Librevox runs two fibers for each connection: - **Session fiber** (`run_session`) — runs the setup sequence and then - `session_initiated`. Each `send_message` or `application` call blocks the fiber - until the reply arrives. -- **Read fiber** (`read_loop`) — reads messages from the socket and dispatches - them to `Async::Queue` instances, waking the session fiber. - -An `Async::Semaphore(1)` mutex on `send_message` ensures only one command is -in-flight at a time, so replies are always delivered to the correct caller. -This also serializes commands issued by event hooks (which run in their own -fibers) with the main session flow. + `session_initiated`. Each `send_message` or `application` call creates an + `Async::Promise`, pushes it onto an array, and blocks the fiber until the + promise is resolved. +- **Read fiber** (`read_loop`) — reads messages from the socket and resolves + promises in FIFO order, waking the session fiber. + +No mutex is needed — Ruby's cooperative fiber scheduling guarantees that the +promise push happens before the I/O yield point (the socket write), so +interleaving from concurrent event-hook fibers is safe. When a connection +drops, pending promises are rejected with `ConnectionError`. ## API Documentation From 81e27d9676e8d4cc9fa12c8b5c46be43ff0ea2f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 11:04:24 +0200 Subject: [PATCH 04/15] Suppress ConnectionError on disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConnectionError is expected when either side hangs up — the read loop closes, connection_closed rejects pending promises, and the error surfaces in run_session or event hook fibers. Previously this was silently swallowed by a blanket rescue in Outbound#run_session (removed in the promise refactor), causing it to propagate to Server#accept's catch-all and log noisy "Session error" messages. Fix by rescuing ConnectionError in three targeted places: - Server#handle_session and Client#handle_session — suppresses the error from run_session so it doesn't reach the generic error handler - Base#connection_closed — suppresses the error from event_barrier.wait when event hooks were mid-command at disconnect Consumers no longer need to rescue ConnectionError in session_initiated. --- lib/librevox/client.rb | 2 ++ lib/librevox/listener/base.rb | 2 ++ lib/librevox/server.rb | 2 ++ test/integration/outbound_disconnect_test.rb | 4 ---- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index d33670d..79e8b7f 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -35,6 +35,8 @@ def handle_session(connection, listener) listener.run_session read_task.wait + rescue ConnectionError + # Expected when the connection drops ensure read_task&.stop connection.close diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 730fefd..8e0f46f 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -90,6 +90,8 @@ def connection_closed @app_promises.clear @event_barrier.wait + rescue ConnectionError + # Expected — event hooks may have been mid-command when disconnected end def disconnect diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 7a80c01..44dff61 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -37,6 +37,8 @@ def handle_session(connection, listener) listener.run_session read_task.wait + rescue ConnectionError + # Expected when the connection drops ensure read_task&.stop connection.close diff --git a/test/integration/outbound_disconnect_test.rb b/test/integration/outbound_disconnect_test.rb index 5f13dae..5b002d1 100644 --- a/test/integration/outbound_disconnect_test.rb +++ b/test/integration/outbound_disconnect_test.rb @@ -10,12 +10,8 @@ require 'timeout' class BlockedOnAppListener < Librevox::Listener::Outbound - attr_reader :error - def session_initiated sample_app "playback", "/tmp/test.wav" - rescue Librevox::ConnectionError => e - @error = e end end From 0775e3a91dd6a8710c901b716186e77633648ae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 11:33:41 +0200 Subject: [PATCH 05/15] Move endpoint creation and default ports to Client/Server.start Listeners no longer need to know about IO::Endpoint or default ports. Client.start and Server.start handle endpoint creation, and listeners just delegate: Client.start(self, **options). Runner uses barrier.async to wrap the blocking start call. --- lib/librevox/client.rb | 6 ++++++ lib/librevox/listener/inbound.rb | 8 ++------ lib/librevox/listener/outbound.rb | 8 ++------ lib/librevox/runner.rb | 4 +++- lib/librevox/server.rb | 6 ++++++ 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index 79e8b7f..025861f 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -1,9 +1,15 @@ # frozen_string_literal: true require 'io/stream' +require 'io/endpoint/host_endpoint' module Librevox class Client + def self.start(handler, host: "localhost", port: 8021, **options) + endpoint = IO::Endpoint.tcp(host, port) + new(handler, endpoint, **options).run + end + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint diff --git a/lib/librevox/listener/inbound.rb b/lib/librevox/listener/inbound.rb index be7111b..6d51ced 100644 --- a/lib/librevox/listener/inbound.rb +++ b/lib/librevox/listener/inbound.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require 'io/endpoint/host_endpoint' - module Librevox module Listener class Inbound < Base @@ -18,10 +16,8 @@ def filters(filters) end end - def self.run(barrier, host: "localhost", port: 8021, **options) - endpoint = IO::Endpoint.tcp(host, port) - client = Client.new(self, endpoint, **options) - barrier.async { client.run } + def self.start(...) + Client.start(self, ...) end def initialize(connection, args = {}) diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index f00cda3..5614d1b 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -1,16 +1,12 @@ # frozen_string_literal: true -require 'io/endpoint/host_endpoint' - module Librevox module Listener class Outbound < Base include Librevox::Applications - def self.run(barrier, host: "localhost", port: 8084, **options) - endpoint = IO::Endpoint.tcp(host, port, **options) - server = Server.new(self, endpoint) - barrier.async { server.run } + def self.start(...) + Server.start(self, ...) end attr_accessor :session diff --git a/lib/librevox/runner.rb b/lib/librevox/runner.rb index 97c7ae2..6596974 100644 --- a/lib/librevox/runner.rb +++ b/lib/librevox/runner.rb @@ -31,7 +31,9 @@ def initialize(barrier) end def run(klass, **args) - klass.run(@barrier, **args) + @barrier.async do + klass.start(**args) + end end end end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 44dff61..c6a0fef 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -1,11 +1,17 @@ # frozen_string_literal: true require 'io/stream' +require 'io/endpoint/host_endpoint' module Librevox class Server attr :endpoint + def self.start(handler, host: "localhost", port: 8084, **options) + endpoint = IO::Endpoint.tcp(host, port, **options) + new(handler, endpoint).run + end + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint From a3bb5050423707a11e9e3482e97057bbbe39fe77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 12:09:48 +0200 Subject: [PATCH 06/15] Match app promises by UUID instead of FIFO order execute_app now stores promises in a hash keyed by UUID instead of an array. CHANNEL_EXECUTE_COMPLETE events resolve the promise matching their unique_id. This is more correct and enables concurrent app execution on different channels (e.g. inbound mode). --- lib/librevox/listener/base.rb | 9 +++++---- .../functional/librevox/listener/outbound_reader_test.rb | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 8e0f46f..334ab68 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -18,7 +18,7 @@ def event(event, &block) def initialize(connection) @connection = connection @reply_promises = [] - @app_promises = [] + @app_promises = {} @event_barrier = Async::Barrier.new end @@ -58,7 +58,7 @@ def execute_app(app, uuid, args = nil, **params) send_message "sendmsg #{uuid}\n#{headers.join("\n")}" promise = Async::Promise.new - @app_promises << promise + @app_promises[uuid] = promise promise.wait end @@ -70,7 +70,8 @@ def receive_data(response) if response.event? if response.event == "CHANNEL_EXECUTE_COMPLETE" - @app_promises.shift&.resolve(response) + uuid = response.content[:unique_id] + @app_promises.delete(uuid)&.resolve(response) end @event_barrier.async do @@ -84,7 +85,7 @@ def connection_closed error = ConnectionError.new("Connection closed") @reply_promises.each { |p| p.reject(error) } - @app_promises.each { |p| p.reject(error) } + @app_promises.each_value { |p| p.reject(error) } @reply_promises.clear @app_promises.clear diff --git a/test/functional/librevox/listener/outbound_reader_test.rb b/test/functional/librevox/listener/outbound_reader_test.rb index bb28e96..2d7c5eb 100644 --- a/test/functional/librevox/listener/outbound_reader_test.rb +++ b/test/functional/librevox/listener/outbound_reader_test.rb @@ -31,7 +31,7 @@ def test_blocks_until_execute_complete end def test_updates_session_from_execute_complete - execute_complete "Session-Var" => "Second" + execute_complete "Session-Var" => "Second", "Unique-ID" => "1234" assert_equal "Second", @listener.session[:session_var] end From 30a830692d381992a2a650bc9cf2b7c584ca1f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 12:48:16 +0200 Subject: [PATCH 07/15] Use Event-UUID to correlate sendmsg with CHANNEL_EXECUTE_COMPLETE execute_app now generates a SecureRandom.uuid and sends it as the Event-UUID header on sendmsg. FreeSWITCH echoes this back as Application-UUID in CHANNEL_EXECUTE_COMPLETE, which is used to resolve the correct promise. This replaces matching by channel UUID and properly correlates each individual app execution. Also: - Defaults go first in header merge so params can override (e.g. event_lock: false) - Use IO::Stream flush: true instead of separate flush call - Remove redundant closed? guard in Connection#close (IO::Stream already guards) --- lib/librevox/listener/base.rb | 16 ++++++++++------ lib/librevox/protocol/connection.rb | 5 +---- test/test_helper.rb | 28 +++++++++++++++++----------- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 334ab68..0fef567 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'async/barrier' +require 'securerandom' module Librevox module Listener @@ -46,19 +47,22 @@ def send_message(msg) end def execute_app(app, uuid, args = nil, **params) - headers = params - .merge( + event_uuid = SecureRandom.uuid + + headers = { event_lock: true, call_command: "execute", execute_app_name: app, execute_app_arg: args, - ) + event_uuid: event_uuid, + } + .merge(params) .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } send_message "sendmsg #{uuid}\n#{headers.join("\n")}" promise = Async::Promise.new - @app_promises[uuid] = promise + @app_promises[event_uuid] = promise promise.wait end @@ -70,8 +74,8 @@ def receive_data(response) if response.event? if response.event == "CHANNEL_EXECUTE_COMPLETE" - uuid = response.content[:unique_id] - @app_promises.delete(uuid)&.resolve(response) + app_uuid = response.content[:application_uuid] + @app_promises.delete(app_uuid)&.resolve(response) end @event_barrier.async do diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index f9ed83e..975f763 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -31,8 +31,7 @@ def read_loop end def send_data(msg) - @stream.write("#{msg}\n\n") - @stream.flush + @stream.write("#{msg}\n\n", flush: true) end def close_write @@ -42,8 +41,6 @@ def close_write end def close - return if @stream.closed? - @stream.close rescue Errno::EPIPE, Errno::ECONNRESET # Remote end already closed diff --git a/test/test_helper.rb b/test/test_helper.rb index bbebe5c..87e9238 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -18,16 +18,20 @@ def assert_send_nothing(obj) end def assert_execute_app(obj, app, uuid, args = nil, **params) - headers = params - .merge( - event_lock: true, - call_command: "execute", - execute_app_name: app, - execute_app_arg: args, - ) - .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - - assert_equal "sendmsg #{uuid}\n#{headers.join("\n")}", obj.outgoing_data.shift + data = obj.outgoing_data.shift + assert data, "Expected sendmsg in outgoing data" + + @last_event_uuid = data[/event-uuid: (.+)/, 1] + assert @last_event_uuid, "Expected event-uuid header in sendmsg" + + assert_match(/\Asendmsg #{uuid}\n/, data) + assert_match(/execute-app-name: #{app}/, data) + assert_match(/execute-app-arg: #{args}/, data) if args + + params.each do |key, value| + header = "#{key.to_s.tr('_', '-')}: #{value}" + assert_match(/#{Regexp.escape(header)}/, data) + end end end @@ -69,7 +73,9 @@ def execute_complete(args = {}) # sendmsg ack — always arrives before CHANNEL_EXECUTE_COMPLETE command_reply "Reply-Text" => "+OK" - body = {"Event-Name" => "CHANNEL_EXECUTE_COMPLETE"}.merge(args) + # Use the event-uuid from the last assert_execute_app, echoing it + # back as Application-UUID just like FreeSWITCH would. + body = {"Event-Name" => "CHANNEL_EXECUTE_COMPLETE", "Application-UUID" => @last_event_uuid}.merge(args) body_str = body.map {|k,v| "#{k}: #{v}"}.join("\n") headers = "Content-Type: text/event-plain\nContent-Length: #{body_str.size}" From 09dcb2ea483e6adaad43dab7fff74a4160a64b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 30 Mar 2026 13:17:49 +0200 Subject: [PATCH 08/15] Fix CommandSocket broken by protocol rename, add tests CommandSocket called the old send_message/read_message methods on Protocol::Connection which were renamed to send_data/receive_data. Also reorder application(app, uuid, args) to match execute_app. Add functional tests that verify auth handshake, API commands, and sendmsg application execution against a real TCP server. --- lib/librevox/command_socket.rb | 6 +- .../librevox/command_socket_test.rb | 68 +++++++++++++++++++ 2 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 test/functional/librevox/command_socket_test.rb diff --git a/lib/librevox/command_socket.rb b/lib/librevox/command_socket.rb index d77f5de..9e54b43 100644 --- a/lib/librevox/command_socket.rb +++ b/lib/librevox/command_socket.rb @@ -23,7 +23,7 @@ def connect end def send_message(msg) - @connection.send_message(msg) + @connection.send_data(msg) read_response end @@ -32,12 +32,12 @@ def command(*args) end def read_response - while msg = @connection.read_message + while msg = @connection.receive_data return msg if msg.command_reply? || msg.api_response? end end - def application(uuid, app, args = nil, **params) + def application(app, uuid, args = nil, **params) headers = params .merge( event_lock: true, diff --git a/test/functional/librevox/command_socket_test.rb b/test/functional/librevox/command_socket_test.rb new file mode 100644 index 0000000..a38972c --- /dev/null +++ b/test/functional/librevox/command_socket_test.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require_relative '../../test_helper' + +require 'librevox/command_socket' + +class TestCommandSocket < Minitest::Test + def setup + @server = TCPServer.new("127.0.0.1", 0) + @port = @server.local_address.ip_port + end + + def teardown + @socket&.close + @fs&.close + @server&.close + end + + def test_sends_api_command_and_returns_response + connect + + thread = Thread.new { @socket.status } + + assert_equal "api status", read_message + reply "api/response", "OK" + + response = thread.value + assert response.api_response? + assert_equal "OK", response.content + end + + def test_sends_application_via_sendmsg + connect + + thread = Thread.new { @socket.application("playback", "1234-abcd", "/tmp/test.wav") } + + msg = read_message + + assert_match(/\Asendmsg 1234-abcd\n/, msg) + assert_match(/execute-app-name: playback/, msg) + assert_match(/execute-app-arg: \/tmp\/test.wav/, msg) + assert_match(/event-lock: true/, msg) + + reply "command/reply", "+OK" + + thread.value + end + + private + + def connect + thread = Thread.new { @socket = Librevox::CommandSocket.new(port: @port) } + + @fs = @server.accept + assert_equal "auth ClueCon", read_message + reply "command/reply", "+OK accepted" + + thread.join(2) + end + + def reply(content_type, body) + @fs.write("Content-Type: #{content_type}\nContent-Length: #{body.size}\n\n#{body}") + end + + def read_message + @fs.gets("\n\n")&.strip + end +end From 3c65f97a2d4a187c01b44289d099b7992d258760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Fri, 10 Apr 2026 13:51:06 +0200 Subject: [PATCH 09/15] Fix CommandSocket header merge order, forward Server options CommandSocket#application had defaults overwriting caller params (params.merge(defaults)). Reversed to defaults.merge(params) so overrides like event_lock: false work, matching Base#execute_app. Server.start passed **options to the endpoint instead of to new, so listener options were lost. Now symmetric with Client.start. --- lib/librevox/command_socket.rb | 6 +++--- lib/librevox/server.rb | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/librevox/command_socket.rb b/lib/librevox/command_socket.rb index 9e54b43..1f1c9f2 100644 --- a/lib/librevox/command_socket.rb +++ b/lib/librevox/command_socket.rb @@ -38,13 +38,13 @@ def read_response end def application(app, uuid, args = nil, **params) - headers = params - .merge( + headers = { event_lock: true, call_command: "execute", execute_app_name: app, execute_app_arg: args, - ) + } + .merge(params) .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } send_message "sendmsg #{uuid}\n#{headers.join("\n")}" diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index c6a0fef..296da05 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -8,8 +8,8 @@ class Server attr :endpoint def self.start(handler, host: "localhost", port: 8084, **options) - endpoint = IO::Endpoint.tcp(host, port, **options) - new(handler, endpoint).run + endpoint = IO::Endpoint.tcp(host, port) + new(handler, endpoint, **options).run end def initialize(handler, endpoint, **options) From 9954acb161be440f59a5d822ed634d4429173585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 13 Apr 2026 10:07:02 +0200 Subject: [PATCH 10/15] Create app promise before sending sendmsg The promise must be in @app_promises before send_message sends data, because the read loop could process CHANNEL_EXECUTE_COMPLETE before this fiber resumes. If FreeSWITCH sends the ack and completion back-to-back and both are buffered, read_until returns without yielding and the completion event would be lost. --- lib/librevox/listener/base.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 0fef567..2e24183 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -59,10 +59,11 @@ def execute_app(app, uuid, args = nil, **params) .merge(params) .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - send_message "sendmsg #{uuid}\n#{headers.join("\n")}" - promise = Async::Promise.new @app_promises[event_uuid] = promise + + send_message "sendmsg #{uuid}\n#{headers.join("\n")}" + promise.wait end From c7d9c07d6cc8db464f01475db36dec50367148f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 20 Apr 2026 09:07:52 +0200 Subject: [PATCH 11/15] Extract Session, move disconnect policy into Outbound MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Client#handle_session and Server#handle_session were identical and the Server's read loop knew about disconnect-notice and CHANNEL_HANGUP_COMPLETE — outbound-listener policy leaking into the transport supervisor. Extract the per-connection choreography (reader fiber + session fiber + teardown) into Librevox::Session. Both Client and Server now just manage their endpoint and hand each new socket to Session.new(...).run. Move the end-of-session detection into Listener::Outbound via a #session_complete? predicate that Session checks after each dispatched message. The reader exits cleanly without close_write, so in-flight event hooks drain via the barrier before the socket is actually closed — matching the previous behaviour. Also drops the dead `task.children.each(&:wait)` in Server#run: verified against io-endpoint 0.17.2 that Wrapper#accept is an infinite loop which spawns a fiber per connection via Fiber.schedule and never returns, so the trailing line was unreachable. --- lib/librevox.rb | 1 + lib/librevox/client.rb | 43 +++------------------- lib/librevox/listener/base.rb | 7 ++++ lib/librevox/listener/outbound.rb | 18 ++++++++-- lib/librevox/server.rb | 59 +++++-------------------------- lib/librevox/session.rb | 40 +++++++++++++++++++++ 6 files changed, 76 insertions(+), 92 deletions(-) create mode 100644 lib/librevox/session.rb diff --git a/lib/librevox.rb b/lib/librevox.rb index 83d7642..7890630 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -14,6 +14,7 @@ class ConnectionError < StandardError; end autoload :Applications, 'librevox/applications' autoload :Runner, 'librevox/runner' autoload :Server, 'librevox/server' + autoload :Session, 'librevox/session' module Protocol autoload :Connection, 'librevox/protocol/connection' diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index 025861f..5b32b3e 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -16,17 +16,9 @@ def initialize(handler, endpoint, **options) @options = options end - def connect(socket) - stream = IO::Stream(socket) - connection = Protocol::Connection.new(stream) - listener = @handler.new(connection, @options) - - handle_session(connection, listener) - end - def run loop do - @endpoint.connect(&method(:connect)) + @endpoint.connect { |socket| start_session(socket) } rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError, ResponseError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 @@ -35,35 +27,10 @@ def run private - def handle_session(connection, listener) - read_task = start_read_loop(connection, listener) - - listener.run_session - - read_task.wait - rescue ConnectionError - # Expected when the connection drops - ensure - read_task&.stop - connection.close - end - - def start_read_loop(connection, listener) - Async do - read_messages(connection, listener) - ensure - # Reject pending promises here (not in handle_session's ensure) so - # that a connection drop unblocks listener.run_session via rejection. - # handle_session's ensure can't run until run_session returns, - # creating a deadlock if promises aren't rejected from this fiber. - listener.connection_closed - end - end - - def read_messages(connection, listener) - connection.read_loop do |msg| - listener.receive_data(msg) - end + def start_session(socket) + connection = Protocol::Connection.new(IO::Stream(socket)) + listener = @handler.new(connection, @options) + Session.new(connection, listener).run end end end diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 2e24183..8d8e2a9 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -104,6 +104,13 @@ def disconnect @connection&.close_write end + # Overridden by listeners that drive their own end-of-session criteria + # (e.g. Outbound waits for disconnect-notice + CHANNEL_HANGUP_COMPLETE). + # Session checks this after each message and stops the reader when true. + def session_complete? + false + end + private def on_event(event) diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index 5614d1b..9ecd235 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -14,6 +14,8 @@ def self.start(...) def initialize(connection, options = {}) super(connection) @session = nil + @disconnecting = false + @hung_up = false end def run_session @@ -43,12 +45,22 @@ def variable(name) session[:"variable_#{name}"] end + # FreeSWITCH signals end-of-session with disconnect-notice + a final + # CHANNEL_HANGUP_COMPLETE event. Either may arrive first. Once both + # have been seen #session_complete? returns true and Session exits + # the reader loop — after any in-flight event hooks have drained. def receive_data(response) - if response.event? && response.event == "CHANNEL_DATA" - @session = response.content + if response.disconnect_notice? + @disconnecting = true + else + @session = response.content if response.event? && response.event == "CHANNEL_DATA" + super + @hung_up = true if response.event? && response.event == "CHANNEL_HANGUP_COMPLETE" end + end - super + def session_complete? + @disconnecting && @hung_up end end end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 296da05..d63b98a 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -18,63 +18,20 @@ def initialize(handler, endpoint, **options) @options = options end - def accept(socket, _address) - stream = IO::Stream(socket) - connection = Protocol::Connection.new(stream) - listener = @handler.new(connection, @options) - - handle_session(connection, listener) - rescue => e - Librevox.logger.error "Session error: #{e.full_message}" - end - def run - Async do |task| - @endpoint.accept(&method(:accept)) - task.children.each(&:wait) + @endpoint.accept do |socket, _address| + start_session(socket) + rescue => e + Librevox.logger.error "Session error: #{e.full_message}" end end private - def handle_session(connection, listener) - read_task = start_read_loop(connection, listener) - - listener.run_session - - read_task.wait - rescue ConnectionError - # Expected when the connection drops - ensure - read_task&.stop - connection.close - end - - def start_read_loop(connection, listener) - Async do - read_messages(connection, listener) - ensure - # Reject pending promises here (not in handle_session's ensure) so - # that a connection drop unblocks listener.run_session via rejection. - # handle_session's ensure can't run until run_session returns, - # creating a deadlock if promises aren't rejected from this fiber. - listener.connection_closed - end - end - - def read_messages(connection, listener) - disconnecting = false - hung_up = false - - connection.read_loop do |msg| - if msg.disconnect_notice? - disconnecting = true - else - listener.receive_data(msg) - hung_up = true if msg.event? && msg.event == "CHANNEL_HANGUP_COMPLETE" - end - break if disconnecting && hung_up - end + def start_session(socket) + connection = Protocol::Connection.new(IO::Stream(socket)) + listener = @handler.new(connection, @options) + Session.new(connection, listener).run end end end diff --git a/lib/librevox/session.rb b/lib/librevox/session.rb new file mode 100644 index 0000000..3c2f0c8 --- /dev/null +++ b/lib/librevox/session.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module Librevox + # Runs one event-socket session on a connection: spawns a reader fiber that + # dispatches incoming messages to the listener, runs the listener's session + # logic in the calling fiber, and joins them on disconnect. + class Session + def initialize(connection, listener) + @connection = connection + @listener = listener + end + + def run + reader = start_reader + @listener.run_session + reader.wait + rescue ConnectionError + # Expected when the connection drops. + ensure + reader&.stop + @connection.close + end + + private + + # Reject pending promises inside the reader's ensure (not run's) so a + # connection drop unblocks run_session via promise rejection. run's + # ensure can't fire until run_session returns — that would deadlock. + def start_reader + Async do + @connection.read_loop do |msg| + @listener.receive_data(msg) + break if @listener.session_complete? + end + ensure + @listener.connection_closed + end + end + end +end From 325354e1c84bd74f72d9f42f9ba6d8cce6d40769 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 20 Apr 2026 09:10:15 +0200 Subject: [PATCH 12/15] Naming + kwargs sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Protocol::Connection#read_loop → each_message (returns Enumerator when called without a block). - Listener::Base#receive_data → receive_message (it takes a parsed Response, not bytes; old name was EventMachine baggage). - Listener::Inbound#initialize / Outbound#initialize use kwargs instead of positional args-hashes. Client/Server splat @options through. - CommandSocket#initialize takes kwargs. - Normalize Protocol::Connection#close and #close_write rescue lists to the same set (IOError + the common Errno family). - Drop Librevox.reopen_log (no trap ever wired it up) and the SIGHUP paragraph from the README. --- README.md | 4 +--- lib/librevox.rb | 4 ---- lib/librevox/client.rb | 6 +++--- lib/librevox/command_socket.rb | 10 +++++----- lib/librevox/listener/base.rb | 2 +- lib/librevox/listener/inbound.rb | 4 ++-- lib/librevox/listener/outbound.rb | 4 ++-- lib/librevox/protocol/connection.rb | 12 +++++++----- lib/librevox/server.rb | 12 +++++------- lib/librevox/session.rb | 4 ++-- .../librevox/listener/outbound_error_test.rb | 2 +- test/functional/librevox/protocol/connection_test.rb | 4 ++-- test/test_helper.rb | 6 +++--- 13 files changed, 34 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 05cfa31..f901b27 100644 --- a/README.md +++ b/README.md @@ -212,8 +212,6 @@ Librevox.options[:log_file] = "librevox.log" # default: STDOUT Librevox.options[:log_level] = Logger::DEBUG # default: Logger::INFO ``` -When started with `Librevox.start`, sending `SIGHUP` to the process reopens the log file, making it compatible with `logrotate(1)`. - ## Event Socket Protocol Understanding the outbound event socket protocol is important for working on @@ -276,7 +274,7 @@ Librevox runs two fibers for each connection: `session_initiated`. Each `send_message` or `application` call creates an `Async::Promise`, pushes it onto an array, and blocks the fiber until the promise is resolved. -- **Read fiber** (`read_loop`) — reads messages from the socket and resolves +- **Read fiber** (`each_message`) — reads messages from the socket and resolves promises in FIFO order, waking the session fiber. No mutex is needed — Ruby's cooperative fiber scheduling guarantees that the diff --git a/lib/librevox.rb b/lib/librevox.rb index 7890630..703fc58 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -48,10 +48,6 @@ def self.logger! logger end - def self.reopen_log - @logger = logger! - end - # Start a single listener: # # Librevox.start MyInbound diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index 5b32b3e..c3f7c23 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -18,7 +18,7 @@ def initialize(handler, endpoint, **options) def run loop do - @endpoint.connect { |socket| start_session(socket) } + @endpoint.connect(&method(:connect)) rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError, ResponseError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 @@ -27,9 +27,9 @@ def run private - def start_session(socket) + def connect(socket) connection = Protocol::Connection.new(IO::Stream(socket)) - listener = @handler.new(connection, @options) + listener = @handler.new(connection, **@options) Session.new(connection, listener).run end end diff --git a/lib/librevox/command_socket.rb b/lib/librevox/command_socket.rb index 1f1c9f2..3d4d2e7 100644 --- a/lib/librevox/command_socket.rb +++ b/lib/librevox/command_socket.rb @@ -7,12 +7,12 @@ module Librevox class CommandSocket include Librevox::Commands - def initialize(args = {}) - @server = args[:server] || "127.0.0.1" - @port = args[:port] || "8021" - @auth = args[:auth] || "ClueCon" + def initialize(server: "127.0.0.1", port: "8021", auth: "ClueCon", connect: true) + @server = server + @port = port + @auth = auth - connect unless args[:connect] == false + self.connect if connect end def connect diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 8d8e2a9..71eb175 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -67,7 +67,7 @@ def execute_app(app, uuid, args = nil, **params) promise.wait end - def receive_data(response) + def receive_message(response) if response.reply? @reply_promises.shift&.resolve(response) return diff --git a/lib/librevox/listener/inbound.rb b/lib/librevox/listener/inbound.rb index 6d51ced..480d6cd 100644 --- a/lib/librevox/listener/inbound.rb +++ b/lib/librevox/listener/inbound.rb @@ -20,10 +20,10 @@ def self.start(...) Client.start(self, ...) end - def initialize(connection, args = {}) + def initialize(connection, auth: "ClueCon", **) super(connection) - @auth = args[:auth] || "ClueCon" + @auth = auth end def run_session diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index 9ecd235..598cb3c 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -11,7 +11,7 @@ def self.start(...) attr_accessor :session - def initialize(connection, options = {}) + def initialize(connection, **) super(connection) @session = nil @disconnecting = false @@ -49,7 +49,7 @@ def variable(name) # CHANNEL_HANGUP_COMPLETE event. Either may arrive first. Once both # have been seen #session_complete? returns true and Session exits # the reader loop — after any in-flight event hooks have drained. - def receive_data(response) + def receive_message(response) if response.disconnect_notice? @disconnecting = true else diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index 975f763..61c9173 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -24,7 +24,9 @@ def receive_data end end - def read_loop + def each_message + return enum_for(:each_message) unless block_given? + while (msg = receive_data) yield msg end @@ -36,14 +38,14 @@ def send_data(msg) def close_write @stream.close_write - rescue IOError, Errno::ENOTCONN - # Already closed or not connected + rescue IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN + # Already closed or remote hung up. end def close @stream.close - rescue Errno::EPIPE, Errno::ECONNRESET - # Remote end already closed + rescue IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN + # Already closed or remote hung up. end end end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index d63b98a..6e04a89 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -19,19 +19,17 @@ def initialize(handler, endpoint, **options) end def run - @endpoint.accept do |socket, _address| - start_session(socket) - rescue => e - Librevox.logger.error "Session error: #{e.full_message}" - end + @endpoint.accept(&method(:accept)) end private - def start_session(socket) + def accept(socket, _address) connection = Protocol::Connection.new(IO::Stream(socket)) - listener = @handler.new(connection, @options) + listener = @handler.new(connection, **@options) Session.new(connection, listener).run + rescue => e + Librevox.logger.error "Session error: #{e.full_message}" end end end diff --git a/lib/librevox/session.rb b/lib/librevox/session.rb index 3c2f0c8..0d40755 100644 --- a/lib/librevox/session.rb +++ b/lib/librevox/session.rb @@ -28,8 +28,8 @@ def run # ensure can't fire until run_session returns — that would deadlock. def start_reader Async do - @connection.read_loop do |msg| - @listener.receive_data(msg) + @connection.each_message do |msg| + @listener.receive_message(msg) break if @listener.session_complete? end ensure diff --git a/test/functional/librevox/listener/outbound_error_test.rb b/test/functional/librevox/listener/outbound_error_test.rb index e19aa0c..1f20878 100644 --- a/test/functional/librevox/listener/outbound_error_test.rb +++ b/test/functional/librevox/listener/outbound_error_test.rb @@ -60,7 +60,7 @@ def test_unhandled_error_propagates "Content-Type: command/reply\nReply-Text: -ERR invalid command", "" ) error = assert_raises(Librevox::ResponseError) do - @listener.receive_data(error_reply) + @listener.receive_message(error_reply) @session_task.wait end assert_equal "-ERR invalid command", error.message diff --git a/test/functional/librevox/protocol/connection_test.rb b/test/functional/librevox/protocol/connection_test.rb index 9024e54..3a576d9 100644 --- a/test/functional/librevox/protocol/connection_test.rb +++ b/test/functional/librevox/protocol/connection_test.rb @@ -67,13 +67,13 @@ def test_skips_empty_header_blocks_after_content assert_nil @connection.receive_data end - def test_read_loop_yields_each_response + def test_each_message_yields_each_response @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close messages = [] - @connection.read_loop { |msg| messages << msg } + @connection.each_message { |msg| messages << msg } assert_equal 2, messages.size assert_equal "command/reply", messages[0].headers[:content_type] diff --git a/test/test_helper.rb b/test/test_helper.rb index 87e9238..c1abd98 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -57,7 +57,7 @@ def response(args = {}) headers["Content-Length"] = body.size if body header_str = headers.map {|k, v| "#{k}: #{v}"}.join("\n") - @listener.receive_data(Librevox::Protocol::Response.new(header_str, body.to_s)) + @listener.receive_message(Librevox::Protocol::Response.new(header_str, body.to_s)) yield_to_fibers end @@ -65,7 +65,7 @@ def event(name) body = "Event-Name: #{name}" headers = "Content-Type: text/event-plain\nContent-Length: #{body.size}" - @listener.receive_data(Librevox::Protocol::Response.new(headers, body)) + @listener.receive_message(Librevox::Protocol::Response.new(headers, body)) yield_to_fibers end @@ -79,7 +79,7 @@ def execute_complete(args = {}) body_str = body.map {|k,v| "#{k}: #{v}"}.join("\n") headers = "Content-Type: text/event-plain\nContent-Length: #{body_str.size}" - @listener.receive_data(Librevox::Protocol::Response.new(headers, body_str)) + @listener.receive_message(Librevox::Protocol::Response.new(headers, body_str)) yield_to_fibers end From 290fa01bf9618baaa9982f86a26ce219570cc37c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Mon, 20 Apr 2026 09:21:42 +0200 Subject: [PATCH 13/15] Address review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop redundant response.event? guards in Outbound — response.event already returns nil when not an event, so the && is dead weight. - attr :endpoint → attr_reader :endpoint on Server. - Drop ConnectionError from Client#run's rescue list — Session#run already catches it internally, so it never escapes. - Normalize error logging to #message on both Client and Server (full_message was noisy on routine reconnects). - Drop the enum_for fallback on Connection#each_message — no caller uses the block-less form and the stream is not re-iterable anyway. - Document the #session_complete? contract on Base so future listeners don't put expensive work in the hot path. The reader&.stop safe-nav in Session is kept: ensure can fire before reader is assigned when a test hard-kills the thread mid-run. --- lib/librevox/client.rb | 2 +- lib/librevox/listener/base.rb | 8 +++++--- lib/librevox/listener/outbound.rb | 4 ++-- lib/librevox/protocol/connection.rb | 2 -- lib/librevox/server.rb | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index c3f7c23..98d2f75 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -19,7 +19,7 @@ def initialize(handler, endpoint, **options) def run loop do @endpoint.connect(&method(:connect)) - rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError, ResponseError => e + rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ResponseError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 end diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 71eb175..b7f99b5 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -104,9 +104,11 @@ def disconnect @connection&.close_write end - # Overridden by listeners that drive their own end-of-session criteria - # (e.g. Outbound waits for disconnect-notice + CHANNEL_HANGUP_COMPLETE). - # Session checks this after each message and stops the reader when true. + # Override to signal that the session has ended by protocol (not by + # socket drop). Polled synchronously by {Session} after every dispatched + # message — keep it cheap and return +true+ exactly once. When +true+, + # the reader loop exits cleanly and in-flight event hooks drain before + # the connection is closed. def session_complete? false end diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index 598cb3c..26ff77e 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -53,9 +53,9 @@ def receive_message(response) if response.disconnect_notice? @disconnecting = true else - @session = response.content if response.event? && response.event == "CHANNEL_DATA" + @session = response.content if response.event == "CHANNEL_DATA" super - @hung_up = true if response.event? && response.event == "CHANNEL_HANGUP_COMPLETE" + @hung_up = true if response.event == "CHANNEL_HANGUP_COMPLETE" end end diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index 61c9173..554e924 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -25,8 +25,6 @@ def receive_data end def each_message - return enum_for(:each_message) unless block_given? - while (msg = receive_data) yield msg end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 6e04a89..ba43cbe 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -5,7 +5,7 @@ module Librevox class Server - attr :endpoint + attr_reader :endpoint def self.start(handler, host: "localhost", port: 8084, **options) endpoint = IO::Endpoint.tcp(host, port) @@ -29,7 +29,7 @@ def accept(socket, _address) listener = @handler.new(connection, **@options) Session.new(connection, listener).run rescue => e - Librevox.logger.error "Session error: #{e.full_message}" + Librevox.logger.error "Session error: #{e.message}" end end end From 3904971f16512db6e34e430b4158c67c707c0fb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Thu, 28 May 2026 13:51:58 +0200 Subject: [PATCH 14/15] Serialize sends with a write-lock, add concurrency tests The FIFO @reply_promises invariant relied on push-then-send being atomic up to the stream's flush yield. Make it explicit with an Async::Semaphore around the enqueue+send critical section. The lock deliberately excludes promise.wait, so a slow reply does not queue the next sender behind it. Adds unit tests with a connection that yields mid-send (the real stream does not, but this guards the lock's logic) covering: no mid-send interleaving, FIFO reply routing under concurrency, UUID-keyed app completion under reversed order, and disconnect unblocking pending senders. --- lib/librevox/listener/base.rb | 13 +- .../librevox/listener/concurrency_test.rb | 147 ++++++++++++++++++ 2 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 test/functional/librevox/listener/concurrency_test.rb diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index b7f99b5..4b96631 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'async/barrier' +require 'async/semaphore' require 'securerandom' module Librevox @@ -21,6 +22,7 @@ def initialize(connection) @reply_promises = [] @app_promises = {} @event_barrier = Async::Barrier.new + @write_lock = Async::Semaphore.new(1) end # Exposes an instance of {CommandDelegate}, which includes {Librevox::Commands}. @@ -36,9 +38,14 @@ def api def send_message(msg) promise = Async::Promise.new - @reply_promises << promise - - @connection.send_data(msg) + # Serialize only the enqueue+send so the FIFO order of @reply_promises + # always matches the byte order on the wire, even when concurrent event + # hooks send commands. Must not cover promise.wait — holding the lock + # while awaiting a reply would deadlock every other sender. + @write_lock.acquire do + @reply_promises << promise + @connection.send_data(msg) + end reply = promise.wait raise ResponseError, reply.headers[:reply_text] if reply.error? diff --git a/test/functional/librevox/listener/concurrency_test.rb b/test/functional/librevox/listener/concurrency_test.rb new file mode 100644 index 0000000..b65eadf --- /dev/null +++ b/test/functional/librevox/listener/concurrency_test.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/base' + +# A connection whose #send_data yields in the middle of "sending", the way the +# real Async::Stream flush yields mid-write. This is what makes the write-lock's +# job observable: without serialization a second fiber can start a send before +# the first one finished. The plain MockConnection can never expose this because +# its #send_data never yields. +class InterleaveDetectingConnection + attr_reader :data, :overlaps + + def initialize + @data = [] + @sending = false + @overlaps = 0 + end + + def send_data(msg) + @overlaps += 1 if @sending # someone else is mid-send -> interleaving + @sending = true + Async::Task.current.yield # simulate flush handing control to the reactor + @data << msg + @sending = false + end + + def close; end + def close_write; end +end + +class TestSendSerialization < Minitest::Test + prepend Librevox::Test::AsyncTest + + def setup + @conn = InterleaveDetectingConnection.new + @listener = Librevox::Listener::Base.new(@conn) + end + + # Invariant #1: a send completes before another begins. + def test_concurrent_senders_do_not_interleave_on_the_wire + a = Async { @listener.send_message("A") } + b = Async { @listener.send_message("B") } + + Async::Task.current.yield until @conn.data.size == 2 + + assert_equal 0, @conn.overlaps, + "a send started before the previous one finished — writes interleaved" + assert_equal ["A", "B"], @conn.data, "sends left the fiber in spawn order" + ensure + a&.stop + b&.stop + end + + # Invariant #2: the Nth reply resolves the Nth sender, under concurrency. + def test_concurrent_replies_route_in_send_order + results = {} + a = Async { results[:a] = @listener.send_message("A") } + b = Async { results[:b] = @listener.send_message("B") } + + Async::Task.current.yield until @conn.data.size == 2 + + reply!("+OK first") + reply!("+OK second") + + Async::Task.current.yield until results.size == 2 + + assert_equal "+OK first", results[:a].headers[:reply_text] + assert_equal "+OK second", results[:b].headers[:reply_text] + ensure + a&.stop + b&.stop + end + + # Invariant #4: app completions route by Event-UUID, even reversed. + def test_out_of_order_app_completions_route_by_uuid + results = {} + a = Async { results[:a] = @listener.execute_app("foo", "uuid-1") } + b = Async { results[:b] = @listener.execute_app("bar", "uuid-1") } + + # Each execute_app first sends its sendmsg and waits for the +OK ack. + Async::Task.current.yield until @conn.data.size == 2 + uuid_a = event_uuid(@conn.data[0]) + uuid_b = event_uuid(@conn.data[1]) + refute_equal uuid_a, uuid_b + + reply!("+OK") # ack for A's sendmsg + reply!("+OK") # ack for B's sendmsg + + # Completions arrive in REVERSE order — B before A. + complete!(uuid_b, marker: "B-done") + complete!(uuid_a, marker: "A-done") + + Async::Task.current.yield until results.size == 2 + + assert_equal "B-done", results[:b].content[:marker] + assert_equal "A-done", results[:a].content[:marker] + ensure + a&.stop + b&.stop + end + + # Invariant #5/#6: a drop rejects every pending promise and unblocks waiters; + # the write-lock never wraps promise.wait, so this cannot deadlock. + def test_disconnect_rejects_all_pending_senders + errors = [] + a = Async do + @listener.send_message("A") + rescue Librevox::ConnectionError => e + errors << e + end + b = Async do + @listener.send_message("B") + rescue Librevox::ConnectionError => e + errors << e + end + + Async::Task.current.yield until @conn.data.size == 2 + + @listener.connection_closed + + Async::Task.current.yield until errors.size == 2 + assert_equal 2, errors.size, "both blocked senders were unblocked by the drop" + ensure + a&.stop + b&.stop + end + + private + + def reply!(text) + @listener.receive_message( + Librevox::Protocol::Response.new("Content-Type: command/reply\nReply-Text: #{text}", "") + ) + end + + def event_uuid(sendmsg) + sendmsg[/event-uuid: (.+)/, 1] + end + + def complete!(uuid, marker:) + body = "Event-Name: CHANNEL_EXECUTE_COMPLETE\nApplication-UUID: #{uuid}\nmarker: #{marker}" + headers = "Content-Type: text/event-plain\nContent-Length: #{body.bytesize}" + @listener.receive_message(Librevox::Protocol::Response.new(headers, body)) + end +end From fe9581bc615d05189975593fae9bd716ad2ac48e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Thu, 28 May 2026 13:51:58 +0200 Subject: [PATCH 15/15] Add real-socket integration test for framing and reply routing Drives Base#send_message -> Protocol::Connection -> Socket.pair -> real receive_data framing on the peer, with replies routed back through the listener's reader. Closes the gap where nothing exercised Connection over a live fd. --- test/integration/write_path_test.rb | 89 +++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 test/integration/write_path_test.rb diff --git a/test/integration/write_path_test.rb b/test/integration/write_path_test.rb new file mode 100644 index 0000000..a784831 --- /dev/null +++ b/test/integration/write_path_test.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +require_relative '../test_helper' + +require 'librevox/protocol/connection' +require 'librevox/listener/base' +require 'socket' +require 'io/stream' + +# End-to-end coverage of the send/receive path over a real socket: drives +# Base#send_message -> the real Protocol::Connection -> a real OS socket -> +# the real receive_data framing on the peer, with replies routed back through +# the listener's reader. Nothing else exercises Connection over a live fd. +# +# Note: this does NOT prove the write-lock's value — empirically, the real +# IO::Stream does not interleave concurrent writes even at 1 MB, so these tests +# pass with the lock removed too. The lock's behavior is guarded by the unit +# test in test/functional/librevox/listener/concurrency_test.rb (which forces +# a mid-send yield the real stream does not actually perform). +class TestRealSocketWritePath < Minitest::Test + prepend Librevox::Test::AsyncTest + + def setup + @sock_a, @sock_b = Socket.pair(:UNIX, :STREAM) + end + + def teardown + @sock_a&.close + @sock_b&.close + end + + # Framing round-trips: a reply written on one end reconstructs exactly on the + # other (read_until "\n\n" + Content-Length body). + def test_framing_round_trips_over_a_real_socket + sender = Librevox::Protocol::Connection.new(IO::Stream(@sock_a)) + receiver = Librevox::Protocol::Connection.new(IO::Stream(@sock_b)) + + body = "Event-Name: CHANNEL_DATA\nUnique-ID: abc" + sender.send_data("Content-Type: text/event-plain\nContent-Length: #{body.bytesize}\n\n#{body}") + + msg = receiver.receive_data + + assert msg.event? + assert_equal "CHANNEL_DATA", msg.event + assert_equal "abc", msg.content[:unique_id] + end + + # Many fibers send concurrently; the peer must see every command as a whole, + # uncorrupted frame in send order, and every sender must get its reply back + # via FIFO routing through the listener's real reader loop. + def test_concurrent_sends_arrive_whole_and_ordered + n = 8 + listener_conn = Librevox::Protocol::Connection.new(IO::Stream(@sock_a)) + peer = Librevox::Protocol::Connection.new(IO::Stream(@sock_b)) + listener = Librevox::Listener::Base.new(listener_conn) + + received = [] + + # Listener-side reader, exactly like Session: dispatch replies so senders + # waiting on their promise unblock. + reader = Async do + listener_conn.each_message { |m| listener.receive_message(m) } + end + + # Peer plays FreeSWITCH: read each framed command, echo one +OK back. + peer_task = Async do + n.times do + msg = peer.receive_data + received << msg.headers[:command] + peer.send_data("Content-Type: command/reply\nReply-Text: +OK") + end + end + + # Fire n concurrent senders. Each blocks in send_message until its reply. + senders = (0...n).map do |i| + Async { listener.send_message("Command: CMD-#{i}") } + end + senders.each(&:wait) + peer_task.wait + + expected = (0...n).map { |i| "CMD-#{i}" } + assert_equal expected, received, + "commands must arrive whole and in send order over the real socket" + ensure + reader&.stop + peer_task&.stop + senders&.each(&:stop) + end +end