Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/librevox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

module Librevox
class ResponseError < StandardError; end
class ConnectionError < StandardError; end

autoload :Client, 'librevox/client'
autoload :CommandSocket, 'librevox/command_socket'
Expand Down
46 changes: 34 additions & 12 deletions lib/librevox/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,52 @@ def initialize(handler, endpoint, **options)
@options = options
end

attr :endpoint

def connect(socket)
stream = IO::Stream(socket)
connection = Protocol::Connection.new(stream)

listener = @handler.new(connection, @options)

session_task = Async { listener.run_session }
connection.read_loop { |msg| listener.receive_message(msg) }
ensure
session_task&.stop
connection.close
handle_session(connection, listener)
end

def run
loop do
@endpoint.connect do |socket|
connect(socket)
end
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET => e
@endpoint.connect(&method(:connect))
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e
Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s."
sleep 1
end
end

private

def handle_session(connection, listener)
read_task = start_read_loop(connection, listener)

listener.run_session

read_task.wait
ensure
read_task&.stop
connection.close
end

def start_read_loop(connection, listener)
Async do
read_messages(connection, listener)
ensure
# Close queues here (not in handle_session's ensure) so that
# a connection drop unblocks listener.run_session via nil dequeue.
# handle_session's ensure can't run until run_session returns,
# creating a deadlock if queues aren't closed from this fiber.
listener.connection_closed
end
end

def read_messages(connection, listener)
connection.read_loop do |msg|
listener.receive_message(msg)
end
end
end
end
12 changes: 10 additions & 2 deletions lib/librevox/listener/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'async/queue'
require 'async/semaphore'
require 'async/barrier'

module Librevox
module Listener
Expand All @@ -10,6 +11,7 @@ def initialize(connection)
@connection = connection
@reply_queue = Async::Queue.new
@command_mutex = Async::Semaphore.new(1)
@event_barrier = Async::Barrier.new
end

class << self
Expand Down Expand Up @@ -52,7 +54,8 @@ def send_message(msg)
@command_mutex.acquire do
@connection.send_message(msg)
reply = @reply_queue.dequeue
raise Librevox::ResponseError, reply.headers[:reply_text] if reply.error?
raise ConnectionError, "Connection closed" if reply.nil?
raise ResponseError, reply.headers[:reply_text] if reply.error?
reply
end
end
Expand All @@ -72,7 +75,7 @@ def handle_response

if response.event?
resp = response
Async do
@event_barrier.async do
on_event(resp)
invoke_event_hooks(resp)
end
Expand All @@ -86,6 +89,11 @@ def on_event(event)
def run_session
end

def connection_closed
@reply_queue.close
@event_barrier.wait
end

def disconnect
@connection&.close_write
end
Expand Down
2 changes: 2 additions & 0 deletions lib/librevox/listener/inbound.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def self.run(barrier, host: "localhost", port: 8021, **options)

def initialize(connection, args = {})
super(connection)

@auth = args[:auth] || "ClueCon"
end

Expand All @@ -35,6 +36,7 @@ def run_session
send_message "auth #{@auth}"

events = self.class.subscribe_events || ['ALL']

send_message "event plain #{events.join(' ')}"

filters = self.class.subscribe_filters || {}
Expand Down
16 changes: 15 additions & 1 deletion lib/librevox/listener/outbound.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def application(app, args = nil, **params)
send_message "sendmsg\n#{headers.join("\n")}"

response = @app_complete_queue.dequeue

if response.nil?
raise ConnectionError, "Connection closed"
end

@session = response.content

variable(variable_name) if variable_name
Expand All @@ -41,19 +46,28 @@ def session_initiated

def initialize(connection, options = {})
super(connection)

@session = nil
@app_complete_queue = Async::Queue.new
end

def run_session
@session = send_message("connect").headers

send_message "myevents"
send_message "linger"

session_initiated
rescue Librevox::ResponseError => e
rescue ResponseError, ConnectionError, IOError, Errno::EPIPE => e
Librevox.logger.error "Session error: #{e.message}"
end

def connection_closed
super

@app_complete_queue.close
end

def handle_response
if response.event? && response.event == "CHANNEL_DATA"
@session = response.content
Expand Down
2 changes: 1 addition & 1 deletion lib/librevox/protocol/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def read_message
content = ""
end

return Librevox::Protocol::Response.new(headers, content)
return Response.new(headers, content)
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/librevox/protocol/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def command_reply?
@headers[:content_type] == "command/reply"
end

def disconnect_notice?
@headers[:content_type] == "text/disconnect-notice"
end

def reply?
api_response? || command_reply?
end
Expand Down
47 changes: 41 additions & 6 deletions lib/librevox/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@ def initialize(handler, endpoint, **options)
def accept(socket, _address)
stream = IO::Stream(socket)
connection = Protocol::Connection.new(stream)

listener = @handler.new(connection, @options)

session_task = Async { listener.run_session }
connection.read_loop { |msg| listener.receive_message(msg) }
handle_session(connection, listener)
rescue => e
Librevox.logger.error "Session error: #{e.full_message}"
ensure
session_task&.stop
connection&.close
end

def run
Expand All @@ -33,5 +28,45 @@ def run
task.children.each(&:wait)
end
end

private

def handle_session(connection, listener)
read_task = start_read_loop(connection, listener)

listener.run_session

read_task.wait
ensure
read_task&.stop
connection.close
end

def start_read_loop(connection, listener)
Async do
read_messages(connection, listener)
ensure
# Close queues here (not in handle_session's ensure) so that
# a connection drop unblocks listener.run_session via nil dequeue.
# handle_session's ensure can't run until run_session returns,
# creating a deadlock if queues aren't closed from this fiber.
listener.connection_closed
end
end

def read_messages(connection, listener)
disconnecting = false
hung_up = false

connection.read_loop do |msg|
if msg.disconnect_notice?
disconnecting = true
else
listener.receive_message(msg)
hung_up = true if msg.event? && msg.event == "CHANNEL_HANGUP_COMPLETE"
end
break if disconnecting && hung_up
end
end
end
end
59 changes: 59 additions & 0 deletions test/integration/inbound_disconnect_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# frozen_string_literal: true

require_relative '../test_helper'

require 'librevox/listener/inbound'
require 'librevox/client'
require 'async'
require 'io/endpoint'
require 'timeout'

class InboundDisconnectListener < Librevox::Listener::Inbound
end

class TestInboundReconnectAfterMidSessionDrop < Minitest::Test
def test_reconnects_after_connection_drop_during_handshake
tcp_server = TCPServer.new("127.0.0.1", 0)
port = tcp_server.local_address.ip_port
tcp_server.close

connection_count = 0

# Fake FS: accept connections, reply to auth, then drop before event reply
fs_thread = Thread.new do
server = TCPServer.new("127.0.0.1", port)
2.times do
socket = server.accept
# auth
socket.gets("\n\n")
socket.write("Content-Type: command/reply\nReply-Text: +OK accepted\n\n")
# event subscribe — read the command but drop instead of replying
socket.gets("\n\n")

connection_count += 1
socket.close
end
server.close
end

client_thread = Thread.new do
Sync do
endpoint = IO::Endpoint.tcp("127.0.0.1", port)
client = Librevox::Client.new(InboundDisconnectListener, endpoint, auth: "ClueCon")
client.run
end
end

# Wait for at least 2 connections (proves reconnect worked)
Timeout.timeout(5) do
sleep 0.1 until connection_count >= 2
end

assert connection_count >= 2, "Expected at least 2 connections (reconnect), got #{connection_count}"
ensure
client_thread&.kill
client_thread&.join(1)
fs_thread&.kill
fs_thread&.join(1)
end
end
Loading
Loading