diff --git a/docs/inbox-delivery.md b/docs/inbox-delivery.md index 40c7d617..8a920189 100644 --- a/docs/inbox-delivery.md +++ b/docs/inbox-delivery.md @@ -72,3 +72,22 @@ The `_initialized` gate is important -- it prevents delivery during startup when | Message delivered during PROCESSING gets lost (agent errors mid-turn) | Low | Message status is DELIVERED; acceptable for v1 | | Watchdog fires every 5s during long turns | Medium (bounded) | One DB query + one tmux call per interval; no amplification | | Feature causes regression in non-eager providers | None | Provider flag defaults to False; only opt-in providers affected | + +## Reconciliation Sweep + +The immediate and watchdog paths can both miss a message when the receiving terminal is *already idle* when the message is queued: + +- the single immediate attempt may observe a momentarily stale status and skip delivery, and +- the watchdog only fires on log-file changes, which an already-idle agent that produces no further output never generates. + +When both miss, the message would otherwise stay `PENDING` forever (issue #131). + +A provider-agnostic background sweep closes this gap. Every `INBOX_RECONCILE_INTERVAL` (default 30s) it re-attempts delivery for any message left `PENDING` longer than `INBOX_RECONCILE_GRACE_SECONDS` (default 30s), routing it back through the same `check_and_send_pending_messages()` gate as the other paths. The work scales with the number of *backlogged* receivers, not the total agent count: when nothing is stuck the sweep runs one cheap query and returns. + +### Grace Window + +The sweep deliberately ignores messages younger than the grace window. The immediate and watchdog paths own delivery during that window; the sweep only adopts messages they have demonstrably had their chance at and missed. This keeps the sweep from competing with the fast paths on freshly queued messages and minimizes its overlap with them. + +### Relationship to the OpenCode Poller + +The sweep does not replace the OpenCode poller. They serve different roles: the OpenCode poller is a fast (5s) primary wakeup for a provider whose logs stop changing once its TUI settles, while the sweep is a slow, provider-agnostic safety net. Both reuse `check_and_send_pending_messages()` and so share its known duplicate-wakeup race; the grace window keeps the sweep from overlapping the fast paths in practice. GH #115 tracks unifying all of these wakeup sources into a single coordinated delivery engine that would make delivery atomic. diff --git a/src/cli_agent_orchestrator/api/main.py b/src/cli_agent_orchestrator/api/main.py index bf8f00be..f06b7508 100644 --- a/src/cli_agent_orchestrator/api/main.py +++ b/src/cli_agent_orchestrator/api/main.py @@ -42,6 +42,7 @@ CORS_ORIGINS, DEFAULT_PROVIDER, INBOX_POLLING_INTERVAL, + INBOX_RECONCILE_INTERVAL, SERVER_HOST, SERVER_PORT, SERVER_VERSION, @@ -111,6 +112,24 @@ async def opencode_inbox_delivery_daemon(registry: PluginRegistry) -> None: logger.exception("OpenCode inbox delivery poller error") +async def inbox_reconciliation_daemon(registry: PluginRegistry) -> None: + """Background task that recovers inbox messages the fast paths missed. + + Safety net for issue #131: the immediate (on POST) and watchdog (on log + change) delivery paths can both miss a message when the receiver is already + idle, leaving it PENDING forever. This sweep runs on a slower interval than + the watchdog and re-attempts delivery for anything left pending past the + grace window. + """ + logger.info("Inbox reconciliation daemon started") + while True: + await asyncio.sleep(INBOX_RECONCILE_INTERVAL) + try: + await asyncio.to_thread(inbox_service.reconcile_orphaned_messages, registry) + except Exception: + logger.exception("Inbox reconciliation daemon error") + + # Response Models class TerminalOutputResponse(BaseModel): output: str @@ -183,6 +202,10 @@ async def lifespan(app: FastAPI): # provider-specific wakeup path with a unified delivery engine. opencode_inbox_task = asyncio.create_task(opencode_inbox_delivery_daemon(registry)) + # Start provider-agnostic reconciliation sweep for orphaned PENDING + # messages the immediate and watchdog paths missed (issue #131). + inbox_reconcile_task = asyncio.create_task(inbox_reconciliation_daemon(registry)) + # Start inbox watcher inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL) inbox_observer.schedule(LogFileHandler(registry), str(TERMINAL_LOG_DIR), recursive=False) @@ -210,6 +233,13 @@ async def lifespan(app: FastAPI): except asyncio.CancelledError: pass + # Cancel inbox reconciliation sweep on shutdown + inbox_reconcile_task.cancel() + try: + await inbox_reconcile_task + except asyncio.CancelledError: + pass + await registry.teardown() logger.info("Shutting down CLI Agent Orchestrator server...") diff --git a/src/cli_agent_orchestrator/clients/database.py b/src/cli_agent_orchestrator/clients/database.py index 95f4dde6..772ce7f6 100644 --- a/src/cli_agent_orchestrator/clients/database.py +++ b/src/cli_agent_orchestrator/clients/database.py @@ -2,7 +2,7 @@ import logging import uuid -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional from sqlalchemy import ( @@ -344,6 +344,36 @@ def list_pending_receiver_ids_by_provider(provider: str) -> List[str]: return [row[0] for row in rows] +def list_pending_receiver_ids_older_than(min_age_seconds: int) -> List[str]: + """List receiver terminal IDs whose messages have been PENDING too long. + + Returns the distinct receivers of any message still PENDING for longer than + ``min_age_seconds``. Used by the inbox reconciliation sweep to find messages + the immediate and watchdog delivery paths missed, without competing with + them for freshly queued ones (issue #131). + + The join on ``terminals`` drops messages whose receiver terminal no longer + exists, so the sweep does not keep retrying deliveries to deleted agents. + + ``created_at`` is stored local-naive (``InboxModel.created_at`` defaults to + ``datetime.now``), so the cutoff uses ``datetime.now()`` to match — the same + convention as the retention query in ``cleanup_service.cleanup_old_data``. + """ + cutoff = datetime.now() - timedelta(seconds=min_age_seconds) + with SessionLocal() as db: + rows = ( + db.query(InboxModel.receiver_id) + .join(TerminalModel, TerminalModel.id == InboxModel.receiver_id) + .filter( + InboxModel.status == MessageStatus.PENDING.value, + InboxModel.created_at < cutoff, + ) + .distinct() + .all() + ) + return [row[0] for row in rows] + + def delete_terminal(terminal_id: str) -> bool: """Delete terminal metadata.""" with SessionLocal() as db: diff --git a/src/cli_agent_orchestrator/constants.py b/src/cli_agent_orchestrator/constants.py index c3ef3e39..50a6f0a9 100644 --- a/src/cli_agent_orchestrator/constants.py +++ b/src/cli_agent_orchestrator/constants.py @@ -66,6 +66,25 @@ # for capable providers (e.g., Claude Code). EAGER_INBOX_DELIVERY = os.environ.get("CAO_EAGER_INBOX_DELIVERY", "false").lower() == "true" +# Reconciliation sweep for orphaned inbox messages. +# The immediate (on POST) and watchdog (on log-file change) delivery paths can +# both miss a message when the receiving terminal is already idle: the single +# immediate attempt may observe a stale status, and the watchdog only fires on +# log output that an idle agent never produces. Those messages would otherwise +# stay PENDING forever. A slow background sweep re-attempts delivery for any +# message left pending past the grace window below, acting as a catch-all +# fallback under the two fast paths (issue #131). +# +# The interval is deliberately much larger than INBOX_POLLING_INTERVAL: this is +# a safety net, not a primary delivery path, so it trades latency for low load. +INBOX_RECONCILE_INTERVAL = 30 # seconds between reconciliation sweeps + +# Only reconcile messages older than this. Keeping the grace window >= +# INBOX_POLLING_INTERVAL (the watchdog poll cadence) ensures the sweep never +# competes with the immediate and watchdog paths for freshly queued messages — +# it only adopts ones those paths have already had their chance at and missed. +INBOX_RECONCILE_GRACE_SECONDS = 30 + # ============================================================================= # Cleanup Service Configuration # ============================================================================= diff --git a/src/cli_agent_orchestrator/services/inbox_service.py b/src/cli_agent_orchestrator/services/inbox_service.py index 4046cf25..e227dd81 100644 --- a/src/cli_agent_orchestrator/services/inbox_service.py +++ b/src/cli_agent_orchestrator/services/inbox_service.py @@ -31,9 +31,14 @@ from cli_agent_orchestrator.clients.database import ( get_pending_messages, list_pending_receiver_ids_by_provider, + list_pending_receiver_ids_older_than, update_message_status, ) -from cli_agent_orchestrator.constants import EAGER_INBOX_DELIVERY, TERMINAL_LOG_DIR +from cli_agent_orchestrator.constants import ( + EAGER_INBOX_DELIVERY, + INBOX_RECONCILE_GRACE_SECONDS, + TERMINAL_LOG_DIR, +) from cli_agent_orchestrator.models.inbox import MessageStatus, OrchestrationType from cli_agent_orchestrator.models.provider import ProviderType from cli_agent_orchestrator.models.terminal import TerminalStatus @@ -161,6 +166,34 @@ def poll_opencode_pending_messages(registry: PluginRegistry | None = None) -> No logger.debug(f"OpenCode inbox poll failed for {terminal_id}: {e}") +def reconcile_orphaned_messages(registry: PluginRegistry | None = None) -> None: + """Re-attempt delivery for messages stuck in PENDING past the grace window. + + Provider-agnostic safety net for the gap described in issue #131: when a + receiving terminal is already idle, the immediate (on POST) delivery path + may miss on a stale status and the log-watching observer never fires again + (an idle agent produces no new log output), leaving the message orphaned. + This sweep finds any such message and routes it back through the normal + delivery gate. + + Only messages older than ``INBOX_RECONCILE_GRACE_SECONDS`` are considered, + so the sweep never competes with the immediate and watchdog paths for + freshly queued messages — it only adopts ones they have already missed. + + Like ``poll_opencode_pending_messages``, this reuses ``check_and_send_pending_messages`` + and so inherits its known duplicate-wakeup race; the grace window keeps the + sweep from overlapping the fast paths in practice, and GH #115 tracks the + single coordinated delivery engine that would make delivery atomic. + """ + receiver_ids = list_pending_receiver_ids_older_than(INBOX_RECONCILE_GRACE_SECONDS) + + for terminal_id in receiver_ids: + try: + check_and_send_pending_messages(terminal_id, registry=registry) + except Exception as e: + logger.debug(f"Inbox reconciliation failed for {terminal_id}: {e}") + + class LogFileHandler(FileSystemEventHandler): """Handler for terminal log file changes.""" diff --git a/test/api/test_api_endpoints.py b/test/api/test_api_endpoints.py index e5c63ddd..4389d4ca 100644 --- a/test/api/test_api_endpoints.py +++ b/test/api/test_api_endpoints.py @@ -11,8 +11,14 @@ import pytest -from cli_agent_orchestrator.api.main import app, flow_daemon, opencode_inbox_delivery_daemon +from cli_agent_orchestrator.api.main import ( + app, + flow_daemon, + inbox_reconciliation_daemon, + opencode_inbox_delivery_daemon, +) from cli_agent_orchestrator.models.terminal import Terminal +from cli_agent_orchestrator.services import inbox_service from cli_agent_orchestrator.utils.skills import SkillNameError # ── Health endpoint ────────────────────────────────────────────────── @@ -995,6 +1001,35 @@ async def fake_sleep(_seconds): assert mock_to_thread.await_args.args[1] is registry +class TestInboxReconciliationDaemon: + """Tests for the provider-agnostic inbox reconciliation sweep (issue #131).""" + + @pytest.mark.asyncio + async def test_sweep_runs_one_iteration_then_cancels(self): + """Daemon sleeps, runs the sync sweep in a thread, then handles cancellation.""" + sleep_calls = 0 + registry = MagicMock() + mock_to_thread = AsyncMock() + + async def fake_sleep(_seconds): + nonlocal sleep_calls + sleep_calls += 1 + if sleep_calls > 1: + raise asyncio.CancelledError + + with ( + patch("asyncio.sleep", new=fake_sleep), + patch("asyncio.to_thread", mock_to_thread), + ): + with pytest.raises(asyncio.CancelledError): + await inbox_reconciliation_daemon(registry) + + mock_to_thread.assert_awaited_once() + # The sweep, not some other sync function, must be the dispatched work. + assert mock_to_thread.await_args.args[0] is inbox_service.reconcile_orphaned_messages + assert mock_to_thread.await_args.args[1] is registry + + # ── lifespan ───────────────────────────────────────────────────────── @@ -1030,6 +1065,43 @@ async def fake_daemon(): mock_observer.stop.assert_called_once() mock_observer.join.assert_called_once() + @pytest.mark.asyncio + async def test_lifespan_cancels_inbox_reconciliation_on_shutdown(self): + """The reconciliation sweep task is cancelled when the server stops (issue #131).""" + from cli_agent_orchestrator.api.main import lifespan + + mock_observer = MagicMock() + reconcile_cancelled = {"value": False} + + async def fake_flow_daemon(): + await asyncio.sleep(3600) + + async def fake_reconcile(registry): + try: + await asyncio.sleep(3600) + except asyncio.CancelledError: + reconcile_cancelled["value"] = True + raise + + with ( + patch("cli_agent_orchestrator.api.main.setup_logging"), + patch("cli_agent_orchestrator.api.main.init_db"), + patch("cli_agent_orchestrator.api.main.cleanup_old_data"), + patch( + "cli_agent_orchestrator.api.main.PollingObserver", + return_value=mock_observer, + ), + patch("cli_agent_orchestrator.api.main.flow_daemon", fake_flow_daemon), + patch( + "cli_agent_orchestrator.api.main.inbox_reconciliation_daemon", + fake_reconcile, + ), + ): + async with lifespan(app): + pass + + assert reconcile_cancelled["value"] is True + # ── main() entry point ─────────────────────────────────────────────── diff --git a/test/clients/test_database.py b/test/clients/test_database.py index 8eb41d28..9809e88c 100644 --- a/test/clients/test_database.py +++ b/test/clients/test_database.py @@ -1,7 +1,7 @@ """Tests for the database client.""" import tempfile -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from unittest.mock import MagicMock, patch @@ -27,6 +27,7 @@ init_db, list_flows, list_pending_receiver_ids_by_provider, + list_pending_receiver_ids_older_than, list_terminals_by_session, update_flow_enabled, update_flow_run_times, @@ -234,6 +235,72 @@ def test_list_pending_receiver_ids_by_provider(self, mock_session_class): assert result == ["receiver-1", "receiver-2"] + def test_list_pending_receiver_ids_older_than(self, test_db): + """Only messages pending past the grace window — whose receiver + terminal still exists — are returned for reconciliation (issue #131). + + Uses the real in-memory DB (not a mocked session) so the age cutoff, + status filter, and terminal join are actually exercised. + """ + old = datetime.now() - timedelta(seconds=120) + fresh = datetime.now() + + with test_db() as seed: + seed.add_all( + [ + TerminalModel( + id="term-old", + tmux_session="cao-s", + tmux_window="w", + provider="kiro_cli", + ), + TerminalModel( + id="term-fresh", + tmux_session="cao-s", + tmux_window="w", + provider="kiro_cli", + ), + # Stuck long enough to reconcile, receiver still alive — kept. + InboxModel( + sender_id="a", + receiver_id="term-old", + message="m", + status=MessageStatus.PENDING.value, + created_at=old, + ), + # Too recent — left to the immediate/watchdog paths. + InboxModel( + sender_id="a", + receiver_id="term-fresh", + message="m", + status=MessageStatus.PENDING.value, + created_at=fresh, + ), + # Already delivered — not pending. + InboxModel( + sender_id="a", + receiver_id="term-old", + message="m", + status=MessageStatus.DELIVERED.value, + created_at=old, + ), + # Receiver terminal is gone — dropped by the join. + InboxModel( + sender_id="a", + receiver_id="term-ghost", + message="m", + status=MessageStatus.PENDING.value, + created_at=old, + ), + ] + ) + seed.commit() + + with patch("cli_agent_orchestrator.clients.database.SessionLocal", test_db): + result = list_pending_receiver_ids_older_than(30) + + assert result == ["term-old"] + @patch("cli_agent_orchestrator.clients.database.SessionLocal") def test_delete_terminals_by_session(self, mock_session_class): """Test deleting all terminals in a session.""" diff --git a/test/services/test_inbox_service.py b/test/services/test_inbox_service.py index 3dea87e5..38976d90 100644 --- a/test/services/test_inbox_service.py +++ b/test/services/test_inbox_service.py @@ -5,6 +5,7 @@ import pytest +from cli_agent_orchestrator.constants import INBOX_RECONCILE_GRACE_SECONDS from cli_agent_orchestrator.models.inbox import MessageStatus from cli_agent_orchestrator.models.terminal import TerminalStatus from cli_agent_orchestrator.services.inbox_service import ( @@ -13,6 +14,7 @@ _has_idle_pattern, check_and_send_pending_messages, poll_opencode_pending_messages, + reconcile_orphaned_messages, ) @@ -392,6 +394,35 @@ def test_survives_per_receiver_failure(self, mock_list_receivers, mock_check_sen assert mock_check_send.call_count == 2 +class TestReconcileOrphanedMessages: + """Tests for the provider-agnostic inbox reconciliation sweep (issue #131).""" + + @patch("cli_agent_orchestrator.services.inbox_service.check_and_send_pending_messages") + @patch("cli_agent_orchestrator.services.inbox_service.list_pending_receiver_ids_older_than") + def test_reconciles_stale_receivers(self, mock_list_receivers, mock_check_send): + """Sweep attempts delivery for each receiver with an orphaned message.""" + mock_list_receivers.return_value = ["receiver-1", "receiver-2"] + + reconcile_orphaned_messages() + + mock_list_receivers.assert_called_once_with(INBOX_RECONCILE_GRACE_SECONDS) + assert mock_check_send.call_args_list == [ + call("receiver-1", registry=None), + call("receiver-2", registry=None), + ] + + @patch("cli_agent_orchestrator.services.inbox_service.check_and_send_pending_messages") + @patch("cli_agent_orchestrator.services.inbox_service.list_pending_receiver_ids_older_than") + def test_survives_per_receiver_failure(self, mock_list_receivers, mock_check_send): + """One failed receiver does not stop the sweep.""" + mock_list_receivers.return_value = ["receiver-1", "receiver-2"] + mock_check_send.side_effect = [Exception("tmux busy"), False] + + reconcile_orphaned_messages() + + assert mock_check_send.call_count == 2 + + class TestLogFileHandler: """Tests for LogFileHandler class."""