Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/inbox-delivery.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,22 @@ The `_initialized` gate is important -- it prevents delivery during startup when
| Message delivered during PROCESSING gets lost (agent errors mid-turn) | Low | Message status is DELIVERED; acceptable for v1 |
| Watchdog fires every 5s during long turns | Medium (bounded) | One DB query + one tmux call per interval; no amplification |
| Feature causes regression in non-eager providers | None | Provider flag defaults to False; only opt-in providers affected |

## Reconciliation Sweep

The immediate and watchdog paths can both miss a message when the receiving terminal is *already idle* when the message is queued:

- the single immediate attempt may observe a momentarily stale status and skip delivery, and
- the watchdog only fires on log-file changes, which an already-idle agent that produces no further output never generates.

When both miss, the message would otherwise stay `PENDING` forever (issue #131).

A provider-agnostic background sweep closes this gap. Every `INBOX_RECONCILE_INTERVAL` (default 30s) it re-attempts delivery for any message left `PENDING` longer than `INBOX_RECONCILE_GRACE_SECONDS` (default 30s), routing it back through the same `check_and_send_pending_messages()` gate as the other paths. The work scales with the number of *backlogged* receivers, not the total agent count: when nothing is stuck the sweep runs one cheap query and returns.

### Grace Window

The sweep deliberately ignores messages younger than the grace window. The immediate and watchdog paths own delivery during that window; the sweep only adopts messages they have demonstrably had their chance at and missed. This keeps the sweep from competing with the fast paths on freshly queued messages and minimizes its overlap with them.

### Relationship to the OpenCode Poller

The sweep does not replace the OpenCode poller. They serve different roles: the OpenCode poller is a fast (5s) primary wakeup for a provider whose logs stop changing once its TUI settles, while the sweep is a slow, provider-agnostic safety net. Both reuse `check_and_send_pending_messages()` and so share its known duplicate-wakeup race; the grace window keeps the sweep from overlapping the fast paths in practice. GH #115 tracks unifying all of these wakeup sources into a single coordinated delivery engine that would make delivery atomic.
30 changes: 30 additions & 0 deletions src/cli_agent_orchestrator/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
CORS_ORIGINS,
DEFAULT_PROVIDER,
INBOX_POLLING_INTERVAL,
INBOX_RECONCILE_INTERVAL,
SERVER_HOST,
SERVER_PORT,
SERVER_VERSION,
Expand Down Expand Up @@ -111,6 +112,24 @@ async def opencode_inbox_delivery_daemon(registry: PluginRegistry) -> None:
logger.exception("OpenCode inbox delivery poller error")


async def inbox_reconciliation_daemon(registry: PluginRegistry) -> None:
"""Background task that recovers inbox messages the fast paths missed.

Safety net for issue #131: the immediate (on POST) and watchdog (on log
change) delivery paths can both miss a message when the receiver is already
idle, leaving it PENDING forever. This sweep runs on a slower interval than
the watchdog and re-attempts delivery for anything left pending past the
grace window.
"""
logger.info("Inbox reconciliation daemon started")
while True:
await asyncio.sleep(INBOX_RECONCILE_INTERVAL)
try:
await asyncio.to_thread(inbox_service.reconcile_orphaned_messages, registry)
except Exception:
logger.exception("Inbox reconciliation daemon error")


# Response Models
class TerminalOutputResponse(BaseModel):
output: str
Expand Down Expand Up @@ -183,6 +202,10 @@ async def lifespan(app: FastAPI):
# provider-specific wakeup path with a unified delivery engine.
opencode_inbox_task = asyncio.create_task(opencode_inbox_delivery_daemon(registry))

# Start provider-agnostic reconciliation sweep for orphaned PENDING
# messages the immediate and watchdog paths missed (issue #131).
inbox_reconcile_task = asyncio.create_task(inbox_reconciliation_daemon(registry))

# Start inbox watcher
inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL)
inbox_observer.schedule(LogFileHandler(registry), str(TERMINAL_LOG_DIR), recursive=False)
Expand Down Expand Up @@ -210,6 +233,13 @@ async def lifespan(app: FastAPI):
except asyncio.CancelledError:
pass

# Cancel inbox reconciliation sweep on shutdown
inbox_reconcile_task.cancel()
try:
await inbox_reconcile_task
except asyncio.CancelledError:
pass

await registry.teardown()
logger.info("Shutting down CLI Agent Orchestrator server...")

Expand Down
32 changes: 31 additions & 1 deletion src/cli_agent_orchestrator/clients/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
import uuid
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from sqlalchemy import (
Expand Down Expand Up @@ -344,6 +344,36 @@ def list_pending_receiver_ids_by_provider(provider: str) -> List[str]:
return [row[0] for row in rows]


def list_pending_receiver_ids_older_than(min_age_seconds: int) -> List[str]:
"""List receiver terminal IDs whose messages have been PENDING too long.

Returns the distinct receivers of any message still PENDING for longer than
``min_age_seconds``. Used by the inbox reconciliation sweep to find messages
the immediate and watchdog delivery paths missed, without competing with
them for freshly queued ones (issue #131).

The join on ``terminals`` drops messages whose receiver terminal no longer
exists, so the sweep does not keep retrying deliveries to deleted agents.

``created_at`` is stored local-naive (``InboxModel.created_at`` defaults to
``datetime.now``), so the cutoff uses ``datetime.now()`` to match — the same
convention as the retention query in ``cleanup_service.cleanup_old_data``.
"""
cutoff = datetime.now() - timedelta(seconds=min_age_seconds)
with SessionLocal() as db:
rows = (
db.query(InboxModel.receiver_id)
.join(TerminalModel, TerminalModel.id == InboxModel.receiver_id)
.filter(
InboxModel.status == MessageStatus.PENDING.value,
InboxModel.created_at < cutoff,
)
.distinct()
.all()
)
return [row[0] for row in rows]


def delete_terminal(terminal_id: str) -> bool:
"""Delete terminal metadata."""
with SessionLocal() as db:
Expand Down
19 changes: 19 additions & 0 deletions src/cli_agent_orchestrator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@
# for capable providers (e.g., Claude Code).
EAGER_INBOX_DELIVERY = os.environ.get("CAO_EAGER_INBOX_DELIVERY", "false").lower() == "true"

# Reconciliation sweep for orphaned inbox messages.
# The immediate (on POST) and watchdog (on log-file change) delivery paths can
# both miss a message when the receiving terminal is already idle: the single
# immediate attempt may observe a stale status, and the watchdog only fires on
# log output that an idle agent never produces. Those messages would otherwise
# stay PENDING forever. A slow background sweep re-attempts delivery for any
# message left pending past the grace window below, acting as a catch-all
# fallback under the two fast paths (issue #131).
#
# The interval is deliberately much larger than INBOX_POLLING_INTERVAL: this is
# a safety net, not a primary delivery path, so it trades latency for low load.
INBOX_RECONCILE_INTERVAL = 30 # seconds between reconciliation sweeps

# Only reconcile messages older than this. Keeping the grace window >=
# INBOX_POLLING_INTERVAL (the watchdog poll cadence) ensures the sweep never
# competes with the immediate and watchdog paths for freshly queued messages —
# it only adopts ones those paths have already had their chance at and missed.
INBOX_RECONCILE_GRACE_SECONDS = 30

# =============================================================================
# Cleanup Service Configuration
# =============================================================================
Expand Down
35 changes: 34 additions & 1 deletion src/cli_agent_orchestrator/services/inbox_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@
from cli_agent_orchestrator.clients.database import (
get_pending_messages,
list_pending_receiver_ids_by_provider,
list_pending_receiver_ids_older_than,
update_message_status,
)
from cli_agent_orchestrator.constants import EAGER_INBOX_DELIVERY, TERMINAL_LOG_DIR
from cli_agent_orchestrator.constants import (
EAGER_INBOX_DELIVERY,
INBOX_RECONCILE_GRACE_SECONDS,
TERMINAL_LOG_DIR,
)
from cli_agent_orchestrator.models.inbox import MessageStatus, OrchestrationType
from cli_agent_orchestrator.models.provider import ProviderType
from cli_agent_orchestrator.models.terminal import TerminalStatus
Expand Down Expand Up @@ -161,6 +166,34 @@ def poll_opencode_pending_messages(registry: PluginRegistry | None = None) -> No
logger.debug(f"OpenCode inbox poll failed for {terminal_id}: {e}")


def reconcile_orphaned_messages(registry: PluginRegistry | None = None) -> None:
"""Re-attempt delivery for messages stuck in PENDING past the grace window.

Provider-agnostic safety net for the gap described in issue #131: when a
receiving terminal is already idle, the immediate (on POST) delivery path
may miss on a stale status and the log-watching observer never fires again
(an idle agent produces no new log output), leaving the message orphaned.
This sweep finds any such message and routes it back through the normal
delivery gate.

Only messages older than ``INBOX_RECONCILE_GRACE_SECONDS`` are considered,
so the sweep never competes with the immediate and watchdog paths for
freshly queued messages — it only adopts ones they have already missed.

Like ``poll_opencode_pending_messages``, this reuses ``check_and_send_pending_messages``
and so inherits its known duplicate-wakeup race; the grace window keeps the
sweep from overlapping the fast paths in practice, and GH #115 tracks the
single coordinated delivery engine that would make delivery atomic.
"""
receiver_ids = list_pending_receiver_ids_older_than(INBOX_RECONCILE_GRACE_SECONDS)

for terminal_id in receiver_ids:
try:
check_and_send_pending_messages(terminal_id, registry=registry)
except Exception as e:
logger.debug(f"Inbox reconciliation failed for {terminal_id}: {e}")


class LogFileHandler(FileSystemEventHandler):
"""Handler for terminal log file changes."""

Expand Down
74 changes: 73 additions & 1 deletion test/api/test_api_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@

import pytest

from cli_agent_orchestrator.api.main import app, flow_daemon, opencode_inbox_delivery_daemon
from cli_agent_orchestrator.api.main import (
app,
flow_daemon,
inbox_reconciliation_daemon,
opencode_inbox_delivery_daemon,
)
from cli_agent_orchestrator.models.terminal import Terminal
from cli_agent_orchestrator.services import inbox_service
from cli_agent_orchestrator.utils.skills import SkillNameError

# ── Health endpoint ──────────────────────────────────────────────────
Expand Down Expand Up @@ -995,6 +1001,35 @@ async def fake_sleep(_seconds):
assert mock_to_thread.await_args.args[1] is registry


class TestInboxReconciliationDaemon:
"""Tests for the provider-agnostic inbox reconciliation sweep (issue #131)."""

@pytest.mark.asyncio
async def test_sweep_runs_one_iteration_then_cancels(self):
"""Daemon sleeps, runs the sync sweep in a thread, then handles cancellation."""
sleep_calls = 0
registry = MagicMock()
mock_to_thread = AsyncMock()

async def fake_sleep(_seconds):
nonlocal sleep_calls
sleep_calls += 1
if sleep_calls > 1:
raise asyncio.CancelledError

with (
patch("asyncio.sleep", new=fake_sleep),
patch("asyncio.to_thread", mock_to_thread),
):
with pytest.raises(asyncio.CancelledError):
await inbox_reconciliation_daemon(registry)

mock_to_thread.assert_awaited_once()
# The sweep, not some other sync function, must be the dispatched work.
assert mock_to_thread.await_args.args[0] is inbox_service.reconcile_orphaned_messages
assert mock_to_thread.await_args.args[1] is registry


# ── lifespan ─────────────────────────────────────────────────────────


Expand Down Expand Up @@ -1030,6 +1065,43 @@ async def fake_daemon():
mock_observer.stop.assert_called_once()
mock_observer.join.assert_called_once()

@pytest.mark.asyncio
async def test_lifespan_cancels_inbox_reconciliation_on_shutdown(self):
"""The reconciliation sweep task is cancelled when the server stops (issue #131)."""
from cli_agent_orchestrator.api.main import lifespan

mock_observer = MagicMock()
reconcile_cancelled = {"value": False}

async def fake_flow_daemon():
await asyncio.sleep(3600)

async def fake_reconcile(registry):
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
reconcile_cancelled["value"] = True
raise

with (
patch("cli_agent_orchestrator.api.main.setup_logging"),
patch("cli_agent_orchestrator.api.main.init_db"),
patch("cli_agent_orchestrator.api.main.cleanup_old_data"),
patch(
"cli_agent_orchestrator.api.main.PollingObserver",
return_value=mock_observer,
),
patch("cli_agent_orchestrator.api.main.flow_daemon", fake_flow_daemon),
patch(
"cli_agent_orchestrator.api.main.inbox_reconciliation_daemon",
fake_reconcile,
),
):
async with lifespan(app):
pass

assert reconcile_cancelled["value"] is True


# ── main() entry point ───────────────────────────────────────────────

Expand Down
69 changes: 68 additions & 1 deletion test/clients/test_database.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Tests for the database client."""

import tempfile
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch

Expand All @@ -27,6 +27,7 @@
init_db,
list_flows,
list_pending_receiver_ids_by_provider,
list_pending_receiver_ids_older_than,
list_terminals_by_session,
update_flow_enabled,
update_flow_run_times,
Expand Down Expand Up @@ -234,6 +235,72 @@ def test_list_pending_receiver_ids_by_provider(self, mock_session_class):

assert result == ["receiver-1", "receiver-2"]

def test_list_pending_receiver_ids_older_than(self, test_db):
"""Only messages pending past the grace window — whose receiver
terminal still exists — are returned for reconciliation (issue #131).

Uses the real in-memory DB (not a mocked session) so the age cutoff,
status filter, and terminal join are actually exercised.
"""
old = datetime.now() - timedelta(seconds=120)
fresh = datetime.now()

with test_db() as seed:
seed.add_all(
[
TerminalModel(
id="term-old",
tmux_session="cao-s",
tmux_window="w",
provider="kiro_cli",
),
TerminalModel(
id="term-fresh",
tmux_session="cao-s",
tmux_window="w",
provider="kiro_cli",
),
# Stuck long enough to reconcile, receiver still alive — kept.
InboxModel(
sender_id="a",
receiver_id="term-old",
message="m",
status=MessageStatus.PENDING.value,
created_at=old,
),
# Too recent — left to the immediate/watchdog paths.
InboxModel(
sender_id="a",
receiver_id="term-fresh",
message="m",
status=MessageStatus.PENDING.value,
created_at=fresh,
),
# Already delivered — not pending.
InboxModel(
sender_id="a",
receiver_id="term-old",
message="m",
status=MessageStatus.DELIVERED.value,
created_at=old,
),
# Receiver terminal is gone — dropped by the join.
InboxModel(
sender_id="a",
receiver_id="term-ghost",
message="m",
status=MessageStatus.PENDING.value,
created_at=old,
),
]
)
seed.commit()

with patch("cli_agent_orchestrator.clients.database.SessionLocal", test_db):
result = list_pending_receiver_ids_older_than(30)

assert result == ["term-old"]

@patch("cli_agent_orchestrator.clients.database.SessionLocal")
def test_delete_terminals_by_session(self, mock_session_class):
"""Test deleting all terminals in a session."""
Expand Down
Loading
Loading