Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions CODEBASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ src/cli_agent_orchestrator/
├── api/ # Entry Point: HTTP API
│ └── main.py # FastAPI endpoints (port 9889)
├── services/ # Service Layer: Business logic
│ ├── event_bus.py # Pub/sub event routing with wildcard topic matching
│ ├── fifo_reader.py # Publisher: terminal.{id}.output (FIFO → event bus)
│ ├── status_monitor.py # Consumer: terminal.{id}.output → Publisher: terminal.{id}.status
│ ├── log_writer.py # Consumer: terminal.{id}.output (writes debug logs)
│ ├── inbox_service.py # Consumer: terminal.{id}.status (delivers queued messages)
│ ├── session_service.py # List, get, delete sessions
│ ├── terminal_service.py# Create, get, send input (+ mark_input_received), get output, delete terminals
│ ├── inbox_service.py # Terminal-to-terminal messaging with watchdog
│ ├── terminal_service.py# Create, get, send input, get output, delete terminals
│ └── flow_service.py # Scheduled flow execution
├── clients/ # Client Layer: External systems
│ ├── tmux.py # Tmux operations (sets CAO_TERMINAL_ID, send_keys, send_keys_via_paste for bracketed paste)
Expand Down Expand Up @@ -117,7 +121,7 @@ provider_manager.create_provider()
provider.initialize() # Waits for shell (all providers), sends command, waits for IDLE
inbox_service.register_terminal() # Starts watchdog observer
fifo_manager.create_reader(terminal_id) # Starts FIFO reader thread
Returns Terminal model
```
Expand All @@ -133,9 +137,9 @@ database.create_inbox_message() # Status: PENDING
inbox_service.check_and_send_pending_messages()
If receiver IDLE → send immediately
If receiver PROCESSING → watchdog monitors log file
If receiver PROCESSING → DeliveryConsumer waits for status event
On log change → detect IDLE pattern → send message
On status change to IDLE → DeliveryConsumer delivers message
Update message status: DELIVERED
```
Expand Down
2 changes: 1 addition & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Send a message to another terminal's inbox.
**Behavior:**
- Messages are queued and delivered when the receiver terminal is IDLE
- Messages are delivered in order (oldest first)
- Delivery is automatic via watchdog file monitoring
- Delivery is automatic via event-driven status detection

---

Expand Down
138 changes: 138 additions & 0 deletions docs/event-driven-architecture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Event-Driven Architecture

## Overview

CAO uses an event-driven architecture for terminal output processing, status detection, and inbox message delivery. Terminal output streams through a pipeline of components connected by an in-process pub/sub event bus.

## Architecture

```
┌───────────────────┐ publish ┌──────────────────────────┐ subscribe ┌─────────────┐
│ FifoReader │────────────▶│ EVENT BUS │────────────▶│ LogWriter │
│ (thread) │ terminal. │ │ terminal. │ (async) │
│ │ {id}. │ pub/sub with wildcard │ {id}. │ │
│ tmux pipe-pane │ output │ topic matching │ output │ writes to │
│ ▼ Named FIFO │ │ │ │ log files │
│ ▼ os.read() │ │ │ └─────────────┘
└───────────────────┘ │ │
│ │ subscribe ┌───────────────┐
│ │────────────▶│ StatusMonitor │
│ │ terminal. │ (async) │
│ │ {id}. │ │
│ │ output │ rolling buffer│
│ │ │ + detection │
│ │◀────────────│ │
│ │ publish └───────────────┘
│ │ terminal.
│ │ {id}.
│ │ status
│ │
│ │ subscribe ┌─────────────┐
│ │────────────▶│InboxService │
│ │ terminal. │ (async) │
│ │ {id}. │ │
│ │ status │ delivers │
└──────────────────────────┘ │ messages │
└─────────────┘
```

```mermaid
graph LR
subgraph FifoReader ["FifoReader (thread)"]
FR1[tmux pipe-pane]
FR2[Named FIFO]
FR3[os.read]
FR1 --> FR2 --> FR3
end

EB["EVENT BUS — pub/sub with wildcard topic matching"]

subgraph LogWriter ["LogWriter (async)"]
LW[writes to log files]
end

subgraph StatusMonitor ["StatusMonitor (async)"]
SM[rolling buffer + detection]
end

subgraph InboxService ["InboxService (async)"]
IS[delivers messages]
end

FifoReader -- "terminal.{id}.output" --> EB
EB -- "terminal.{id}.output" --> LogWriter
EB -- "terminal.{id}.output" --> StatusMonitor
StatusMonitor -- "terminal.{id}.status" --> EB
EB -- "terminal.{id}.status" --> InboxService
```

All inter-service communication flows through the event bus. No service calls another service directly for event processing — the bus is the sole brokering mechanism.

## Event Bus (`services/event_bus.py`)

The event bus is the **central brokering mechanism** that connects all publishers and consumers. It implements an in-process pub/sub router with wildcard topic matching, thread-safe publishing, and async consumption via `asyncio.Queue`.

Every component in the pipeline communicates exclusively through the event bus — publishers never call consumers directly. This decouples components, allows new consumers to be added without modifying publishers, and ensures a clear data flow through the system.

**Topics:**

| Topic | Publisher | Consumers |
|-------|----------|-----------|
| `terminal.{id}.output` | FifoReader | StatusMonitor, LogWriter |
| `terminal.{id}.status` | StatusMonitor | InboxService |

**Subscription patterns:**

- Exact: `terminal.abc12345.output`
- Wildcard: `terminal.*.output` (matches any terminal ID)

**Thread safety:** Publishers call `bus.publish()` from any thread. The event bus uses `loop.call_soon_threadsafe()` to dispatch events into the asyncio event loop registered at startup via `bus.set_loop()`.

## Component Roles

Each service has a clearly defined role as a **publisher**, **consumer**, or **both**:

| Component | Role | Subscribes To | Publishes To |
|-----------|------|---------------|--------------|
| **FifoReader** | Publisher only | — (reads from OS FIFO) | `terminal.{id}.output` |
| **StatusMonitor** | Publisher + Consumer | `terminal.*.output` | `terminal.{id}.status` |
| **LogWriter** | Consumer only | `terminal.*.output` | — |
| **InboxService** | Consumer only | `terminal.*.status` | — (delivers via `send_input`) |

- **Pure publishers** (FifoReader) are the data sources that inject events into the bus.
- **Pure consumers** (LogWriter, InboxService) react to events and perform side effects (writing logs, delivering messages).
- **Publisher + Consumer** (StatusMonitor) transforms events: it consumes raw output, derives status, and publishes status change events for downstream consumers.

> **Warning: Threading and event loop discipline.** Publisher and consumer implementations must take great care when managing threading. The FifoReader runs in a dedicated OS thread (blocking `os.read` on the FIFO) and publishes into the asyncio loop via `call_soon_threadsafe`. All consumers (`StatusMonitor`, `LogWriter`, `InboxService`) run as asyncio tasks on the main event loop. Consumer `run()` methods must **always yield back to the event loop** (via `await queue.get()`) and avoid long-running synchronous operations that would block other consumers from processing events. If a consumer needs to perform blocking I/O, it should offload to a thread pool via `asyncio.to_thread()`.

## Components

### FIFO Reader (`services/fifo_reader.py`) — Publisher

Creates a named pipe (FIFO) per terminal and starts a daemon reader thread. tmux's `pipe-pane` writes terminal output to the FIFO; the reader reads 4KB chunks and publishes `terminal.{id}.output` events.

### Status Monitor (`services/status_monitor.py`) — Publisher + Consumer

Subscribes to `terminal.*.output`. Accumulates output into a rolling buffer (8KB) per terminal, detects status via the registered provider (or a generic shell prompt pattern before init), and publishes `terminal.{id}.status` on change. Also the source of truth for current terminal status.

### Log Writer (`services/log_writer.py`) — Consumer

Subscribes to `terminal.*.output`. Appends chunks to per-terminal log files (`~/.cao/logs/terminal/{id}.log`) for debugging.

### Inbox Service (`services/inbox_service.py`) — Consumer

Subscribes to `terminal.*.status`. On IDLE or COMPLETED, delivers the oldest pending inbox message to the terminal via `send_input` and updates the message status in the database.

## Startup & Shutdown

During server startup (`api/main.py` lifespan):

1. Register the asyncio event loop with the event bus: `bus.set_loop(loop)`
2. Start consumer tasks: `StatusMonitor.run()`, `LogWriter.run()`, `InboxService.run()`

During shutdown:

1. Cancel all consumer tasks
2. `asyncio.gather()` with `return_exceptions=True` to wait for clean exit

FIFO readers are started/stopped per-terminal by `terminal_service` during create/delete operations.
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ dependencies = [
"uvicorn[standard]>=0.24.0",
"websockets>=12.0",
"libtmux>=0.51.0",
"aiofiles>=24.1.0",
"click>=8.0.0",
"python-frontmatter>=1.1.0",
"watchdog==6.0.0",
"requests>=2.32.0",
]

Expand Down
56 changes: 31 additions & 25 deletions src/cli_agent_orchestrator/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from fastapi import FastAPI, HTTPException, Path, Query, status
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from pydantic import BaseModel, Field, field_validator
from watchdog.observers.polling import PollingObserver

from cli_agent_orchestrator.clients.database import (
create_inbox_message,
Expand All @@ -21,19 +20,20 @@
SERVER_HOST,
SERVER_PORT,
SERVER_VERSION,
TERMINAL_LOG_DIR,
)
from cli_agent_orchestrator.models.inbox import MessageStatus
from cli_agent_orchestrator.models.terminal import Terminal, TerminalId
from cli_agent_orchestrator.providers.manager import provider_manager
from cli_agent_orchestrator.services import (
flow_service,
inbox_service,
session_service,
terminal_service,
)
from cli_agent_orchestrator.services.cleanup_service import cleanup_old_data
from cli_agent_orchestrator.services.inbox_service import LogFileHandler
from cli_agent_orchestrator.services.event_bus import bus
from cli_agent_orchestrator.services.inbox_service import inbox_service
from cli_agent_orchestrator.services.log_writer import log_writer
from cli_agent_orchestrator.services.status_monitor import status_monitor
from cli_agent_orchestrator.services.terminal_service import OutputMode
from cli_agent_orchestrator.utils.agent_profiles import resolve_provider
from cli_agent_orchestrator.utils.logging import setup_logging
Expand All @@ -50,7 +50,7 @@ async def flow_daemon():
flows = flow_service.get_flows_to_run()
for flow in flows:
try:
executed = flow_service.execute_flow(flow.name)
executed = await flow_service.execute_flow(flow.name)
if executed:
logger.info(f"Flow '{flow.name}' executed successfully")
else:
Expand Down Expand Up @@ -90,23 +90,32 @@ async def lifespan(app: FastAPI):
# Start flow daemon as background task
daemon_task = asyncio.create_task(flow_daemon())

# Start inbox watcher
inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL)
inbox_observer.schedule(LogFileHandler(), str(TERMINAL_LOG_DIR), recursive=False)
inbox_observer.start()
logger.info("Inbox watcher started (PollingObserver)")
# Register event loop with event bus for thread-safe publishing
loop = asyncio.get_running_loop()
bus.set_loop(loop)

yield
# Start event bus consumers as background tasks
status_monitor_task = asyncio.create_task(status_monitor.run())
log_writer_task = asyncio.create_task(log_writer.run())
inbox_service_task = asyncio.create_task(inbox_service.run())
logger.info("Event bus consumers started (StatusMonitor, LogWriter, InboxService)")

# Stop inbox observer
inbox_observer.stop()
inbox_observer.join()
logger.info("Inbox watcher stopped")
yield

# Cancel daemon on shutdown
# Cancel consumer tasks on shutdown
status_monitor_task.cancel()
log_writer_task.cancel()
inbox_service_task.cancel()
daemon_task.cancel()

try:
await daemon_task
await asyncio.gather(
status_monitor_task,
log_writer_task,
inbox_service_task,
daemon_task,
return_exceptions=True,
)
except asyncio.CancelledError:
pass

Expand Down Expand Up @@ -143,7 +152,7 @@ async def create_session(
) -> Terminal:
"""Create a new session with exactly one terminal."""
try:
result = terminal_service.create_terminal(
result = await terminal_service.create_terminal(
provider=provider,
agent_profile=agent_profile,
session_name=session_name,
Expand Down Expand Up @@ -213,8 +222,7 @@ async def create_terminal_in_session(
"""Create additional terminal in existing session."""
try:
resolved_provider = resolve_provider(agent_profile, fallback_provider=provider)

result = terminal_service.create_terminal(
result = await terminal_service.create_terminal(
provider=resolved_provider,
agent_profile=agent_profile,
session_name=session_name,
Expand Down Expand Up @@ -359,12 +367,10 @@ async def create_inbox_message_endpoint(
detail=f"Failed to create inbox message: {str(e)}",
)

# Best-effort immediate delivery. If the receiver terminal is idle, the
# message is delivered now; otherwise the watchdog will deliver it when
# the terminal becomes idle. Delivery failures must not cause the API
# to report an error — the message was already persisted above.
# Attempt immediate delivery if terminal is already IDLE.
# If not, InboxService will deliver on next IDLE status event.
try:
inbox_service.check_and_send_pending_messages(receiver_id)
inbox_service.deliver_pending(receiver_id)
except Exception as e:
logger.warning(f"Immediate delivery attempt failed for {receiver_id}: {e}")

Expand Down
15 changes: 11 additions & 4 deletions src/cli_agent_orchestrator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@
TERMINAL_LOG_DIR = LOG_DIR / "terminal" # Per-terminal log files for pipe-pane output
TERMINAL_LOG_DIR.mkdir(parents=True, exist_ok=True)

# FIFO directory for event-driven terminal output streaming
FIFO_DIR = CAO_HOME_DIR / "fifos" # Named pipes for tmux pipe-pane streaming
FIFO_DIR.mkdir(parents=True, exist_ok=True)

# =============================================================================
# Inbox Service Configuration
# Event-Driven State Detection Configuration
# =============================================================================
# Polling interval for detecting log file changes (seconds)
# Lower values = faster response, higher CPU usage
INBOX_POLLING_INTERVAL = 5
# Rolling buffer size for state detection (8KB)
# Keeps trailing 8KB of terminal output for pattern matching
STATE_BUFFER_MAX = 8192

# Max events buffered per subscriber queue before dropping
EVENT_BUS_MAX_QUEUE_SIZE = 1024

# =============================================================================
# Cleanup Service Configuration
Expand Down
1 change: 1 addition & 0 deletions src/cli_agent_orchestrator/models/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
class TerminalStatus(str, Enum):
"""Terminal status enumeration with provider-aware states."""

UNKNOWN = "unknown"
IDLE = "idle"
PROCESSING = "processing"
COMPLETED = "completed"
Expand Down
Loading
Loading