diff --git a/README.md b/README.md index c377f6b..f901b27 100644 --- a/README.md +++ b/README.md @@ -212,8 +212,6 @@ Librevox.options[:log_file] = "librevox.log" # default: STDOUT Librevox.options[:log_level] = Logger::DEBUG # default: Logger::INFO ``` -When started with `Librevox.start`, sending `SIGHUP` to the process reopens the log file, making it compatible with `logrotate(1)`. - ## Event Socket Protocol Understanding the outbound event socket protocol is important for working on @@ -245,7 +243,7 @@ finished. Application completion is signalled by a `CHANNEL_EXECUTE_COMPLETE` event: ``` -Listener → FS: sendmsg +Listener → FS: sendmsg call-command: execute execute-app-name: playback execute-app-arg: welcome.wav @@ -270,18 +268,19 @@ being processed until the current application completes. ### Two fibers per connection -Librevox runs two fibers for each outbound connection: +Librevox runs two fibers for each connection: - **Session fiber** (`run_session`) — runs the setup sequence and then - `session_initiated`. Each `send_message` or `application` call blocks the fiber - until the reply arrives. -- **Read fiber** (`read_loop`) — reads messages from the socket and dispatches - them to `Async::Queue` instances, waking the session fiber. - -An `Async::Semaphore(1)` mutex on `send_message` ensures only one command is -in-flight at a time, so replies are always delivered to the correct caller. -This also serializes commands issued by event hooks (which run in their own -fibers) with the main session flow. + `session_initiated`. Each `send_message` or `application` call creates an + `Async::Promise`, pushes it onto an array, and blocks the fiber until the + promise is resolved. +- **Read fiber** (`each_message`) — reads messages from the socket and resolves + promises in FIFO order, waking the session fiber. + +No mutex is needed — Ruby's cooperative fiber scheduling guarantees that the +promise push happens before the I/O yield point (the socket write), so +interleaving from concurrent event-hook fibers is safe. When a connection +drops, pending promises are rejected with `ConnectionError`. ## API Documentation diff --git a/lib/librevox.rb b/lib/librevox.rb index eb618b3..703fc58 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -9,10 +9,12 @@ class ConnectionError < StandardError; end autoload :Client, 'librevox/client' autoload :CommandSocket, 'librevox/command_socket' + autoload :CommandDelegate, 'librevox/command_delegate' autoload :Commands, 'librevox/commands' autoload :Applications, 'librevox/applications' autoload :Runner, 'librevox/runner' autoload :Server, 'librevox/server' + autoload :Session, 'librevox/session' module Protocol autoload :Connection, 'librevox/protocol/connection' @@ -46,10 +48,6 @@ def self.logger! logger end - def self.reopen_log - @logger = logger! - end - # Start a single listener: # # Librevox.start MyInbound diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index 3fbb9df..98d2f75 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -1,27 +1,25 @@ # frozen_string_literal: true require 'io/stream' +require 'io/endpoint/host_endpoint' module Librevox class Client + def self.start(handler, host: "localhost", port: 8021, **options) + endpoint = IO::Endpoint.tcp(host, port) + new(handler, endpoint, **options).run + end + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint @options = options end - def connect(socket) - stream = IO::Stream(socket) - connection = Protocol::Connection.new(stream) - listener = @handler.new(connection, @options) - - handle_session(connection, listener) - end - def run loop do @endpoint.connect(&method(:connect)) - rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e + rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ResponseError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 end @@ -29,33 +27,10 @@ def run private - def handle_session(connection, listener) - read_task = start_read_loop(connection, listener) - - listener.run_session - - read_task.wait - ensure - read_task&.stop - connection.close - end - - def start_read_loop(connection, listener) - Async do - read_messages(connection, listener) - ensure - # Close queues here (not in handle_session's ensure) so that - # a connection drop unblocks listener.run_session via nil dequeue. - # handle_session's ensure can't run until run_session returns, - # creating a deadlock if queues aren't closed from this fiber. - listener.connection_closed - end - end - - def read_messages(connection, listener) - connection.read_loop do |msg| - listener.receive_message(msg) - end + def connect(socket) + connection = Protocol::Connection.new(IO::Stream(socket)) + listener = @handler.new(connection, **@options) + Session.new(connection, listener).run end end end diff --git a/lib/librevox/command_delegate.rb b/lib/librevox/command_delegate.rb new file mode 100644 index 0000000..71e0670 --- /dev/null +++ b/lib/librevox/command_delegate.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Librevox + # In some cases there are both applications and commands with the same + # name, e.g. fifo. But we can't have two `fifo`-methods, so we include + # commands in CommandDelegate, and expose all commands through the `api` + # method, which wraps a CommandDelegate instance. + class CommandDelegate + include Librevox::Commands + + def initialize(listener) + @listener = listener + end + + def command(*args) + @listener.send_message(super(*args)) + end + end +end diff --git a/lib/librevox/command_socket.rb b/lib/librevox/command_socket.rb index d77f5de..3d4d2e7 100644 --- a/lib/librevox/command_socket.rb +++ b/lib/librevox/command_socket.rb @@ -7,12 +7,12 @@ module Librevox class CommandSocket include Librevox::Commands - def initialize(args = {}) - @server = args[:server] || "127.0.0.1" - @port = args[:port] || "8021" - @auth = args[:auth] || "ClueCon" + def initialize(server: "127.0.0.1", port: "8021", auth: "ClueCon", connect: true) + @server = server + @port = port + @auth = auth - connect unless args[:connect] == false + self.connect if connect end def connect @@ -23,7 +23,7 @@ def connect end def send_message(msg) - @connection.send_message(msg) + @connection.send_data(msg) read_response end @@ -32,19 +32,19 @@ def command(*args) end def read_response - while msg = @connection.read_message + while msg = @connection.receive_data return msg if msg.command_reply? || msg.api_response? end end - def application(uuid, app, args = nil, **params) - headers = params - .merge( + def application(app, uuid, args = nil, **params) + headers = { event_lock: true, call_command: "execute", execute_app_name: app, execute_app_arg: args, - ) + } + .merge(params) .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } send_message "sendmsg #{uuid}\n#{headers.join("\n")}" diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index b063de9..4b96631 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -1,19 +1,12 @@ # frozen_string_literal: true -require 'async/queue' -require 'async/semaphore' require 'async/barrier' +require 'async/semaphore' +require 'securerandom' module Librevox module Listener class Base - def initialize(connection) - @connection = connection - @reply_queue = Async::Queue.new - @command_mutex = Async::Semaphore.new(1) - @event_barrier = Async::Barrier.new - end - class << self def hooks @hooks ||= Hash.new {|hash, key| hash[key] = []} @@ -24,20 +17,12 @@ def event(event, &block) end end - # In some cases there are both applications and commands with the same - # name, e.g. fifo. But we can't have two `fifo`-methods, so we include - # commands in CommandDelegate, and expose all commands through the `api` - # method, which wraps a CommandDelegate instance. - class CommandDelegate - include Librevox::Commands - - def initialize(listener) - @listener = listener - end - - def command(*args) - @listener.send_message(super(*args)) - end + def initialize(connection) + @connection = connection + @reply_promises = [] + @app_promises = {} + @event_barrier = Async::Barrier.new + @write_lock = Async::Semaphore.new(1) end # Exposes an instance of {CommandDelegate}, which includes {Librevox::Commands}. @@ -51,55 +36,98 @@ def api end def send_message(msg) - @command_mutex.acquire do - @connection.send_message(msg) - reply = @reply_queue.dequeue - raise ConnectionError, "Connection closed" if reply.nil? - raise ResponseError, reply.headers[:reply_text] if reply.error? - reply + promise = Async::Promise.new + + # Serialize only the enqueue+send so the FIFO order of @reply_promises + # always matches the byte order on the wire, even when concurrent event + # hooks send commands. Must not cover promise.wait — holding the lock + # while awaiting a reply would deadlock every other sender. + @write_lock.acquire do + @reply_promises << promise + @connection.send_data(msg) end + + reply = promise.wait + raise ResponseError, reply.headers[:reply_text] if reply.error? + + reply end - attr_accessor :response + def execute_app(app, uuid, args = nil, **params) + event_uuid = SecureRandom.uuid - def receive_message(response) - @response = response - handle_response + headers = { + event_lock: true, + call_command: "execute", + execute_app_name: app, + execute_app_arg: args, + event_uuid: event_uuid, + } + .merge(params) + .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } + + promise = Async::Promise.new + @app_promises[event_uuid] = promise + + send_message "sendmsg #{uuid}\n#{headers.join("\n")}" + + promise.wait end - def handle_response + def receive_message(response) if response.reply? - @reply_queue.push(response) + @reply_promises.shift&.resolve(response) return end if response.event? - resp = response + if response.event == "CHANNEL_EXECUTE_COMPLETE" + app_uuid = response.content[:application_uuid] + @app_promises.delete(app_uuid)&.resolve(response) + end + @event_barrier.async do - on_event(resp) - invoke_event_hooks(resp) + on_event(response) + invoke_event_hooks(response) end end end - # override - def on_event(event) - end + def connection_closed + error = ConnectionError.new("Connection closed") - def run_session - end + @reply_promises.each { |p| p.reject(error) } + @app_promises.each_value { |p| p.reject(error) } + + @reply_promises.clear + @app_promises.clear - def connection_closed - @reply_queue.close @event_barrier.wait + rescue ConnectionError + # Expected — event hooks may have been mid-command when disconnected end def disconnect @connection&.close_write end + # Override to signal that the session has ended by protocol (not by + # socket drop). Polled synchronously by {Session} after every dispatched + # message — keep it cheap and return +true+ exactly once. When +true+, + # the reader loop exits cleanly and in-flight event hooks drain before + # the connection is closed. + def session_complete? + false + end + private + def on_event(event) + end + + def run_session + end + def invoke_event_hooks(resp) event_name = resp.event.downcase.to_sym hooks = self.class.hooks[event_name] diff --git a/lib/librevox/listener/inbound.rb b/lib/librevox/listener/inbound.rb index be7111b..480d6cd 100644 --- a/lib/librevox/listener/inbound.rb +++ b/lib/librevox/listener/inbound.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require 'io/endpoint/host_endpoint' - module Librevox module Listener class Inbound < Base @@ -18,16 +16,14 @@ def filters(filters) end end - def self.run(barrier, host: "localhost", port: 8021, **options) - endpoint = IO::Endpoint.tcp(host, port) - client = Client.new(self, endpoint, **options) - barrier.async { client.run } + def self.start(...) + Client.start(self, ...) end - def initialize(connection, args = {}) + def initialize(connection, auth: "ClueCon", **) super(connection) - @auth = args[:auth] || "ClueCon" + @auth = auth end def run_session diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index fa1a57b..26ff77e 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -1,54 +1,21 @@ # frozen_string_literal: true -require 'io/endpoint/host_endpoint' - module Librevox module Listener class Outbound < Base include Librevox::Applications - def self.run(barrier, host: "localhost", port: 8084, **options) - endpoint = IO::Endpoint.tcp(host, port, **options) - server = Server.new(self, endpoint) - barrier.async { server.run } - end - - def application(app, args = nil, **params) - variable_name = params.delete(:variable) - - headers = params - .merge( - event_lock: true, - call_command: "execute", - execute_app_name: app, - execute_app_arg: args, - ) - .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - - send_message "sendmsg\n#{headers.join("\n")}" - - response = @app_complete_queue.dequeue - - if response.nil? - raise ConnectionError, "Connection closed" - end - - @session = response.content - - variable(variable_name) if variable_name + def self.start(...) + Server.start(self, ...) end attr_accessor :session - # Called when a new session is initiated. - def session_initiated - end - - def initialize(connection, options = {}) + def initialize(connection, **) super(connection) - @session = nil - @app_complete_queue = Async::Queue.new + @disconnecting = false + @hung_up = false end def run_session @@ -58,33 +25,42 @@ def run_session send_message "linger" session_initiated - rescue ResponseError, ConnectionError, IOError, Errno::EPIPE => e - Librevox.logger.error "Session error: #{e.message}" end - def connection_closed - super - - @app_complete_queue.close + # Called when a new session is initiated. + def session_initiated end - def handle_response - if response.event? && response.event == "CHANNEL_DATA" - @session = response.content - elsif response.event? && response.event == "CHANNEL_EXECUTE_COMPLETE" - @app_complete_queue.push(response) - end + def application(app, args = nil, **params) + variable_name = params.delete(:variable) - super + response = execute_app(app, session[:unique_id], args, **params) + + @session = response.content + + variable(variable_name) if variable_name end def variable(name) session[:"variable_#{name}"] end - def update_session - response = api.command "uuid_dump", session[:unique_id] - @session = response.content + # FreeSWITCH signals end-of-session with disconnect-notice + a final + # CHANNEL_HANGUP_COMPLETE event. Either may arrive first. Once both + # have been seen #session_complete? returns true and Session exits + # the reader loop — after any in-flight event hooks have drained. + def receive_message(response) + if response.disconnect_notice? + @disconnecting = true + else + @session = response.content if response.event == "CHANNEL_DATA" + super + @hung_up = true if response.event == "CHANNEL_HANGUP_COMPLETE" + end + end + + def session_complete? + @disconnecting && @hung_up end end end diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index 397c4b9..554e924 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -7,7 +7,7 @@ def initialize(stream) @stream = stream end - def read_message + def receive_data loop do headers = @stream.read_until("\n\n") return nil if headers.nil? @@ -24,29 +24,26 @@ def read_message end end - def read_loop - while (msg = read_message) + def each_message + while (msg = receive_data) yield msg end end - def send_message(msg) - @stream.write("#{msg}\n\n") - @stream.flush + def send_data(msg) + @stream.write("#{msg}\n\n", flush: true) end def close_write @stream.close_write - rescue IOError, Errno::ENOTCONN - # Already closed or not connected + rescue IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN + # Already closed or remote hung up. end def close - return if @stream.closed? - @stream.close - rescue Errno::EPIPE, Errno::ECONNRESET - # Remote end already closed + rescue IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN + # Already closed or remote hung up. end end end diff --git a/lib/librevox/runner.rb b/lib/librevox/runner.rb index 97c7ae2..6596974 100644 --- a/lib/librevox/runner.rb +++ b/lib/librevox/runner.rb @@ -31,7 +31,9 @@ def initialize(barrier) end def run(klass, **args) - klass.run(@barrier, **args) + @barrier.async do + klass.start(**args) + end end end end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 32fde58..ba43cbe 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -1,72 +1,35 @@ # frozen_string_literal: true require 'io/stream' +require 'io/endpoint/host_endpoint' module Librevox class Server + attr_reader :endpoint + + def self.start(handler, host: "localhost", port: 8084, **options) + endpoint = IO::Endpoint.tcp(host, port) + new(handler, endpoint, **options).run + end + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint @options = options end - attr :endpoint - - def accept(socket, _address) - stream = IO::Stream(socket) - connection = Protocol::Connection.new(stream) - listener = @handler.new(connection, @options) - - handle_session(connection, listener) - rescue => e - Librevox.logger.error "Session error: #{e.full_message}" - end - def run - Async do |task| - @endpoint.accept(&method(:accept)) - task.children.each(&:wait) - end + @endpoint.accept(&method(:accept)) end private - def handle_session(connection, listener) - read_task = start_read_loop(connection, listener) - - listener.run_session - - read_task.wait - ensure - read_task&.stop - connection.close - end - - def start_read_loop(connection, listener) - Async do - read_messages(connection, listener) - ensure - # Close queues here (not in handle_session's ensure) so that - # a connection drop unblocks listener.run_session via nil dequeue. - # handle_session's ensure can't run until run_session returns, - # creating a deadlock if queues aren't closed from this fiber. - listener.connection_closed - end - end - - def read_messages(connection, listener) - disconnecting = false - hung_up = false - - connection.read_loop do |msg| - if msg.disconnect_notice? - disconnecting = true - else - listener.receive_message(msg) - hung_up = true if msg.event? && msg.event == "CHANNEL_HANGUP_COMPLETE" - end - break if disconnecting && hung_up - end + def accept(socket, _address) + connection = Protocol::Connection.new(IO::Stream(socket)) + listener = @handler.new(connection, **@options) + Session.new(connection, listener).run + rescue => e + Librevox.logger.error "Session error: #{e.message}" end end end diff --git a/lib/librevox/session.rb b/lib/librevox/session.rb new file mode 100644 index 0000000..0d40755 --- /dev/null +++ b/lib/librevox/session.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module Librevox + # Runs one event-socket session on a connection: spawns a reader fiber that + # dispatches incoming messages to the listener, runs the listener's session + # logic in the calling fiber, and joins them on disconnect. + class Session + def initialize(connection, listener) + @connection = connection + @listener = listener + end + + def run + reader = start_reader + @listener.run_session + reader.wait + rescue ConnectionError + # Expected when the connection drops. + ensure + reader&.stop + @connection.close + end + + private + + # Reject pending promises inside the reader's ensure (not run's) so a + # connection drop unblocks run_session via promise rejection. run's + # ensure can't fire until run_session returns — that would deadlock. + def start_reader + Async do + @connection.each_message do |msg| + @listener.receive_message(msg) + break if @listener.session_complete? + end + ensure + @listener.connection_closed + end + end + end +end diff --git a/test/functional/librevox/command_socket_test.rb b/test/functional/librevox/command_socket_test.rb new file mode 100644 index 0000000..a38972c --- /dev/null +++ b/test/functional/librevox/command_socket_test.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require_relative '../../test_helper' + +require 'librevox/command_socket' + +class TestCommandSocket < Minitest::Test + def setup + @server = TCPServer.new("127.0.0.1", 0) + @port = @server.local_address.ip_port + end + + def teardown + @socket&.close + @fs&.close + @server&.close + end + + def test_sends_api_command_and_returns_response + connect + + thread = Thread.new { @socket.status } + + assert_equal "api status", read_message + reply "api/response", "OK" + + response = thread.value + assert response.api_response? + assert_equal "OK", response.content + end + + def test_sends_application_via_sendmsg + connect + + thread = Thread.new { @socket.application("playback", "1234-abcd", "/tmp/test.wav") } + + msg = read_message + + assert_match(/\Asendmsg 1234-abcd\n/, msg) + assert_match(/execute-app-name: playback/, msg) + assert_match(/execute-app-arg: \/tmp\/test.wav/, msg) + assert_match(/event-lock: true/, msg) + + reply "command/reply", "+OK" + + thread.value + end + + private + + def connect + thread = Thread.new { @socket = Librevox::CommandSocket.new(port: @port) } + + @fs = @server.accept + assert_equal "auth ClueCon", read_message + reply "command/reply", "+OK accepted" + + thread.join(2) + end + + def reply(content_type, body) + @fs.write("Content-Type: #{content_type}\nContent-Length: #{body.size}\n\n#{body}") + end + + def read_message + @fs.gets("\n\n")&.strip + end +end diff --git a/test/functional/librevox/listener/base_test.rb b/test/functional/librevox/listener/base_test.rb index 36918a7..7892439 100644 --- a/test/functional/librevox/listener/base_test.rb +++ b/test/functional/librevox/listener/base_test.rb @@ -10,7 +10,7 @@ def setup @listener = @class.new(MockConnection.new) end - # Without Async in handle_response, on_event calling api.* deadlocks the + # Without Async in receive_data, on_event calling api.* deadlocks the # calling fiber. A Thread timeout is the only way to detect this — all # Async fibers are stuck so Async-level timeouts can't fire. def test_on_event_with_api_does_not_block_handle_response diff --git a/test/functional/librevox/listener/concurrency_test.rb b/test/functional/librevox/listener/concurrency_test.rb new file mode 100644 index 0000000..b65eadf --- /dev/null +++ b/test/functional/librevox/listener/concurrency_test.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/base' + +# A connection whose #send_data yields in the middle of "sending", the way the +# real Async::Stream flush yields mid-write. This is what makes the write-lock's +# job observable: without serialization a second fiber can start a send before +# the first one finished. The plain MockConnection can never expose this because +# its #send_data never yields. +class InterleaveDetectingConnection + attr_reader :data, :overlaps + + def initialize + @data = [] + @sending = false + @overlaps = 0 + end + + def send_data(msg) + @overlaps += 1 if @sending # someone else is mid-send -> interleaving + @sending = true + Async::Task.current.yield # simulate flush handing control to the reactor + @data << msg + @sending = false + end + + def close; end + def close_write; end +end + +class TestSendSerialization < Minitest::Test + prepend Librevox::Test::AsyncTest + + def setup + @conn = InterleaveDetectingConnection.new + @listener = Librevox::Listener::Base.new(@conn) + end + + # Invariant #1: a send completes before another begins. + def test_concurrent_senders_do_not_interleave_on_the_wire + a = Async { @listener.send_message("A") } + b = Async { @listener.send_message("B") } + + Async::Task.current.yield until @conn.data.size == 2 + + assert_equal 0, @conn.overlaps, + "a send started before the previous one finished — writes interleaved" + assert_equal ["A", "B"], @conn.data, "sends left the fiber in spawn order" + ensure + a&.stop + b&.stop + end + + # Invariant #2: the Nth reply resolves the Nth sender, under concurrency. + def test_concurrent_replies_route_in_send_order + results = {} + a = Async { results[:a] = @listener.send_message("A") } + b = Async { results[:b] = @listener.send_message("B") } + + Async::Task.current.yield until @conn.data.size == 2 + + reply!("+OK first") + reply!("+OK second") + + Async::Task.current.yield until results.size == 2 + + assert_equal "+OK first", results[:a].headers[:reply_text] + assert_equal "+OK second", results[:b].headers[:reply_text] + ensure + a&.stop + b&.stop + end + + # Invariant #4: app completions route by Event-UUID, even reversed. + def test_out_of_order_app_completions_route_by_uuid + results = {} + a = Async { results[:a] = @listener.execute_app("foo", "uuid-1") } + b = Async { results[:b] = @listener.execute_app("bar", "uuid-1") } + + # Each execute_app first sends its sendmsg and waits for the +OK ack. + Async::Task.current.yield until @conn.data.size == 2 + uuid_a = event_uuid(@conn.data[0]) + uuid_b = event_uuid(@conn.data[1]) + refute_equal uuid_a, uuid_b + + reply!("+OK") # ack for A's sendmsg + reply!("+OK") # ack for B's sendmsg + + # Completions arrive in REVERSE order — B before A. + complete!(uuid_b, marker: "B-done") + complete!(uuid_a, marker: "A-done") + + Async::Task.current.yield until results.size == 2 + + assert_equal "B-done", results[:b].content[:marker] + assert_equal "A-done", results[:a].content[:marker] + ensure + a&.stop + b&.stop + end + + # Invariant #5/#6: a drop rejects every pending promise and unblocks waiters; + # the write-lock never wraps promise.wait, so this cannot deadlock. + def test_disconnect_rejects_all_pending_senders + errors = [] + a = Async do + @listener.send_message("A") + rescue Librevox::ConnectionError => e + errors << e + end + b = Async do + @listener.send_message("B") + rescue Librevox::ConnectionError => e + errors << e + end + + Async::Task.current.yield until @conn.data.size == 2 + + @listener.connection_closed + + Async::Task.current.yield until errors.size == 2 + assert_equal 2, errors.size, "both blocked senders were unblocked by the drop" + ensure + a&.stop + b&.stop + end + + private + + def reply!(text) + @listener.receive_message( + Librevox::Protocol::Response.new("Content-Type: command/reply\nReply-Text: #{text}", "") + ) + end + + def event_uuid(sendmsg) + sendmsg[/event-uuid: (.+)/, 1] + end + + def complete!(uuid, marker:) + body = "Event-Name: CHANNEL_EXECUTE_COMPLETE\nApplication-UUID: #{uuid}\nmarker: #{marker}" + headers = "Content-Type: text/event-plain\nContent-Length: #{body.bytesize}" + @listener.receive_message(Librevox::Protocol::Response.new(headers, body)) + end +end diff --git a/test/functional/librevox/listener/inbound_filter_test.rb b/test/functional/librevox/listener/inbound_filter_test.rb index e6e4d4a..d41a43d 100644 --- a/test/functional/librevox/listener/inbound_filter_test.rb +++ b/test/functional/librevox/listener/inbound_filter_test.rb @@ -36,7 +36,7 @@ def teardown super end - def test_authorize_and_subscribe_to_events + def test_sends_auth_events_and_filters assert_equal "auth ClueCon", @listener.outgoing_data.shift assert_equal "event plain CUSTOM CHANNEL_EXECUTE", @listener.outgoing_data.shift assert_equal "filter Caller-Context default", @listener.outgoing_data.shift diff --git a/test/functional/librevox/listener/inbound_test.rb b/test/functional/librevox/listener/inbound_test.rb index 24d9d54..407ce77 100644 --- a/test/functional/librevox/listener/inbound_test.rb +++ b/test/functional/librevox/listener/inbound_test.rb @@ -29,7 +29,7 @@ def teardown super end - def test_authorize_and_subscribe_to_events + def test_sends_auth_and_event_subscription assert_equal "auth ClueCon", @listener.outgoing_data.shift assert_equal "event plain ALL", @listener.outgoing_data.shift assert_nil @listener.outgoing_data.shift diff --git a/test/functional/librevox/listener/outbound_api_test.rb b/test/functional/librevox/listener/outbound_api_test.rb index 13659bc..21a56f8 100644 --- a/test/functional/librevox/listener/outbound_api_test.rb +++ b/test/functional/librevox/listener/outbound_api_test.rb @@ -12,33 +12,27 @@ def session_initiated end end -class TestOutboundListenerWithAppsAndApi < Minitest::Test +class TestOutboundAppsAndApi < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithAppsAndApi.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithAppsAndApi end def teardown - @session_task&.stop + teardown_outbound super end - def test_wait_for_execute_complete_before_calling_next_app_or_cmd - assert_send_application @listener, "foo" - execute_complete + def test_waits_for_execute_complete_before_api_and_next_app + assert_execute_app @listener, "foo", "1234" + execute_complete "Unique-ID" => "1234" assert_send_command @listener, "api bar" api_response body: "+OK" - assert_send_application @listener, "baz" + assert_execute_app @listener, "baz", "1234" end end diff --git a/test/functional/librevox/listener/outbound_apps_test.rb b/test/functional/librevox/listener/outbound_apps_test.rb index 7a861c9..4faf8c7 100644 --- a/test/functional/librevox/listener/outbound_apps_test.rb +++ b/test/functional/librevox/listener/outbound_apps_test.rb @@ -4,7 +4,7 @@ require 'librevox/listener/outbound' -class OutboundListenerWithNestedApps < Librevox::Listener::Outbound +class OutboundListenerWithSequentialApps < Librevox::Listener::Outbound def session_initiated sample_app "foo" sample_app "bar" @@ -23,38 +23,32 @@ def session_initiated end end -class TestOutboundListenerWithApps < Minitest::Test +class TestOutboundSequentialApps < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithNestedApps.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithSequentialApps end def teardown - @session_task&.stop + teardown_outbound super end - def test_only_send_one_app_at_a_time - assert_send_application @listener, "foo" + def test_sends_one_app_at_a_time + assert_execute_app @listener, "foo", "1234" assert_send_nothing @listener - execute_complete + execute_complete "Unique-ID" => "1234" - assert_send_application @listener, "bar" + assert_execute_app @listener, "bar", "1234" assert_send_nothing @listener end - def test_not_be_driven_forward_by_events - assert_send_application @listener, "foo" + def test_events_do_not_advance_app + assert_execute_app @listener, "foo", "1234" command_reply body: { "Event-Name" => "CHANNEL_EXECUTE", @@ -64,16 +58,16 @@ def test_not_be_driven_forward_by_events assert_send_nothing @listener end - def test_not_be_driven_forward_by_api_responses - assert_send_application @listener, "foo" + def test_api_responses_do_not_advance_app + assert_execute_app @listener, "foo", "1234" api_response body: "Foo" assert_send_nothing @listener end - def test_not_be_driven_forward_by_disconnect_notifications - assert_send_application @listener, "foo" + def test_disconnect_notices_do_not_advance_app + assert_execute_app @listener, "foo", "1234" response "Content-Type" => "text/disconnect-notice", body: "Lingering" @@ -81,8 +75,8 @@ def test_not_be_driven_forward_by_disconnect_notifications assert_send_nothing @listener end - def test_not_be_driven_forward_by_command_reply - assert_send_application @listener, "foo" + def test_command_replies_do_not_advance_app + assert_execute_app @listener, "foo", "1234" command_reply "Reply-Text" => "+OK" @@ -90,52 +84,40 @@ def test_not_be_driven_forward_by_command_reply end end -class TestOutboundListenerWithCustomHeaders < Minitest::Test +class TestOutboundCustomHeaders < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithCustomHeaders.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithCustomHeaders end def teardown - @session_task&.stop + teardown_outbound super end def test_sends_custom_headers - assert_send_application @listener, "playback", "/tmp/test.wav", loops: 3 + assert_execute_app @listener, "playback", "1234", "/tmp/test.wav", loops: 3 end end -class TestOutboundListenerWithEventLockOverride < Minitest::Test +class TestOutboundEventLockOverride < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithEventLockOverride.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} + setup_outbound OutboundListenerWithEventLockOverride end def teardown - @session_task&.stop + teardown_outbound super end def test_overrides_event_lock - assert_send_application @listener, "playback", "/tmp/test.wav", event_lock: false + assert_execute_app @listener, "playback", "1234", "/tmp/test.wav", event_lock: false end end diff --git a/test/functional/librevox/listener/outbound_data_passing_test.rb b/test/functional/librevox/listener/outbound_data_passing_test.rb new file mode 100644 index 0000000..9c54525 --- /dev/null +++ b/test/functional/librevox/listener/outbound_data_passing_test.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/outbound' + +class OutboundListenerWithDataPassing < Librevox::Listener::Outbound + def session_initiated + sample_app "foo" + data = reader_app + application "send", "the end: #{data}" + end +end + +class TestOutboundDataPassing < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + + def setup + setup_outbound OutboundListenerWithDataPassing + end + + def teardown + teardown_outbound + super + end + + def test_passes_data_between_sequential_apps + assert_execute_app @listener, "foo", "1234" + execute_complete "Unique-ID" => "1234" + + assert_execute_app @listener, "reader_app", "1234" + execute_complete "variable_app_var" => "Second", "Unique-ID" => "1234" + + assert_execute_app @listener, "send", "1234", "the end: Second" + end +end diff --git a/test/functional/librevox/listener/outbound_error_test.rb b/test/functional/librevox/listener/outbound_error_test.rb index fe0ece6..1f20878 100644 --- a/test/functional/librevox/listener/outbound_error_test.rb +++ b/test/functional/librevox/listener/outbound_error_test.rb @@ -26,24 +26,17 @@ class TestOutboundApplicationError < Minitest::Test include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithErrorApp.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times { @listener.outgoing_data.shift } + setup_outbound OutboundListenerWithErrorApp end def teardown - @session_task&.stop + teardown_outbound super end def test_application_raises_on_error_reply - assert_send_application @listener, "fail" + assert_execute_app @listener, "fail", "1234" - # sendmsg ack with error — raises instead of blocking on app_complete_queue command_reply "Reply-Text" => "-ERR invalid command" assert_instance_of Librevox::ResponseError, @listener.error @@ -56,36 +49,20 @@ class TestOutboundUnhandledApplicationError < Minitest::Test include OutboundSetupHelpers include Librevox::Test::Matchers - def setup - @listener = OutboundListenerWithUnhandledErrorApp.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Establish-Session" => "OK", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times { @listener.outgoing_data.shift } - end - - def teardown - @session_task&.stop - super - end - - def test_unhandled_error_ends_session_cleanly - assert_send_application @listener, "fail" - - log = StringIO.new - original_logger = Librevox.logger - Librevox.logger = Logger.new(log) - - # sendmsg ack with error — run_session rescues and logs, no crash - command_reply "Reply-Text" => "-ERR invalid command" - - # session task completes without raising - @session_task.wait - - assert_match(/-ERR invalid command/, log.string) - ensure - Librevox.logger = original_logger + def test_unhandled_error_propagates + setup_outbound OutboundListenerWithUnhandledErrorApp + + assert_execute_app @listener, "fail", "1234" + + # Send error reply and wait on task without yielding in between, + # so async doesn't log an unhandled exception warning. + error_reply = Librevox::Protocol::Response.new( + "Content-Type: command/reply\nReply-Text: -ERR invalid command", "" + ) + error = assert_raises(Librevox::ResponseError) do + @listener.receive_message(error_reply) + @session_task.wait + end + assert_equal "-ERR invalid command", error.message end end diff --git a/test/functional/librevox/listener/outbound_handshake_test.rb b/test/functional/librevox/listener/outbound_handshake_test.rb new file mode 100644 index 0000000..8458b8f --- /dev/null +++ b/test/functional/librevox/listener/outbound_handshake_test.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/outbound' + +class TestOutboundHandshakeOrdering < Minitest::Test + prepend Librevox::Test::AsyncTest + include Librevox::Test::ListenerHelpers + include Librevox::Test::Matchers + + def setup + @listener = Librevox::Listener::Outbound.new(MockConnection.new) + @session_task = Async { @listener.run_session } + end + + def teardown + @session_task&.stop + super + end + + def test_session_initiated_fires_on_linger_reply_not_myevents_reply + # connect response sets session + command_reply "Unique-ID" => "1234" + + # myevents reply — run_session progresses to linger command, but + # session_initiated has NOT been called yet + @listener.outgoing_data.clear + command_reply "Reply-Text" => "+OK Events Enabled" + # linger command was sent (run_session progressed), but no session_initiated yet + assert_send_command @listener, "linger" + assert_send_nothing @listener + + # linger reply — NOW session_initiated fires + command_reply "Reply-Text" => "+OK will linger" + end +end diff --git a/test/functional/librevox/listener/outbound_non_nested_test.rb b/test/functional/librevox/listener/outbound_non_nested_test.rb deleted file mode 100644 index f55c628..0000000 --- a/test/functional/librevox/listener/outbound_non_nested_test.rb +++ /dev/null @@ -1,45 +0,0 @@ -# frozen_string_literal: true - -require_relative '../../../test_helper' - -require 'librevox/listener/outbound' - -class OutboundListenerWithNonNestedApps < Librevox::Listener::Outbound - attr_reader :queue - def session_initiated - sample_app "foo" - data = reader_app - application "send", "the end: #{data}" - end -end - -class TestOutboundListenerWithNonNestedApps < Minitest::Test - prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers - include Librevox::Test::Matchers - - def setup - @listener = OutboundListenerWithNonNestedApps.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - end - - def teardown - @session_task&.stop - super - end - - def test_wait_for_execute_complete_before_calling_next_app - assert_send_application @listener, "foo" - execute_complete "Unique-ID" => "1234" - - assert_send_application @listener, "reader_app" - execute_complete "variable_app_var" => "Second" - - assert_send_application @listener, "send", "the end: Second" - end -end diff --git a/test/functional/librevox/listener/outbound_reader_test.rb b/test/functional/librevox/listener/outbound_reader_test.rb index b08d1c4..2d7c5eb 100644 --- a/test/functional/librevox/listener/outbound_reader_test.rb +++ b/test/functional/librevox/listener/outbound_reader_test.rb @@ -11,41 +11,34 @@ def session_initiated end end -class TestOutboundListenerWithAppReadingData < Minitest::Test +class TestOutboundAppReadingData < Minitest::Test prepend Librevox::Test::AsyncTest include OutboundSetupHelpers include Librevox::Test::Matchers def setup - @listener = OutboundListenerWithReader.new(MockConnection.new) - @session_task = Async { @listener.run_session } - - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - - assert_send_application @listener, "reader_app" + setup_outbound OutboundListenerWithReader + assert_execute_app @listener, "reader_app", "1234" end def teardown - @session_task&.stop + teardown_outbound super end - def test_not_send_anything_while_missing_response + def test_blocks_until_execute_complete assert_send_nothing @listener end - def test_update_session_from_execute_complete - execute_complete "Session-Var" => "Second" + def test_updates_session_from_execute_complete + execute_complete "Session-Var" => "Second", "Unique-ID" => "1234" assert_equal "Second", @listener.session[:session_var] end - def test_return_value_of_channel_variable - execute_complete "variable_app_var" => "Second" + def test_returns_channel_variable_value + execute_complete "variable_app_var" => "Second", "Unique-ID" => "1234" - assert_send_application @listener, "send", "Second" + assert_execute_app @listener, "send", "1234", "Second" end end diff --git a/test/functional/librevox/listener/outbound_session_test.rb b/test/functional/librevox/listener/outbound_session_test.rb deleted file mode 100644 index 211c111..0000000 --- a/test/functional/librevox/listener/outbound_session_test.rb +++ /dev/null @@ -1,78 +0,0 @@ -# frozen_string_literal: true - -require_relative '../../../test_helper' - -require 'librevox/listener/outbound' - -class OutboundListenerWithUpdateSessionCallback < Librevox::Listener::Outbound - def session_initiated - update_session - application "send", "yay, #{session[:session_var]}" - end -end - -class TestOutboundListenerWithUpdateSessionCallback < Minitest::Test - prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers - include Librevox::Test::Matchers - - def setup - @listener = OutboundListenerWithUpdateSessionCallback.new(MockConnection.new) - @session_task = Async { @listener.run_session } - command_reply "Session-Var" => "First", - "Unique-ID" => "1234" - event_and_linger_replies - 3.times {@listener.outgoing_data.shift} - - assert_update_session @listener - api_response body: { - "Event-Name" => "CHANNEL_DATA", - "Session-Var" => "Second" - } - end - - def teardown - @session_task&.stop - super - end - - def test_execute_callback - assert_match(/yay,/, @listener.outgoing_data.shift) - end - - def test_update_session_before_calling_callback - assert_send_application @listener, "send", "yay, Second" - end -end - -class TestOutboundReplyQueueOrdering < Minitest::Test - prepend Librevox::Test::AsyncTest - include Librevox::Test::ListenerHelpers - include Librevox::Test::Matchers - - def setup - @listener = Librevox::Listener::Outbound.new(MockConnection.new) - @session_task = Async { @listener.run_session } - end - - def teardown - @session_task&.stop - super - end - - def test_session_initiated_fires_on_linger_reply_not_myevents_reply - # connect response sets session - command_reply "Unique-ID" => "1234" - - # myevents reply — run_session progresses to linger command, but - # session_initiated has NOT been called yet - @listener.outgoing_data.clear - command_reply "Reply-Text" => "+OK Events Enabled" - # linger command was sent (run_session progressed), but no session_initiated yet - assert_send_command @listener, "linger" - assert_send_nothing @listener - - # linger reply — NOW session_initiated fires - command_reply "Reply-Text" => "+OK will linger" - end -end diff --git a/test/functional/librevox/listener/outbound_test.rb b/test/functional/librevox/listener/outbound_test.rb index 35babe6..0523f26 100644 --- a/test/functional/librevox/listener/outbound_test.rb +++ b/test/functional/librevox/listener/outbound_test.rb @@ -10,23 +10,20 @@ def session_initiated end end -class TestOutboundListener < Minitest::Test +class TestOutboundHandshake < Minitest::Test prepend Librevox::Test::AsyncTest - include OutboundSetupHelpers + include Librevox::Test::ListenerHelpers include Librevox::Test::Matchers - include EventTests - include ApiCommandTests def setup @listener = OutboundTestListener.new(MockConnection.new) @session_task = Async { @listener.run_session } - command_reply( - "Caller-Caller-Id-Number" => "8675309", - "Unique-ID" => "1234", - "variable_some_var" => "some value" - ) - event_and_linger_replies - super + + command_reply "Caller-Caller-Id-Number" => "8675309", + "Unique-ID" => "1234", + "variable_some_var" => "some value" + command_reply "Reply-Text" => "+OK Events Enabled" + command_reply "Reply-Text" => "+OK will linger" end def teardown @@ -34,25 +31,41 @@ def teardown super end - def test_connect_to_freeswitch_and_subscribe_to_events - assert_send_command @listener, "connect" - assert_send_command @listener, "myevents" - assert_send_command @listener, "linger" + def test_sends_connect_myevents_linger_in_order + assert_equal "connect", @listener.outgoing_data.shift + assert_equal "myevents", @listener.outgoing_data.shift + assert_equal "linger", @listener.outgoing_data.shift + assert_nil @listener.outgoing_data.shift end - def test_establish_a_session + def test_establishes_session_from_connect_reply assert_equal Hash, @listener.session.class + assert_equal "8675309", @listener.session[:caller_caller_id_number] end - def test_call_session_callback_after_establishing_new_session + def test_calls_session_initiated_after_handshake assert_includes @listener.hook_log, "session was initiated" end - def test_make_headers_available_through_session - assert_equal "8675309", @listener.session[:caller_caller_id_number] + def test_exposes_channel_variables + assert_equal "some value", @listener.variable(:some_var) end +end - def test_make_channel_variables_available_through_variable - assert_equal "some value", @listener.variable(:some_var) +class TestOutboundEvents < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + include EventTests + include ApiCommandTests + + def setup + setup_outbound OutboundTestListener + super + end + + def teardown + teardown_outbound + super end end diff --git a/test/functional/librevox/protocol/connection_test.rb b/test/functional/librevox/protocol/connection_test.rb index f9413a1..3a576d9 100644 --- a/test/functional/librevox/protocol/connection_test.rb +++ b/test/functional/librevox/protocol/connection_test.rb @@ -16,43 +16,43 @@ def teardown @write_io.close unless @write_io.closed? end - def test_read_headers_only_message + def test_receives_headers_only_message @write_io.write "Content-Type: command/reply\nReply-Text: +OK\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_instance_of Librevox::Protocol::Response, msg assert_equal "command/reply", msg.headers[:content_type] assert_equal "+OK", msg.headers[:reply_text] end - def test_read_message_with_content + def test_receive_data_with_content body = "Event-Name: HEARTBEAT" @write_io.write "Content-Length: #{body.size}\n\n#{body}\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_instance_of Librevox::Protocol::Response, msg assert_equal body.size.to_s, msg.headers[:content_length] assert_equal "HEARTBEAT", msg.content[:event_name] end - def test_read_multiple_messages + def test_receives_multiple_messages @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close - msg1 = @connection.read_message + msg1 = @connection.receive_data assert_equal "command/reply", msg1.headers[:content_type] - msg2 = @connection.read_message + msg2 = @connection.receive_data assert_equal "api/response", msg2.headers[:content_type] end def test_eof_returns_nil @write_io.close - assert_nil @connection.read_message + assert_nil @connection.receive_data end def test_skips_empty_header_blocks_after_content @@ -60,31 +60,31 @@ def test_skips_empty_header_blocks_after_content @write_io.write "Content-Length: #{body.size}\n\n#{body}\n\n" @write_io.close - msg = @connection.read_message + msg = @connection.receive_data assert_equal "TEST", msg.content[:event_name] # The trailing \n\n after content creates an empty block which should be skipped - assert_nil @connection.read_message + assert_nil @connection.receive_data end - def test_read_loop_yields_each_message + def test_each_message_yields_each_response @write_io.write "Content-Type: command/reply\n\n" @write_io.write "Content-Type: api/response\n\n" @write_io.close messages = [] - @connection.read_loop { |msg| messages << msg } + @connection.each_message { |msg| messages << msg } assert_equal 2, messages.size assert_equal "command/reply", messages[0].headers[:content_type] assert_equal "api/response", messages[1].headers[:content_type] end - def test_send_message_writes_with_terminator + def test_send_data_writes_with_terminator write_stream = IO::Stream(@write_io) conn = Librevox::Protocol::Connection.new(write_stream) - conn.send_message("auth ClueCon") + conn.send_data("auth ClueCon") write_stream.close assert_equal "auth ClueCon\n\n", @read_io.read diff --git a/test/integration/outbound_disconnect_test.rb b/test/integration/outbound_disconnect_test.rb index 32c0fbf..5b002d1 100644 --- a/test/integration/outbound_disconnect_test.rb +++ b/test/integration/outbound_disconnect_test.rb @@ -10,12 +10,8 @@ require 'timeout' class BlockedOnAppListener < Librevox::Listener::Outbound - attr_reader :error - def session_initiated sample_app "playback", "/tmp/test.wav" - rescue Librevox::ConnectionError => e - @error = e end end @@ -177,15 +173,15 @@ def test_connection_drop_unblocks_blocked_send_message port, server_thread = start_server(BlockedOnAppListener) socket = fake_fs_connect(port) - # session_initiated calls sample_app which sends sendmsg and blocks on app_complete_queue + # session_initiated calls sample_app which sends sendmsg and blocks on app promise msg = socket.gets("\n\n") assert_match(/sendmsg/, msg) # ack the sendmsg socket.write("Content-Type: command/reply\nReply-Text: +OK\n\n") - # Now the listener is blocked on app_complete_queue.dequeue - # Drop the connection — should unblock via queue close + # Now the listener is blocked on app promise + # Drop the connection — should unblock via promise rejection socket.close socket = nil diff --git a/test/integration/write_path_test.rb b/test/integration/write_path_test.rb new file mode 100644 index 0000000..a784831 --- /dev/null +++ b/test/integration/write_path_test.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +require_relative '../test_helper' + +require 'librevox/protocol/connection' +require 'librevox/listener/base' +require 'socket' +require 'io/stream' + +# End-to-end coverage of the send/receive path over a real socket: drives +# Base#send_message -> the real Protocol::Connection -> a real OS socket -> +# the real receive_data framing on the peer, with replies routed back through +# the listener's reader. Nothing else exercises Connection over a live fd. +# +# Note: this does NOT prove the write-lock's value — empirically, the real +# IO::Stream does not interleave concurrent writes even at 1 MB, so these tests +# pass with the lock removed too. The lock's behavior is guarded by the unit +# test in test/functional/librevox/listener/concurrency_test.rb (which forces +# a mid-send yield the real stream does not actually perform). +class TestRealSocketWritePath < Minitest::Test + prepend Librevox::Test::AsyncTest + + def setup + @sock_a, @sock_b = Socket.pair(:UNIX, :STREAM) + end + + def teardown + @sock_a&.close + @sock_b&.close + end + + # Framing round-trips: a reply written on one end reconstructs exactly on the + # other (read_until "\n\n" + Content-Length body). + def test_framing_round_trips_over_a_real_socket + sender = Librevox::Protocol::Connection.new(IO::Stream(@sock_a)) + receiver = Librevox::Protocol::Connection.new(IO::Stream(@sock_b)) + + body = "Event-Name: CHANNEL_DATA\nUnique-ID: abc" + sender.send_data("Content-Type: text/event-plain\nContent-Length: #{body.bytesize}\n\n#{body}") + + msg = receiver.receive_data + + assert msg.event? + assert_equal "CHANNEL_DATA", msg.event + assert_equal "abc", msg.content[:unique_id] + end + + # Many fibers send concurrently; the peer must see every command as a whole, + # uncorrupted frame in send order, and every sender must get its reply back + # via FIFO routing through the listener's real reader loop. + def test_concurrent_sends_arrive_whole_and_ordered + n = 8 + listener_conn = Librevox::Protocol::Connection.new(IO::Stream(@sock_a)) + peer = Librevox::Protocol::Connection.new(IO::Stream(@sock_b)) + listener = Librevox::Listener::Base.new(listener_conn) + + received = [] + + # Listener-side reader, exactly like Session: dispatch replies so senders + # waiting on their promise unblock. + reader = Async do + listener_conn.each_message { |m| listener.receive_message(m) } + end + + # Peer plays FreeSWITCH: read each framed command, echo one +OK back. + peer_task = Async do + n.times do + msg = peer.receive_data + received << msg.headers[:command] + peer.send_data("Content-Type: command/reply\nReply-Text: +OK") + end + end + + # Fire n concurrent senders. Each blocks in send_message until its reply. + senders = (0...n).map do |i| + Async { listener.send_message("Command: CMD-#{i}") } + end + senders.each(&:wait) + peer_task.wait + + expected = (0...n).map { |i| "CMD-#{i}" } + assert_equal expected, received, + "commands must arrive whole and in send order over the real socket" + ensure + reader&.stop + peer_task&.stop + senders&.each(&:stop) + end +end diff --git a/test/support/listener.rb b/test/support/listener.rb index d2f5dbd..d85c502 100644 --- a/test/support/listener.rb +++ b/test/support/listener.rb @@ -9,11 +9,11 @@ def initialize @data = [] end - def send_message(msg) + def send_data(msg) @data << msg end - def read_message + def receive_data nil end @@ -88,9 +88,6 @@ def setup @class.event(:hook_with_arg) {|e| log "got event: #{e.event}"} @listener.on_event_block = proc {|e| log "from on_event: #{e.event}"} - - # Establish session - @listener.receive_message(Librevox::Protocol::Response.new("Content-Length: 0\nTest: Testing", "")) end def test_add_event_hook @@ -100,7 +97,7 @@ def test_add_event_hook end end - def test_execute_callback_for_event + def test_dispatches_matching_event_hooks event "OTHER_EVENT" assert_equal "something else", @listener.read_data @@ -108,26 +105,19 @@ def test_execute_callback_for_event assert_equal "something", @listener.read_data end - def test_pass_event_as_arg_to_hook_block + def test_passes_event_to_hook_block event "HOOK_WITH_ARG" assert_equal "got event: HOOK_WITH_ARG", @listener.read_data end - def test_expose_response - event "OTHER_EVENT" - - assert_equal Librevox::Protocol::Response, @listener.response.class - assert_equal "OTHER_EVENT", @listener.response.content[:event_name] - end - - def test_call_on_event + def test_calls_on_event_for_any_event event "THIRD_EVENT" assert_equal "from on_event: THIRD_EVENT", @listener.read_data end - def test_call_event_hooks_and_on_event_on_channel_data + def test_dispatches_on_event_and_hooks_for_channel_data @listener.hook_log.clear @listener.on_event_block = proc {|e| log "on_event: CHANNEL_DATA test"} @@ -179,8 +169,21 @@ def test_multiple_api_commands module OutboundSetupHelpers include Librevox::Test::ListenerHelpers - def event_and_linger_replies + def setup_outbound(listener_class, **connect_headers) + headers = {"Unique-ID" => "1234"}.merge(connect_headers) + + @listener = listener_class.new(MockConnection.new) + @session_task = Async { @listener.run_session } + + command_reply(headers) command_reply "Reply-Text" => "+OK Events Enabled" command_reply "Reply-Text" => "+OK will linger" + + # Discard connect, myevents, linger commands + 3.times { @listener.outgoing_data.shift } + end + + def teardown_outbound + @session_task&.stop end end diff --git a/test/test_helper.rb b/test/test_helper.rb index b146af4..c1abd98 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -17,24 +17,20 @@ def assert_send_nothing(obj) assert_nil obj.outgoing_data.shift end - def assert_send_application(obj, app, args = nil, **params) - headers = params - .merge( - event_lock: true, - call_command: "execute", - execute_app_name: app, - execute_app_arg: args, - ) - .map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" } - - assert_equal "sendmsg\n#{headers.join("\n")}", obj.outgoing_data.shift - end + def assert_execute_app(obj, app, uuid, args = nil, **params) + data = obj.outgoing_data.shift + assert data, "Expected sendmsg in outgoing data" + + @last_event_uuid = data[/event-uuid: (.+)/, 1] + assert @last_event_uuid, "Expected event-uuid header in sendmsg" + + assert_match(/\Asendmsg #{uuid}\n/, data) + assert_match(/execute-app-name: #{app}/, data) + assert_match(/execute-app-arg: #{args}/, data) if args - def assert_update_session(obj, session_id = nil) - if session_id - assert_equal "api uuid_dump #{session_id}", obj.outgoing_data.shift - else - assert_match(/^api uuid_dump \d+/, obj.outgoing_data.shift) + params.each do |key, value| + header = "#{key.to_s.tr('_', '-')}: #{value}" + assert_match(/#{Regexp.escape(header)}/, data) end end end @@ -77,7 +73,9 @@ def execute_complete(args = {}) # sendmsg ack — always arrives before CHANNEL_EXECUTE_COMPLETE command_reply "Reply-Text" => "+OK" - body = {"Event-Name" => "CHANNEL_EXECUTE_COMPLETE"}.merge(args) + # Use the event-uuid from the last assert_execute_app, echoing it + # back as Application-UUID just like FreeSWITCH would. + body = {"Event-Name" => "CHANNEL_EXECUTE_COMPLETE", "Application-UUID" => @last_event_uuid}.merge(args) body_str = body.map {|k,v| "#{k}: #{v}"}.join("\n") headers = "Content-Type: text/event-plain\nContent-Length: #{body_str.size}" @@ -92,7 +90,7 @@ def yield_to_fibers end end - # Wraps each test method in Async { } so queue operations work. + # Wraps each test method in Async { } so promise operations work. module AsyncTest def run(...) Sync do