diff --git a/README.md b/README.md index c377f6b..05cfa31 100644 --- a/README.md +++ b/README.md @@ -245,7 +245,7 @@ finished. Application completion is signalled by a `CHANNEL_EXECUTE_COMPLETE` event: ``` -Listener → FS: sendmsg +Listener → FS: sendmsg call-command: execute execute-app-name: playback execute-app-arg: welcome.wav @@ -270,18 +270,19 @@ being processed until the current application completes. ### Two fibers per connection -Librevox runs two fibers for each outbound connection: +Librevox runs two fibers for each connection: - **Session fiber** (`run_session`) — runs the setup sequence and then - `session_initiated`. Each `send_message` or `application` call blocks the fiber - until the reply arrives. -- **Read fiber** (`read_loop`) — reads messages from the socket and dispatches - them to `Async::Queue` instances, waking the session fiber. - -An `Async::Semaphore(1)` mutex on `send_message` ensures only one command is -in-flight at a time, so replies are always delivered to the correct caller. -This also serializes commands issued by event hooks (which run in their own -fibers) with the main session flow. + `session_initiated`. Each `send_message` or `application` call creates an + `Async::Promise`, pushes it onto an array, and blocks the fiber until the + promise is resolved. +- **Read fiber** (`read_loop`) — reads messages from the socket and resolves + promises in FIFO order, waking the session fiber. + +No mutex is needed — Ruby's cooperative fiber scheduling guarantees that the +promise push happens before the I/O yield point (the socket write), so +interleaving from concurrent event-hook fibers is safe. When a connection +drops, pending promises are rejected with `ConnectionError`. ## API Documentation diff --git a/lib/librevox.rb b/lib/librevox.rb index eb618b3..83d7642 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -9,6 +9,7 @@ class ConnectionError < StandardError; end autoload :Client, 'librevox/client' autoload :CommandSocket, 'librevox/command_socket' + autoload :CommandDelegate, 'librevox/command_delegate' autoload :Commands, 'librevox/commands' autoload :Applications, 'librevox/applications' autoload :Runner, 'librevox/runner' diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index 3fbb9df..025861f 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -1,9 +1,15 @@ # frozen_string_literal: true require 'io/stream' +require 'io/endpoint/host_endpoint' module Librevox class Client + def self.start(handler, host: "localhost", port: 8021, **options) + endpoint = IO::Endpoint.tcp(host, port) + new(handler, endpoint, **options).run + end + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint @@ -21,7 +27,7 @@ def connect(socket) def run loop do @endpoint.connect(&method(:connect)) - rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e + rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError, ResponseError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 end @@ -35,6 +41,8 @@ def handle_session(connection, listener) listener.run_session read_task.wait + rescue ConnectionError + # Expected when the connection drops ensure read_task&.stop connection.close @@ -44,17 +52,17 @@ def start_read_loop(connection, listener) Async do read_messages(connection, listener) ensure - # Close queues here (not in handle_session's ensure) so that - # a connection drop unblocks listener.run_session via nil dequeue. + # Reject pending promises here (not in handle_session's ensure) so + # that a connection drop unblocks listener.run_session via rejection. # handle_session's ensure can't run until run_session returns, - # creating a deadlock if queues aren't closed from this fiber. + # creating a deadlock if promises aren't rejected from this fiber. listener.connection_closed end end def read_messages(connection, listener) connection.read_loop do |msg| - listener.receive_message(msg) + listener.receive_data(msg) end end end diff --git a/lib/librevox/command_delegate.rb b/lib/librevox/command_delegate.rb new file mode 100644 index 0000000..71e0670 --- /dev/null +++ b/lib/librevox/command_delegate.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Librevox + # In some cases there are both applications and commands with the same + # name, e.g. fifo. But we can't have two `fifo`-methods, so we include + # commands in CommandDelegate, and expose all commands through the `api` + # method, which wraps a CommandDelegate instance. + class CommandDelegate + include Librevox::Commands + + def initialize(listener) + @listener = listener + end + + def command(*args) + @listener.send_message(super(*args)) + end + end +end diff --git a/lib/librevox/command_socket.rb b/lib/librevox/command_socket.rb index d77f5de..1f1c9f2 100644 --- a/lib/librevox/command_socket.rb +++ b/lib/librevox/command_socket.rb @@ -23,7 +23,7 @@ def connect end def send_message(msg) - @connection.send_message(msg) + @connection.send_data(msg) read_response end @@ -32,19 +32,19 @@ def command(*args) end def read_response - while msg = @connection.read_message + while msg = @connection.receive_data return msg if msg.command_reply? || msg.api_response? end end - def application(uuid, app, args = nil, **params) - headers = params - .merge( + def application(app, uuid, args = nil, **params) + headers = { event_lock: true, call_command: "execute", execute_app_name: app, execute_app_arg: args, - ) + } + .merge(params) .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } send_message "sendmsg #{uuid}\n#{headers.join("\n")}" diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index b063de9..2e24183 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -1,19 +1,11 @@ # frozen_string_literal: true -require 'async/queue' -require 'async/semaphore' require 'async/barrier' +require 'securerandom' module Librevox module Listener class Base - def initialize(connection) - @connection = connection - @reply_queue = Async::Queue.new - @command_mutex = Async::Semaphore.new(1) - @event_barrier = Async::Barrier.new - end - class << self def hooks @hooks ||= Hash.new {|hash, key| hash[key] = []} @@ -24,20 +16,11 @@ def event(event, &block) end end - # In some cases there are both applications and commands with the same - # name, e.g. fifo. But we can't have two `fifo`-methods, so we include - # commands in CommandDelegate, and expose all commands through the `api` - # method, which wraps a CommandDelegate instance. - class CommandDelegate - include Librevox::Commands - - def initialize(listener) - @listener = listener - end - - def command(*args) - @listener.send_message(super(*args)) - end + def initialize(connection) + @connection = connection + @reply_promises = [] + @app_promises = {} + @event_barrier = Async::Barrier.new end # Exposes an instance of {CommandDelegate}, which includes {Librevox::Commands}. @@ -51,47 +34,70 @@ def api end def send_message(msg) - @command_mutex.acquire do - @connection.send_message(msg) - reply = @reply_queue.dequeue - raise ConnectionError, "Connection closed" if reply.nil? - raise ResponseError, reply.headers[:reply_text] if reply.error? - reply - end + promise = Async::Promise.new + + @reply_promises << promise + + @connection.send_data(msg) + + reply = promise.wait + raise ResponseError, reply.headers[:reply_text] if reply.error? + + reply end - attr_accessor :response + def execute_app(app, uuid, args = nil, **params) + event_uuid = SecureRandom.uuid + + headers = { + event_lock: true, + call_command: "execute", + execute_app_name: app, + execute_app_arg: args, + event_uuid: event_uuid, + } + .merge(params) + .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - def receive_message(response) - @response = response - handle_response + promise = Async::Promise.new + @app_promises[event_uuid] = promise + + send_message "sendmsg #{uuid}\n#{headers.join("\n")}" + + promise.wait end - def handle_response + def receive_data(response) if response.reply? - @reply_queue.push(response) + @reply_promises.shift&.resolve(response) return end if response.event? - resp = response + if response.event == "CHANNEL_EXECUTE_COMPLETE" + app_uuid = response.content[:application_uuid] + @app_promises.delete(app_uuid)&.resolve(response) + end + @event_barrier.async do - on_event(resp) - invoke_event_hooks(resp) + on_event(response) + invoke_event_hooks(response) end end end - # override - def on_event(event) - end + def connection_closed + error = ConnectionError.new("Connection closed") - def run_session - end + @reply_promises.each { |p| p.reject(error) } + @app_promises.each_value { |p| p.reject(error) } + + @reply_promises.clear + @app_promises.clear - def connection_closed - @reply_queue.close @event_barrier.wait + rescue ConnectionError + # Expected — event hooks may have been mid-command when disconnected end def disconnect @@ -100,6 +106,12 @@ def disconnect private + def on_event(event) + end + + def run_session + end + def invoke_event_hooks(resp) event_name = resp.event.downcase.to_sym hooks = self.class.hooks[event_name] diff --git a/lib/librevox/listener/inbound.rb b/lib/librevox/listener/inbound.rb index be7111b..6d51ced 100644 --- a/lib/librevox/listener/inbound.rb +++ b/lib/librevox/listener/inbound.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require 'io/endpoint/host_endpoint' - module Librevox module Listener class Inbound < Base @@ -18,10 +16,8 @@ def filters(filters) end end - def self.run(barrier, host: "localhost", port: 8021, **options) - endpoint = IO::Endpoint.tcp(host, port) - client = Client.new(self, endpoint, **options) - barrier.async { client.run } + def self.start(...) + Client.start(self, ...) end def initialize(connection, args = {}) diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index fa1a57b..5614d1b 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -1,54 +1,19 @@ # frozen_string_literal: true -require 'io/endpoint/host_endpoint' - module Librevox module Listener class Outbound < Base include Librevox::Applications - def self.run(barrier, host: "localhost", port: 8084, **options) - endpoint = IO::Endpoint.tcp(host, port, **options) - server = Server.new(self, endpoint) - barrier.async { server.run } - end - - def application(app, args = nil, **params) - variable_name = params.delete(:variable) - - headers = params - .merge( - event_lock: true, - call_command: "execute", - execute_app_name: app, - execute_app_arg: args, - ) - .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - - send_message "sendmsg\n#{headers.join("\n")}" - - response = @app_complete_queue.dequeue - - if response.nil? - raise ConnectionError, "Connection closed" - end - - @session = response.content - - variable(variable_name) if variable_name + def self.start(...) + Server.start(self, ...) end attr_accessor :session - # Called when a new session is initiated. - def session_initiated - end - def initialize(connection, options = {}) super(connection) - @session = nil - @app_complete_queue = Async::Queue.new end def run_session @@ -58,33 +23,32 @@ def run_session send_message "linger" session_initiated - rescue ResponseError, ConnectionError, IOError, Errno::EPIPE => e - Librevox.logger.error "Session error: #{e.message}" end - def connection_closed - super - - @app_complete_queue.close + # Called when a new session is initiated. + def session_initiated end - def handle_response - if response.event? && response.event == "CHANNEL_DATA" - @session = response.content - elsif response.event? && response.event == "CHANNEL_EXECUTE_COMPLETE" - @app_complete_queue.push(response) - end + def application(app, args = nil, **params) + variable_name = params.delete(:variable) - super + response = execute_app(app, session[:unique_id], args, **params) + + @session = response.content + + variable(variable_name) if variable_name end def variable(name) session[:"variable_#{name}"] end - def update_session - response = api.command "uuid_dump", session[:unique_id] - @session = response.content + def receive_data(response) + if response.event? && response.event == "CHANNEL_DATA" + @session = response.content + end + + super end end end diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index 397c4b9..975f763 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -7,7 +7,7 @@ def initialize(stream) @stream = stream end - def read_message + def receive_data loop do headers = @stream.read_until("\n\n") return nil if headers.nil? @@ -25,14 +25,13 @@ def read_message end def read_loop - while (msg = read_message) + while (msg = receive_data) yield msg end end - def send_message(msg) - @stream.write("#{msg}\n\n") - @stream.flush + def send_data(msg) + @stream.write("#{msg}\n\n", flush: true) end def close_write @@ -42,8 +41,6 @@ def close_write end def close - return if @stream.closed? - @stream.close rescue Errno::EPIPE, Errno::ECONNRESET # Remote end already closed diff --git a/lib/librevox/runner.rb b/lib/librevox/runner.rb index 97c7ae2..6596974 100644 --- a/lib/librevox/runner.rb +++ b/lib/librevox/runner.rb @@ -31,7 +31,9 @@ def initialize(barrier) end def run(klass, **args) - klass.run(@barrier, **args) + @barrier.async do + klass.start(**args) + end end end end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 32fde58..296da05 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -1,17 +1,23 @@ # frozen_string_literal: true require 'io/stream' +require 'io/endpoint/host_endpoint' module Librevox class Server + attr :endpoint + + def self.start(handler, host: "localhost", port: 8084, **options) + endpoint = IO::Endpoint.tcp(host, port) + new(handler, endpoint, **options).run + end + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint @options = options end - attr :endpoint - def accept(socket, _address) stream = IO::Stream(socket) connection = Protocol::Connection.new(stream) @@ -37,6 +43,8 @@ def handle_session(connection, listener) listener.run_session read_task.wait + rescue ConnectionError + # Expected when the connection drops ensure read_task&.stop connection.close @@ -46,10 +54,10 @@ def start_read_loop(connection, listener) Async do read_messages(connection, listener) ensure - # Close queues here (not in handle_session's ensure) so that - # a connection drop unblocks listener.run_session via nil dequeue. + # Reject pending promises here (not in handle_session's ensure) so + # that a connection drop unblocks listener.run_session via rejection. # handle_session's ensure can't run until run_session returns, - # creating a deadlock if queues aren't closed from this fiber. + # creating a deadlock if promises aren't rejected from this fiber. listener.connection_closed end end @@ -62,7 +70,7 @@ def read_messages(connection, listener) if msg.disconnect_notice? disconnecting = true else - listener.receive_message(msg) + listener.receive_data(msg) hung_up = true if msg.event? && msg.event == "CHANNEL_HANGUP_COMPLETE" end break if disconnecting && hung_up diff --git a/test/functional/librevox/command_socket_test.rb b/test/functional/librevox/command_socket_test.rb new file mode 100644 index 0000000..a38972c --- /dev/null +++ b/test/functional/librevox/command_socket_test.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require_relative '../../test_helper' + +require 'librevox/command_socket' + +class TestCommandSocket < Minitest::Test + def setup + @server = TCPServer.new("127.0.0.1", 0) + @port = @server.local_address.ip_port + end + + def teardown + @socket&.close + @fs&.close + @server&.close + end + + def test_sends_api_command_and_returns_response + connect + + thread = Thread.new { @socket.status } + + assert_equal "api status", read_message + reply "api/response", "OK" + + response = thread.value + assert response.api_response? + assert_equal "OK", response.content + end + + def test_sends_application_via_sendmsg + connect + + thread = Thread.new { @socket.application("playback", "1234-abcd", "/tmp/test.wav") } + + msg = read_message + + assert_match(/\Asendmsg 1234-abcd\n/, msg) + assert_match(/execute-app-name: playback/, msg) + assert_match(/execute-app-arg: \/tmp\/test.wav/, msg) + assert_match(/event-lock: true/, msg) + + reply "command/reply", "+OK" + + thread.value + end + + private + + def connect + thread = Thread.new { @socket = Librevox::CommandSocket.new(port: @port) } + + @fs = @server.accept + assert_equal "auth ClueCon", read_message + reply "command/reply", "+OK accepted" + + thread.join(2) + end + + def reply(content_type, body) + @fs.write("Content-Type: #{content_type}\nContent-Length: #{body.size}\n\n#{body}") + end + + def read_message + @fs.gets("\n\n")&.strip + end +end diff --git a/test/functional/librevox/listener/base_test.rb b/test/functional/librevox/listener/base_test.rb index 36918a7..7892439 100644 --- a/test/functional/librevox/listener/base_test.rb +++ b/test/functional/librevox/listener/base_test.rb @@ -10,7 +10,7 @@ def setup @listener = @class.new(MockConnection.new) end - # Without Async in handle_response, on_event calling api.* deadlocks the + # Without Async in receive_data, on_event calling api.* deadlocks the # calling fiber. A Thread timeout is the only way to detect this — all # Async fibers are stuck so Async-level timeouts can't fire. def test_on_event_with_api_does_not_block_handle_response diff --git a/test/functional/librevox/listener/inbound_filter_test.rb b/test/functional/librevox/listener/inbound_filter_test.rb index e6e4d4a..d41a43d 100644 --- a/test/functional/librevox/listener/inbound_filter_test.rb +++ b/test/functional/librevox/listener/inbound_filter_test.rb @@ -36,7 +36,7 @@ def teardown super end - def test_authorize_and_subscribe_to_events + def test_sends_auth_events_and_filters assert_equal "auth ClueCon", @listener.outgoing_data.shift assert_equal "event plain CUSTOM CHANNEL_EXECUTE", @listener.outgoing_data.shift assert_equal "filter Caller-Context default", @listener.outgoing_data.shift diff --git a/test/functional/librevox/listener/inbound_test.rb b/test/functional/librevox/listener/inbound_test.rb index 24d9d54..407ce77 100644 --- a/test/functional/librevox/listener/inbound_test.rb +++ b/test/functional/librevox/listener/inbound_test.rb @@ -29,7 +29,7 @@ def teardown super end - def test_authorize_and_subscribe_to_events + def test_sends_auth_and_event_subscription assert_equal "auth ClueCon", @listener.outgoing_data.shift assert_equal "event plain ALL", @listener.outgoing_data.shift assert_nil @listener.outgoing_data.shift diff --git a/test/functional/librevox/listener/outbound_api_test.rb b/test/functional/librevox/listener/outbound_api_test.rb index 13659bc..21a56f8 100644 --- a/test/functional/librevox/listener/outbound_api_test.rb +++ b/test/functional/librevox/listener/outbound_api_test.rb @@ -12,33 +12,27 @@ def session_initiated end end -class TestOutboundListenerWithAppsAndApi < Minitest::Test +class TestOutboundAppsAndApi < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithAppsAndApi.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithAppsAndApi end def teardown - @session_task&.stop + teardown_outbound super end - def test_wait_for_execute_complete_before_calling_next_app_or_cmd - assert_send_application @listener, "foo" - execute_complete + def test_waits_for_execute_complete_before_api_and_next_app + assert_execute_app @listener, "foo", "1234" + execute_complete "Unique-ID" => "1234" assert_send_command @listener, "api bar" api_response body: "+OK" - assert_send_application @listener, "baz" + assert_execute_app @listener, "baz", "1234" end end diff --git a/test/functional/librevox/listener/outbound_apps_test.rb b/test/functional/librevox/listener/outbound_apps_test.rb index 7a861c9..4faf8c7 100644 --- a/test/functional/librevox/listener/outbound_apps_test.rb +++ b/test/functional/librevox/listener/outbound_apps_test.rb @@ -4,7 +4,7 @@ require 'librevox/listener/outbound' -class OutboundListenerWithNestedApps < Librevox::Listener::Outbound +class OutboundListenerWithSequentialApps < Librevox::Listener::Outbound def session_initiated sample_app "foo" sample_app "bar" @@ -23,38 +23,32 @@ def session_initiated end end -class TestOutboundListenerWithApps < Minitest::Test +class TestOutboundSequentialApps < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithNestedApps.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithSequentialApps end def teardown - @session_task&.stop + teardown_outbound super end - def test_only_send_one_app_at_a_time - assert_send_application @listener, "foo" + def test_sends_one_app_at_a_time + assert_execute_app @listener, "foo", "1234" assert_send_nothing @listener - execute_complete + execute_complete "Unique-ID" => "1234" - assert_send_application @listener, "bar" + assert_execute_app @listener, "bar", "1234" assert_send_nothing @listener end - def test_not_be_driven_forward_by_events - assert_send_application @listener, "foo" + def test_events_do_not_advance_app + assert_execute_app @listener, "foo", "1234" command_reply body: { "Event-Name" => "CHANNEL_EXECUTE", @@ -64,16 +58,16 @@ def test_not_be_driven_forward_by_events assert_send_nothing @listener end - def test_not_be_driven_forward_by_api_responses - assert_send_application @listener, "foo" + def test_api_responses_do_not_advance_app + assert_execute_app @listener, "foo", "1234" api_response body: "Foo" assert_send_nothing @listener end - def test_not_be_driven_forward_by_disconnect_notifications - assert_send_application @listener, "foo" + def test_disconnect_notices_do_not_advance_app + assert_execute_app @listener, "foo", "1234" response "Content-Type" => "text/disconnect-notice", body: "Lingering" @@ -81,8 +75,8 @@ def test_not_be_driven_forward_by_disconnect_notifications assert_send_nothing @listener end - def test_not_be_driven_forward_by_command_reply - assert_send_application @listener, "foo" + def test_command_replies_do_not_advance_app + assert_execute_app @listener, "foo", "1234" command_reply "Reply-Text" => "+OK" @@ -90,52 +84,40 @@ def test_not_be_driven_forward_by_command_reply end end -class TestOutboundListenerWithCustomHeaders < Minitest::Test +class TestOutboundCustomHeaders < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithCustomHeaders.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithCustomHeaders end def teardown - @session_task&.stop + teardown_outbound super end def test_sends_custom_headers - assert_send_application @listener, "playback", "/tmp/test.wav", loops: 3 + assert_execute_app @listener, "playback", "1234", "/tmp/test.wav", loops: 3 end end -class TestOutboundListenerWithEventLockOverride < Minitest::Test +class TestOutboundEventLockOverride < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithEventLockOverride.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithEventLockOverride end def teardown - @session_task&.stop + teardown_outbound super end def test_overrides_event_lock - assert_send_application @listener, "playback", "/tmp/test.wav", event_lock: false + assert_execute_app @listener, "playback", "1234", "/tmp/test.wav", event_lock: false end end diff --git a/test/functional/librevox/listener/outbound_data_passing_test.rb b/test/functional/librevox/listener/outbound_data_passing_test.rb new file mode 100644 index 0000000..9c54525 --- /dev/null +++ b/test/functional/librevox/listener/outbound_data_passing_test.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/outbound' + +class OutboundListenerWithDataPassing < Librevox::Listener::Outbound + def session_initiated + sample_app "foo" + data = reader_app + application "send", "the end: #{data}" + end +end + +class TestOutboundDataPassing < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + + def setup + setup_outbound OutboundListenerWithDataPassing + end + + def teardown + teardown_outbound + super + end + + def test_passes_data_between_sequential_apps + assert_execute_app @listener, "foo", "1234" + execute_complete "Unique-ID" => "1234" + + assert_execute_app @listener, "reader_app", "1234" + execute_complete "variable_app_var" => "Second", "Unique-ID" => "1234" + + assert_execute_app @listener, "send", "1234", "the end: Second" + end +end diff --git a/test/functional/librevox/listener/outbound_error_test.rb b/test/functional/librevox/listener/outbound_error_test.rb index fe0ece6..e19aa0c 100644 --- a/test/functional/librevox/listener/outbound_error_test.rb +++ b/test/functional/librevox/listener/outbound_error_test.rb @@ -26,24 +26,17 @@ class TestOutboundApplicationError < Minitest::Test include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithErrorApp.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times { @listener.outgoing_data.shift } + setup_outbound OutboundListenerWithErrorApp end def teardown - @session_task&.stop + teardown_outbound super end def test_application_raises_on_error_reply - assert_send_application @listener, "fail" + assert_execute_app @listener, "fail", "1234" - # sendmsg ack with error — raises instead of blocking on app_complete_queue command_reply "Reply-Text" => "-ERR invalid command" assert_instance_of Librevox::ResponseError, @listener.error @@ -56,36 +49,20 @@ class TestOutboundUnhandledApplicationError < Minitest::Test include OutboundSetupHelpers include Librevox::Test::Matchers - def setup - @listener = OutboundListenerWithUnhandledErrorApp.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times { @listener.outgoing_data.shift } - end - - def teardown - @session_task&.stop - super - end - - def test_unhandled_error_ends_session_cleanly - assert_send_application @listener, "fail" - - log = StringIO.new - original_logger = Librevox.logger - Librevox.logger = Logger.new(log) - - # sendmsg ack with error — run_session rescues and logs, no crash - command_reply "Reply-Text" => "-ERR invalid command" - - # session task completes without raising - @session_task.wait - - assert_match(/-ERR invalid command/, log.string) - ensure - Librevox.logger = original_logger + def test_unhandled_error_propagates + setup_outbound OutboundListenerWithUnhandledErrorApp + + assert_execute_app @listener, "fail", "1234" + + # Send error reply and wait on task without yielding in between, + # so async doesn't log an unhandled exception warning. + error_reply = Librevox::Protocol::Response.new( + "Content-Type: command/reply\nReply-Text: -ERR invalid command", "" + ) + error = assert_raises(Librevox::ResponseError) do + @listener.receive_data(error_reply) + @session_task.wait + end + assert_equal "-ERR invalid command", error.message end end diff --git a/test/functional/librevox/listener/outbound_handshake_test.rb b/test/functional/librevox/listener/outbound_handshake_test.rb new file mode 100644 index 0000000..8458b8f --- /dev/null +++ b/test/functional/librevox/listener/outbound_handshake_test.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/outbound' + +class TestOutboundHandshakeOrdering < Minitest::Test + prepend Librevox::Test::AsyncTest + include Librevox::Test::ListenerHelpers + include Librevox::Test::Matchers + + def setup + @listener = Librevox::Listener::Outbound.new(MockConnection.new) + @session_task = Async { @listener.run_session } + end + + def teardown + @session_task&.stop + super + end + + def test_session_initiated_fires_on_linger_reply_not_myevents_reply + # connect response sets session + command_reply "Unique-ID" => "1234" + + # myevents reply — run_session progresses to linger command, but + # session_initiated has NOT been called yet + @listener.outgoing_data.clear + command_reply "Reply-Text" => "+OK Events Enabled" + # linger command was sent (run_session progressed), but no session_initiated yet + assert_send_command @listener, "linger" + assert_send_nothing @listener + + # linger reply — NOW session_initiated fires + command_reply "Reply-Text" => "+OK will linger" + end +end diff --git a/test/functional/librevox/listener/outbound_non_nested_test.rb b/test/functional/librevox/listener/outbound_non_nested_test.rb deleted file mode 100644 index f55c628..0000000 --- a/test/functional/librevox/listener/outbound_non_nested_test.rb +++ /dev/null @@ -1,45 +0,0 @@ -# frozen_string_literal: true - -require_relative '../../../test_helper' - -require 'librevox/listener/outbound' - -class OutboundListenerWithNonNestedApps < Librevox::Listener::Outbound - attr_reader :queue - def session_initiated - sample_app "foo" - data = reader_app - application "send", "the end: #{data}" - end -end - -class TestOutboundListenerWithNonNestedApps < Minitest::Test - prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers - include Librevox::Test::Matchers - - def setup - @listener = OutboundListenerWithNonNestedApps.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - end - - def teardown - @session_task&.stop - super - end - - def test_wait_for_execute_complete_before_calling_next_app - assert_send_application @listener, "foo" - execute_complete "Unique-ID" => "1234" - - assert_send_application @listener, "reader_app" - execute_complete "variable_app_var" => "Second" - - assert_send_application @listener, "send", "the end: Second" - end -end diff --git a/test/functional/librevox/listener/outbound_reader_test.rb b/test/functional/librevox/listener/outbound_reader_test.rb index b08d1c4..2d7c5eb 100644 --- a/test/functional/librevox/listener/outbound_reader_test.rb +++ b/test/functional/librevox/listener/outbound_reader_test.rb @@ -11,41 +11,34 @@ def session_initiated end end -class TestOutboundListenerWithAppReadingData < Minitest::Test +class TestOutboundAppReadingData < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithReader.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - - assert_send_application @listener, "reader_app" + setup_outbound OutboundListenerWithReader + assert_execute_app @listener, "reader_app", "1234" end def teardown - @session_task&.stop + teardown_outbound super end - def test_not_send_anything_while_missing_response + def test_blocks_until_execute_complete assert_send_nothing @listener end - def test_update_session_from_execute_complete - execute_complete "Session-Var" => "Second" + def test_updates_session_from_execute_complete + execute_complete "Session-Var" => "Second", "Unique-ID" => "1234" assert_equal "Second", @listener.session[:session_var] end - def test_return_value_of_channel_variable - execute_complete "variable_app_var" => "Second" + def test_returns_channel_variable_value + execute_complete "variable_app_var" => "Second", "Unique-ID" => "1234" - assert_send_application @listener, "send", "Second" + assert_execute_app @listener, "send", "1234", "Second" end end diff --git a/test/functional/librevox/listener/outbound_session_test.rb b/test/functional/librevox/listener/outbound_session_test.rb deleted file mode 100644 index 211c111..0000000 --- a/test/functional/librevox/listener/outbound_session_test.rb +++ /dev/null @@ -1,78 +0,0 @@ -# frozen_string_literal: true - -require_relative '../../../test_helper' - -require 'librevox/listener/outbound' - -class OutboundListenerWithUpdateSessionCallback < Librevox::Listener::Outbound - def session_initiated - update_session - application "send", "yay, #{session[:session_var]}" - end -end - -class TestOutboundListenerWithUpdateSessionCallback < Minitest::Test - prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers - include Librevox::Test::Matchers - - def setup - @listener = OutboundListenerWithUpdateSessionCallback.new(MockConnection.new) - @session_task = Async { @listener.run_session } - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - - assert_update_session @listener - api_response body: { - "Event-Name" => "CHANNEL_DATA", - "Session-Var" => "Second" - } - end - - def teardown - @session_task&.stop - super - end - - def test_execute_callback - assert_match(/yay,/, @listener.outgoing_data.shift) - end - - def test_update_session_before_calling_callback - assert_send_application @listener, "send", "yay, Second" - end -end - -class TestOutboundReplyQueueOrdering < Minitest::Test - prepend Librevox::Test::AsyncTest - include Librevox::Test::ListenerHelpers - include Librevox::Test::Matchers - - def setup - @listener = Librevox::Listener::Outbound.new(MockConnection.new) - @session_task = Async { @listener.run_session } - end - - def teardown - @session_task&.stop - super - end - - def test_session_initiated_fires_on_linger_reply_not_myevents_reply - # connect response sets session - command_reply "Unique-ID" => "1234" - - # myevents reply — run_session progresses to linger command, but - # session_initiated has NOT been called yet - @listener.outgoing_data.clear - command_reply "Reply-Text" => "+OK Events Enabled" - # linger command was sent (run_session progressed), but no session_initiated yet - assert_send_command @listener, "linger" - assert_send_nothing @listener - - # linger reply — NOW session_initiated fires - command_reply "Reply-Text" => "+OK will linger" - end -end diff --git a/test/functional/librevox/listener/outbound_test.rb b/test/functional/librevox/listener/outbound_test.rb index 35babe6..0523f26 100644 --- a/test/functional/librevox/listener/outbound_test.rb +++ b/test/functional/librevox/listener/outbound_test.rb @@ -10,23 +10,20 @@ def session_initiated end end -class TestOutboundListener < Minitest::Test +class TestOutboundHandshake < Minitest::Test prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers + include Librevox::Test::ListenerHelpers include Librevox::Test::Matchers - include EventTests - include ApiCommandTests def setup @listener = OutboundTestListener.new(MockConnection.new) @session_task = Async { @listener.run_session } - command_reply( - "Caller-Caller-Id-Number" => "8675309", - "Unique-ID" => "1234", - "variable_some_var" => "some value" - ) - event_and_linger_replies - super + + command_reply "Caller-Caller-Id-Number" => "8675309", + "Unique-ID" => "1234", + "variable_some_var" => "some value" + command_reply "Reply-Text" => "+OK Events Enabled" + command_reply "Reply-Text" => "+OK will linger" end def teardown @@ -34,25 +31,41 @@ def teardown super end - def test_connect_to_freeswitch_and_subscribe_to_events - assert_send_command @listener, "connect" - assert_send_command @listener, "myevents" - assert_send_command @listener, "linger" + def test_sends_connect_myevents_linger_in_order + assert_equal "connect", @listener.outgoing_data.shift + assert_equal "myevents", @listener.outgoing_data.shift + assert_equal "linger", @listener.outgoing_data.shift + assert_nil @listener.outgoing_data.shift end - def test_establish_a_session + def test_establishes_session_from_connect_reply assert_equal Hash, @listener.session.class + assert_equal "8675309", @listener.session[:caller_caller_id_number] end - def test_call_session_callback_after_establishing_new_session + def test_calls_session_initiated_after_handshake assert_includes @listener.hook_log, "session was initiated" end - def test_make_headers_available_through_session - assert_equal "8675309", @listener.session[:caller_caller_id_number] + def test_exposes_channel_variables + assert_equal "some value", @listener.variable(:some_var) end +end - def test_make_channel_variables_available_through_variable - assert_equal "some value", @listener.variable(:some_var) +class TestOutboundEvents < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + include EventTests + include ApiCommandTests + + def setup + setup_outbound OutboundTestListener + super + end + + def teardown + teardown_outbound + super end end diff --git a/test/functional/librevox/protocol/connection_test.rb b/test/functional/librevox/protocol/connection_test.rb index f9413a1..9024e54 100644 --- a/test/functional/librevox/protocol/connection_test.rb +++ b/test/functional/librevox/protocol/connection_test.rb @@ -16,43 +16,43 @@ def teardown @write_io.close unless @write_io.closed? end - def test_read_headers_only_message + def test_receives_headers_only_message @write_io.write "Content-Type: command/reply\nReply-Text: +OK\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_instance_of Librevox::Protocol::Response, msg assert_equal "command/reply", msg.headers[:content_type] assert_equal "+OK", msg.headers[:reply_text] end - def test_read_message_with_content + def test_receive_data_with_content body = "Event-Name: HEARTBEAT" @write_io.write "Content-Length: #{body.size}\n\n#{body}\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_instance_of Librevox::Protocol::Response, msg assert_equal body.size.to_s, msg.headers[:content_length] assert_equal "HEARTBEAT", msg.content[:event_name] end - def test_read_multiple_messages + def test_receives_multiple_messages @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close - msg1 = @connection.read_message + msg1 = @connection.receive_data assert_equal "command/reply", msg1.headers[:content_type] - msg2 = @connection.read_message + msg2 = @connection.receive_data assert_equal "api/response", msg2.headers[:content_type] end def test_eof_returns_nil @write_io.close - assert_nil @connection.read_message + assert_nil @connection.receive_data end def test_skips_empty_header_blocks_after_content @@ -60,14 +60,14 @@ def test_skips_empty_header_blocks_after_content @write_io.write "Content-Length: #{body.size}\n\n#{body}\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_equal "TEST", msg.content[:event_name] # The trailing \n\n after content creates an empty block which should be skipped - assert_nil @connection.read_message + assert_nil @connection.receive_data end - def test_read_loop_yields_each_message + def test_read_loop_yields_each_response @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close @@ -80,11 +80,11 @@ def test_read_loop_yields_each_message assert_equal "api/response", messages[1].headers[:content_type] end - def test_send_message_writes_with_terminator + def test_send_data_writes_with_terminator write_stream = IO::Stream(@write_io) conn = Librevox::Protocol::Connection.new(write_stream) - conn.send_message("auth ClueCon") + conn.send_data("auth ClueCon") write_stream.close assert_equal "auth ClueCon\n\n", @read_io.read diff --git a/test/integration/outbound_disconnect_test.rb b/test/integration/outbound_disconnect_test.rb index 32c0fbf..5b002d1 100644 --- a/test/integration/outbound_disconnect_test.rb +++ b/test/integration/outbound_disconnect_test.rb @@ -10,12 +10,8 @@ require 'timeout' class BlockedOnAppListener < Librevox::Listener::Outbound - attr_reader :error - def session_initiated sample_app "playback", "/tmp/test.wav" - rescue Librevox::ConnectionError => e - @error = e end end @@ -177,15 +173,15 @@ def test_connection_drop_unblocks_blocked_send_message port, server_thread = start_server(BlockedOnAppListener) socket = fake_fs_connect(port) - # session_initiated calls sample_app which sends sendmsg and blocks on app_complete_queue + # session_initiated calls sample_app which sends sendmsg and blocks on app promise msg = socket.gets("\n\n") assert_match(/sendmsg/, msg) # ack the sendmsg socket.write("Content-Type: command/reply\nReply-Text: +OK\n\n") - # Now the listener is blocked on app_complete_queue.dequeue - # Drop the connection — should unblock via queue close + # Now the listener is blocked on app promise + # Drop the connection — should unblock via promise rejection socket.close socket = nil diff --git a/test/support/listener.rb b/test/support/listener.rb index d2f5dbd..d85c502 100644 --- a/test/support/listener.rb +++ b/test/support/listener.rb @@ -9,11 +9,11 @@ def initialize @data = [] end - def send_message(msg) + def send_data(msg) @data << msg end - def read_message + def receive_data nil end @@ -88,9 +88,6 @@ def setup @class.event(:hook_with_arg) {|e| log "got event: #{e.event}"} @listener.on_event_block = proc {|e| log "from on_event: #{e.event}"} - - # Establish session - @listener.receive_message(Librevox::Protocol::Response.new("Content-Length: 0\nTest: Testing", "")) end def test_add_event_hook @@ -100,7 +97,7 @@ def test_add_event_hook end end - def test_execute_callback_for_event + def test_dispatches_matching_event_hooks event "OTHER_EVENT" assert_equal "something else", @listener.read_data @@ -108,26 +105,19 @@ def test_execute_callback_for_event assert_equal "something", @listener.read_data end - def test_pass_event_as_arg_to_hook_block + def test_passes_event_to_hook_block event "HOOK_WITH_ARG" assert_equal "got event: HOOK_WITH_ARG", @listener.read_data end - def test_expose_response - event "OTHER_EVENT" - - assert_equal Librevox::Protocol::Response, @listener.response.class - assert_equal "OTHER_EVENT", @listener.response.content[:event_name] - end - - def test_call_on_event + def test_calls_on_event_for_any_event event "THIRD_EVENT" assert_equal "from on_event: THIRD_EVENT", @listener.read_data end - def test_call_event_hooks_and_on_event_on_channel_data + def test_dispatches_on_event_and_hooks_for_channel_data @listener.hook_log.clear @listener.on_event_block = proc {|e| log "on_event: CHANNEL_DATA test"} @@ -179,8 +169,21 @@ def test_multiple_api_commands module OutboundSetupHelpers include Librevox::Test::ListenerHelpers - def event_and_linger_replies + def setup_outbound(listener_class, **connect_headers) + headers = {"Unique-ID" => "1234"}.merge(connect_headers) + + @listener = listener_class.new(MockConnection.new) + @session_task = Async { @listener.run_session } + + command_reply(headers) command_reply "Reply-Text" => "+OK Events Enabled" command_reply "Reply-Text" => "+OK will linger" + + # Discard connect, myevents, linger commands + 3.times { @listener.outgoing_data.shift } + end + + def teardown_outbound + @session_task&.stop end end diff --git a/test/test_helper.rb b/test/test_helper.rb index b146af4..87e9238 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -17,24 +17,20 @@ def assert_send_nothing(obj) assert_nil obj.outgoing_data.shift end - def assert_send_application(obj, app, args = nil, **params) - headers = params - .merge( - event_lock: true, - call_command: "execute", - execute_app_name: app, - execute_app_arg: args, - ) - .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - - assert_equal "sendmsg\n#{headers.join("\n")}", obj.outgoing_data.shift - end + def assert_execute_app(obj, app, uuid, args = nil, **params) + data = obj.outgoing_data.shift + assert data, "Expected sendmsg in outgoing data" + + @last_event_uuid = data[/event-uuid: (.+)/, 1] + assert @last_event_uuid, "Expected event-uuid header in sendmsg" + + assert_match(/\Asendmsg #{uuid}\n/, data) + assert_match(/execute-app-name: #{app}/, data) + assert_match(/execute-app-arg: #{args}/, data) if args - def assert_update_session(obj, session_id = nil) - if session_id - assert_equal "api uuid_dump #{session_id}", obj.outgoing_data.shift - else - assert_match(/^api uuid_dump \d+/, obj.outgoing_data.shift) + params.each do |key, value| + header = "#{key.to_s.tr('_', '-')}: #{value}" + assert_match(/#{Regexp.escape(header)}/, data) end end end @@ -61,7 +57,7 @@ def response(args = {}) headers["Content-Length"] = body.size if body header_str = headers.map {|k, v| "#{k}: #{v}"}.join("\n") - @listener.receive_message(Librevox::Protocol::Response.new(header_str, body.to_s)) + @listener.receive_data(Librevox::Protocol::Response.new(header_str, body.to_s)) yield_to_fibers end @@ -69,7 +65,7 @@ def event(name) body = "Event-Name: #{name}" headers = "Content-Type: text/event-plain\nContent-Length: #{body.size}" - @listener.receive_message(Librevox::Protocol::Response.new(headers, body)) + @listener.receive_data(Librevox::Protocol::Response.new(headers, body)) yield_to_fibers end @@ -77,11 +73,13 @@ def execute_complete(args = {}) # sendmsg ack — always arrives before CHANNEL_EXECUTE_COMPLETE command_reply "Reply-Text" => "+OK" - body = {"Event-Name" => "CHANNEL_EXECUTE_COMPLETE"}.merge(args) + # Use the event-uuid from the last assert_execute_app, echoing it + # back as Application-UUID just like FreeSWITCH would. + body = {"Event-Name" => "CHANNEL_EXECUTE_COMPLETE", "Application-UUID" => @last_event_uuid}.merge(args) body_str = body.map {|k,v| "#{k}: #{v}"}.join("\n") headers = "Content-Type: text/event-plain\nContent-Length: #{body_str.size}" - @listener.receive_message(Librevox::Protocol::Response.new(headers, body_str)) + @listener.receive_data(Librevox::Protocol::Response.new(headers, body_str)) yield_to_fibers end @@ -92,7 +90,7 @@ def yield_to_fibers end end - # Wraps each test method in Async { } so queue operations work. + # Wraps each test method in Async { } so promise operations work. module AsyncTest def run(...) Sync do