diff --git a/lib/librevox.rb b/lib/librevox.rb index dade947..eb618b3 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -5,6 +5,7 @@ module Librevox class ResponseError < StandardError; end + class ConnectionError < StandardError; end autoload :Client, 'librevox/client' autoload :CommandSocket, 'librevox/command_socket' diff --git a/lib/librevox/client.rb b/lib/librevox/client.rb index fd35507..3fbb9df 100644 --- a/lib/librevox/client.rb +++ b/lib/librevox/client.rb @@ -10,30 +10,52 @@ def initialize(handler, endpoint, **options) @options = options end - attr :endpoint - def connect(socket) stream = IO::Stream(socket) connection = Protocol::Connection.new(stream) - listener = @handler.new(connection, @options) - session_task = Async { listener.run_session } - connection.read_loop { |msg| listener.receive_message(msg) } - ensure - session_task&.stop - connection.close + handle_session(connection, listener) end def run loop do - @endpoint.connect do |socket| - connect(socket) - end - rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET => e + @endpoint.connect(&method(:connect)) + rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s." sleep 1 end end + + private + + def handle_session(connection, listener) + read_task = start_read_loop(connection, listener) + + listener.run_session + + read_task.wait + ensure + read_task&.stop + connection.close + end + + def start_read_loop(connection, listener) + Async do + read_messages(connection, listener) + ensure + # Close queues here (not in handle_session's ensure) so that + # a connection drop unblocks listener.run_session via nil dequeue. + # handle_session's ensure can't run until run_session returns, + # creating a deadlock if queues aren't closed from this fiber. + listener.connection_closed + end + end + + def read_messages(connection, listener) + connection.read_loop do |msg| + listener.receive_message(msg) + end + end end end diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index cf45ac1..b063de9 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -2,6 +2,7 @@ require 'async/queue' require 'async/semaphore' +require 'async/barrier' module Librevox module Listener @@ -10,6 +11,7 @@ def initialize(connection) @connection = connection @reply_queue = Async::Queue.new @command_mutex = Async::Semaphore.new(1) + @event_barrier = Async::Barrier.new end class << self @@ -52,7 +54,8 @@ def send_message(msg) @command_mutex.acquire do @connection.send_message(msg) reply = @reply_queue.dequeue - raise Librevox::ResponseError, reply.headers[:reply_text] if reply.error? + raise ConnectionError, "Connection closed" if reply.nil? + raise ResponseError, reply.headers[:reply_text] if reply.error? reply end end @@ -72,7 +75,7 @@ def handle_response if response.event? resp = response - Async do + @event_barrier.async do on_event(resp) invoke_event_hooks(resp) end @@ -86,6 +89,11 @@ def on_event(event) def run_session end + def connection_closed + @reply_queue.close + @event_barrier.wait + end + def disconnect @connection&.close_write end diff --git a/lib/librevox/listener/inbound.rb b/lib/librevox/listener/inbound.rb index 9717f2e..be7111b 100644 --- a/lib/librevox/listener/inbound.rb +++ b/lib/librevox/listener/inbound.rb @@ -26,6 +26,7 @@ def self.run(barrier, host: "localhost", port: 8021, **options) def initialize(connection, args = {}) super(connection) + @auth = args[:auth] || "ClueCon" end @@ -35,6 +36,7 @@ def run_session send_message "auth #{@auth}" events = self.class.subscribe_events || ['ALL'] + send_message "event plain #{events.join(' ')}" filters = self.class.subscribe_filters || {} diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index b6f8cd0..fa1a57b 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -28,6 +28,11 @@ def application(app, args = nil, **params) send_message "sendmsg\n#{headers.join("\n")}" response = @app_complete_queue.dequeue + + if response.nil? + raise ConnectionError, "Connection closed" + end + @session = response.content variable(variable_name) if variable_name @@ -41,19 +46,28 @@ def session_initiated def initialize(connection, options = {}) super(connection) + @session = nil @app_complete_queue = Async::Queue.new end def run_session @session = send_message("connect").headers + send_message "myevents" send_message "linger" + session_initiated - rescue Librevox::ResponseError => e + rescue ResponseError, ConnectionError, IOError, Errno::EPIPE => e Librevox.logger.error "Session error: #{e.message}" end + def connection_closed + super + + @app_complete_queue.close + end + def handle_response if response.event? && response.event == "CHANNEL_DATA" @session = response.content diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index bf72275..397c4b9 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -20,7 +20,7 @@ def read_message content = "" end - return Librevox::Protocol::Response.new(headers, content) + return Response.new(headers, content) end end diff --git a/lib/librevox/protocol/response.rb b/lib/librevox/protocol/response.rb index 85ec3ee..c5a25ce 100644 --- a/lib/librevox/protocol/response.rb +++ b/lib/librevox/protocol/response.rb @@ -28,6 +28,10 @@ def command_reply? @headers[:content_type] == "command/reply" end + def disconnect_notice? + @headers[:content_type] == "text/disconnect-notice" + end + def reply? api_response? || command_reply? end diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 5c469a4..32fde58 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -15,16 +15,11 @@ def initialize(handler, endpoint, **options) def accept(socket, _address) stream = IO::Stream(socket) connection = Protocol::Connection.new(stream) - listener = @handler.new(connection, @options) - session_task = Async { listener.run_session } - connection.read_loop { |msg| listener.receive_message(msg) } + handle_session(connection, listener) rescue => e Librevox.logger.error "Session error: #{e.full_message}" - ensure - session_task&.stop - connection&.close end def run @@ -33,5 +28,45 @@ def run task.children.each(&:wait) end end + + private + + def handle_session(connection, listener) + read_task = start_read_loop(connection, listener) + + listener.run_session + + read_task.wait + ensure + read_task&.stop + connection.close + end + + def start_read_loop(connection, listener) + Async do + read_messages(connection, listener) + ensure + # Close queues here (not in handle_session's ensure) so that + # a connection drop unblocks listener.run_session via nil dequeue. + # handle_session's ensure can't run until run_session returns, + # creating a deadlock if queues aren't closed from this fiber. + listener.connection_closed + end + end + + def read_messages(connection, listener) + disconnecting = false + hung_up = false + + connection.read_loop do |msg| + if msg.disconnect_notice? + disconnecting = true + else + listener.receive_message(msg) + hung_up = true if msg.event? && msg.event == "CHANNEL_HANGUP_COMPLETE" + end + break if disconnecting && hung_up + end + end end end diff --git a/test/integration/inbound_disconnect_test.rb b/test/integration/inbound_disconnect_test.rb new file mode 100644 index 0000000..4b54849 --- /dev/null +++ b/test/integration/inbound_disconnect_test.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +require_relative '../test_helper' + +require 'librevox/listener/inbound' +require 'librevox/client' +require 'async' +require 'io/endpoint' +require 'timeout' + +class InboundDisconnectListener < Librevox::Listener::Inbound +end + +class TestInboundReconnectAfterMidSessionDrop < Minitest::Test + def test_reconnects_after_connection_drop_during_handshake + tcp_server = TCPServer.new("127.0.0.1", 0) + port = tcp_server.local_address.ip_port + tcp_server.close + + connection_count = 0 + + # Fake FS: accept connections, reply to auth, then drop before event reply + fs_thread = Thread.new do + server = TCPServer.new("127.0.0.1", port) + 2.times do + socket = server.accept + # auth + socket.gets("\n\n") + socket.write("Content-Type: command/reply\nReply-Text: +OK accepted\n\n") + # event subscribe — read the command but drop instead of replying + socket.gets("\n\n") + + connection_count += 1 + socket.close + end + server.close + end + + client_thread = Thread.new do + Sync do + endpoint = IO::Endpoint.tcp("127.0.0.1", port) + client = Librevox::Client.new(InboundDisconnectListener, endpoint, auth: "ClueCon") + client.run + end + end + + # Wait for at least 2 connections (proves reconnect worked) + Timeout.timeout(5) do + sleep 0.1 until connection_count >= 2 + end + + assert connection_count >= 2, "Expected at least 2 connections (reconnect), got #{connection_count}" + ensure + client_thread&.kill + client_thread&.join(1) + fs_thread&.kill + fs_thread&.join(1) + end +end diff --git a/test/integration/outbound_disconnect_test.rb b/test/integration/outbound_disconnect_test.rb index f8806f3..32c0fbf 100644 --- a/test/integration/outbound_disconnect_test.rb +++ b/test/integration/outbound_disconnect_test.rb @@ -7,6 +7,34 @@ require 'async' require 'io/endpoint' require 'io/stream' +require 'timeout' + +class BlockedOnAppListener < Librevox::Listener::Outbound + attr_reader :error + + def session_initiated + sample_app "playback", "/tmp/test.wav" + rescue Librevox::ConnectionError => e + @error = e + end +end + +class NormalLifecycleListener < Librevox::Listener::Outbound + attr_reader :initiated + + def session_initiated + @initiated = true + end +end + +SLOW_HOOK_RESULT = Queue.new + +class SlowEventHookListener < Librevox::Listener::Outbound + event(:channel_hangup) do + sleep 0.2 + SLOW_HOOK_RESULT << :completed + end +end class DisconnectFromSessionListener < Librevox::Listener::Outbound def session_initiated @@ -57,6 +85,15 @@ def fake_fs_connect(port) socket end + def send_hangup_complete(socket) + body = "Event-Name: CHANNEL_HANGUP_COMPLETE" + socket.write("Content-Type: text/event-plain\nContent-Length: #{body.size}\n\n#{body}") + end + + def send_disconnect_notice(socket) + socket.write("Content-Type: text/disconnect-notice\n\n") + end + def assert_connection_closed(socket) ready = IO.select([socket], nil, nil, 3) if ready @@ -69,6 +106,101 @@ def assert_connection_closed(socket) end end +class TestEventHookCompletion < Minitest::Test + include DisconnectTestHelpers + + def setup + SLOW_HOOK_RESULT.clear + end + + def test_event_hooks_complete_before_connection_closes + port, server_thread = start_server(SlowEventHookListener) + socket = fake_fs_connect(port) + + # Send CHANNEL_HANGUP event — triggers slow hook (0.2s sleep) + body = "Event-Name: CHANNEL_HANGUP" + socket.write("Content-Type: text/event-plain\nContent-Length: #{body.size}\n\n#{body}") + + # Immediately trigger latch — both conditions met + send_hangup_complete(socket) + send_disconnect_notice(socket) + + # Connection should stay open until the slow hook finishes + assert_connection_closed(socket) + + # Hook must have completed before the connection closed + result = SLOW_HOOK_RESULT.pop(true) rescue nil + assert_equal :completed, result, "Event hook was killed before completing" + ensure + socket&.close + server_thread&.kill + server_thread&.join(1) + end +end + +class TestNormalLifecycle < Minitest::Test + include DisconnectTestHelpers + + def test_clean_shutdown_on_hangup_complete_and_disconnect_notice + port, server_thread = start_server(NormalLifecycleListener) + socket = fake_fs_connect(port) + + send_hangup_complete(socket) + send_disconnect_notice(socket) + + assert_connection_closed(socket) + ensure + socket&.close + server_thread&.kill + server_thread&.join(1) + end + + def test_clean_shutdown_disconnect_notice_before_hangup_complete + port, server_thread = start_server(NormalLifecycleListener) + socket = fake_fs_connect(port) + + send_disconnect_notice(socket) + send_hangup_complete(socket) + + assert_connection_closed(socket) + ensure + socket&.close + server_thread&.kill + server_thread&.join(1) + end +end + +class TestConnectionDrop < Minitest::Test + include DisconnectTestHelpers + + def test_connection_drop_unblocks_blocked_send_message + port, server_thread = start_server(BlockedOnAppListener) + socket = fake_fs_connect(port) + + # session_initiated calls sample_app which sends sendmsg and blocks on app_complete_queue + msg = socket.gets("\n\n") + assert_match(/sendmsg/, msg) + + # ack the sendmsg + socket.write("Content-Type: command/reply\nReply-Text: +OK\n\n") + + # Now the listener is blocked on app_complete_queue.dequeue + # Drop the connection — should unblock via queue close + socket.close + socket = nil + + # Server should accept another connection without hanging, + # proving the first connection was cleaned up properly + socket2 = Timeout.timeout(3) { TCPSocket.new("127.0.0.1", port) } + assert socket2, "Server accepted new connection after drop" + ensure + socket&.close + socket2&.close + server_thread&.kill + server_thread&.join(1) + end +end + class TestDisconnectFromSession < Minitest::Test include DisconnectTestHelpers