Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ Librevox.options[:log_file] = "librevox.log" # default: STDOUT
Librevox.options[:log_level] = Logger::DEBUG # default: Logger::INFO
```

When started with `Librevox.start`, sending `SIGHUP` to the process reopens the log file, making it compatible with `logrotate(1)`.

## Event Socket Protocol

Understanding the outbound event socket protocol is important for working on
Expand Down Expand Up @@ -245,7 +243,7 @@ finished. Application completion is signalled by a `CHANNEL_EXECUTE_COMPLETE`
event:

```
Listener → FS: sendmsg
Listener → FS: sendmsg <uuid>
call-command: execute
execute-app-name: playback
execute-app-arg: welcome.wav
Expand All @@ -270,18 +268,19 @@ being processed until the current application completes.

### Two fibers per connection

Librevox runs two fibers for each outbound connection:
Librevox runs two fibers for each connection:

- **Session fiber** (`run_session`) — runs the setup sequence and then
`session_initiated`. Each `send_message` or `application` call blocks the fiber
until the reply arrives.
- **Read fiber** (`read_loop`) — reads messages from the socket and dispatches
them to `Async::Queue` instances, waking the session fiber.

An `Async::Semaphore(1)` mutex on `send_message` ensures only one command is
in-flight at a time, so replies are always delivered to the correct caller.
This also serializes commands issued by event hooks (which run in their own
fibers) with the main session flow.
`session_initiated`. Each `send_message` or `application` call creates an
`Async::Promise`, pushes it onto an array, and blocks the fiber until the
promise is resolved.
- **Read fiber** (`each_message`) — reads messages from the socket and resolves
promises in FIFO order, waking the session fiber.

No mutex is needed — Ruby's cooperative fiber scheduling guarantees that the
promise push happens before the I/O yield point (the socket write), so
interleaving from concurrent event-hook fibers is safe. When a connection
drops, pending promises are rejected with `ConnectionError`.

## API Documentation

Expand Down
6 changes: 2 additions & 4 deletions lib/librevox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ class ConnectionError < StandardError; end

autoload :Client, 'librevox/client'
autoload :CommandSocket, 'librevox/command_socket'
autoload :CommandDelegate, 'librevox/command_delegate'
autoload :Commands, 'librevox/commands'
autoload :Applications, 'librevox/applications'
autoload :Runner, 'librevox/runner'
autoload :Server, 'librevox/server'
autoload :Session, 'librevox/session'

module Protocol
autoload :Connection, 'librevox/protocol/connection'
Expand Down Expand Up @@ -46,10 +48,6 @@ def self.logger!
logger
end

def self.reopen_log
@logger = logger!
end

# Start a single listener:
#
# Librevox.start MyInbound
Expand Down
47 changes: 11 additions & 36 deletions lib/librevox/client.rb
Original file line number Diff line number Diff line change
@@ -1,61 +1,36 @@
# frozen_string_literal: true

require 'io/stream'
require 'io/endpoint/host_endpoint'

module Librevox
class Client
def self.start(handler, host: "localhost", port: 8021, **options)
endpoint = IO::Endpoint.tcp(host, port)
new(handler, endpoint, **options).run
end

def initialize(handler, endpoint, **options)
@handler = handler
@endpoint = endpoint
@options = options
end

def connect(socket)
stream = IO::Stream(socket)
connection = Protocol::Connection.new(stream)
listener = @handler.new(connection, @options)

handle_session(connection, listener)
end

def run
loop do
@endpoint.connect(&method(:connect))
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ResponseError => e
Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s."
sleep 1
end
end

private

def handle_session(connection, listener)
read_task = start_read_loop(connection, listener)

listener.run_session

read_task.wait
ensure
read_task&.stop
connection.close
end

def start_read_loop(connection, listener)
Async do
read_messages(connection, listener)
ensure
# Close queues here (not in handle_session's ensure) so that
# a connection drop unblocks listener.run_session via nil dequeue.
# handle_session's ensure can't run until run_session returns,
# creating a deadlock if queues aren't closed from this fiber.
listener.connection_closed
end
end

def read_messages(connection, listener)
connection.read_loop do |msg|
listener.receive_message(msg)
end
def connect(socket)
connection = Protocol::Connection.new(IO::Stream(socket))
listener = @handler.new(connection, **@options)
Session.new(connection, listener).run
end
end
end
19 changes: 19 additions & 0 deletions lib/librevox/command_delegate.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module Librevox
# In some cases there are both applications and commands with the same
# name, e.g. fifo. But we can't have two `fifo`-methods, so we include
# commands in CommandDelegate, and expose all commands through the `api`
# method, which wraps a CommandDelegate instance.
class CommandDelegate
include Librevox::Commands

def initialize(listener)
@listener = listener
end

def command(*args)
@listener.send_message(super(*args))
end
end
end
22 changes: 11 additions & 11 deletions lib/librevox/command_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ module Librevox
class CommandSocket
include Librevox::Commands

def initialize(args = {})
@server = args[:server] || "127.0.0.1"
@port = args[:port] || "8021"
@auth = args[:auth] || "ClueCon"
def initialize(server: "127.0.0.1", port: "8021", auth: "ClueCon", connect: true)
@server = server
@port = port
@auth = auth

connect unless args[:connect] == false
self.connect if connect
end

def connect
Expand All @@ -23,7 +23,7 @@ def connect
end

def send_message(msg)
@connection.send_message(msg)
@connection.send_data(msg)
read_response
end

Expand All @@ -32,19 +32,19 @@ def command(*args)
end

def read_response
while msg = @connection.read_message
while msg = @connection.receive_data
return msg if msg.command_reply? || msg.api_response?
end
end

def application(uuid, app, args = nil, **params)
headers = params
.merge(
def application(app, uuid, args = nil, **params)
headers = {
event_lock: true,
call_command: "execute",
execute_app_name: app,
execute_app_arg: args,
)
}
.merge(params)
.map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" }

send_message "sendmsg #{uuid}\n#{headers.join("\n")}"
Expand Down
118 changes: 73 additions & 45 deletions lib/librevox/listener/base.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
# frozen_string_literal: true

require 'async/queue'
require 'async/semaphore'
require 'async/barrier'
require 'async/semaphore'
require 'securerandom'

module Librevox
module Listener
class Base
def initialize(connection)
@connection = connection
@reply_queue = Async::Queue.new
@command_mutex = Async::Semaphore.new(1)
@event_barrier = Async::Barrier.new
end

class << self
def hooks
@hooks ||= Hash.new {|hash, key| hash[key] = []}
Expand All @@ -24,20 +17,12 @@ def event(event, &block)
end
end

# In some cases there are both applications and commands with the same
# name, e.g. fifo. But we can't have two `fifo`-methods, so we include
# commands in CommandDelegate, and expose all commands through the `api`
# method, which wraps a CommandDelegate instance.
class CommandDelegate
include Librevox::Commands

def initialize(listener)
@listener = listener
end

def command(*args)
@listener.send_message(super(*args))
end
def initialize(connection)
@connection = connection
@reply_promises = []
@app_promises = {}
@event_barrier = Async::Barrier.new
@write_lock = Async::Semaphore.new(1)
end

# Exposes an instance of {CommandDelegate}, which includes {Librevox::Commands}.
Expand All @@ -51,55 +36,98 @@ def api
end

def send_message(msg)
@command_mutex.acquire do
@connection.send_message(msg)
reply = @reply_queue.dequeue
raise ConnectionError, "Connection closed" if reply.nil?
raise ResponseError, reply.headers[:reply_text] if reply.error?
reply
promise = Async::Promise.new

# Serialize only the enqueue+send so the FIFO order of @reply_promises
# always matches the byte order on the wire, even when concurrent event
# hooks send commands. Must not cover promise.wait — holding the lock
# while awaiting a reply would deadlock every other sender.
@write_lock.acquire do
@reply_promises << promise
@connection.send_data(msg)
end

reply = promise.wait
raise ResponseError, reply.headers[:reply_text] if reply.error?

reply
end

attr_accessor :response
def execute_app(app, uuid, args = nil, **params)
event_uuid = SecureRandom.uuid

def receive_message(response)
@response = response
handle_response
headers = {
event_lock: true,
call_command: "execute",
execute_app_name: app,
execute_app_arg: args,
event_uuid: event_uuid,
}
.merge(params)
.map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" }

promise = Async::Promise.new
@app_promises[event_uuid] = promise

send_message "sendmsg #{uuid}\n#{headers.join("\n")}"

promise.wait
end

def handle_response
def receive_message(response)
if response.reply?
@reply_queue.push(response)
@reply_promises.shift&.resolve(response)
return
end

if response.event?
resp = response
if response.event == "CHANNEL_EXECUTE_COMPLETE"
app_uuid = response.content[:application_uuid]
@app_promises.delete(app_uuid)&.resolve(response)
end

@event_barrier.async do
on_event(resp)
invoke_event_hooks(resp)
on_event(response)
invoke_event_hooks(response)
end
end
end

# override
def on_event(event)
end
def connection_closed
error = ConnectionError.new("Connection closed")

def run_session
end
@reply_promises.each { |p| p.reject(error) }
@app_promises.each_value { |p| p.reject(error) }

@reply_promises.clear
@app_promises.clear

def connection_closed
@reply_queue.close
@event_barrier.wait
rescue ConnectionError
# Expected — event hooks may have been mid-command when disconnected
end

def disconnect
@connection&.close_write
end

# Override to signal that the session has ended by protocol (not by
# socket drop). Polled synchronously by {Session} after every dispatched
# message — keep it cheap and return +true+ exactly once. When +true+,
# the reader loop exits cleanly and in-flight event hooks drain before
# the connection is closed.
def session_complete?
false
end

private

def on_event(event)
end

def run_session
end

def invoke_event_hooks(resp)
event_name = resp.event.downcase.to_sym
hooks = self.class.hooks[event_name]
Expand Down
Loading
Loading