From 459713018ad305582787526628757e1c70dfd4e9 Mon Sep 17 00:00:00 2001 From: patricka3125 Date: Mon, 27 Apr 2026 12:00:18 -0700 Subject: [PATCH 1/2] fix(opencode): Add a poller to opencode CLI inbox delivery to drain stuck messages in pending status --- src/cli_agent_orchestrator/api/main.py | 22 +++++++++++ .../clients/database.py | 16 ++++++++ .../services/inbox_service.py | 25 ++++++++++++- test/api/test_api_endpoints.py | 37 ++++++++++++++++--- test/clients/test_database.py | 20 ++++++++++ test/services/test_inbox_service.py | 32 +++++++++++++++- 6 files changed, 145 insertions(+), 7 deletions(-) diff --git a/src/cli_agent_orchestrator/api/main.py b/src/cli_agent_orchestrator/api/main.py index 45e620bbb..56f8a0e86 100644 --- a/src/cli_agent_orchestrator/api/main.py +++ b/src/cli_agent_orchestrator/api/main.py @@ -83,6 +83,17 @@ async def flow_daemon(): await asyncio.sleep(60) +async def opencode_inbox_delivery_daemon(registry: PluginRegistry) -> None: + """Background task to wake OpenCode inbox delivery for pending messages.""" + logger.info("OpenCode inbox delivery poller started") + while True: + await asyncio.sleep(INBOX_POLLING_INTERVAL) + try: + await asyncio.to_thread(inbox_service.poll_opencode_pending_messages, registry) + except Exception: + logger.exception("OpenCode inbox delivery poller error") + + # Response Models class TerminalOutputResponse(BaseModel): output: str @@ -138,6 +149,10 @@ async def lifespan(app: FastAPI): # Start flow daemon as background task daemon_task = asyncio.create_task(flow_daemon()) + # Start temporary OpenCode inbox poller. GH #115 tracks replacing this + # provider-specific wakeup path with a unified delivery engine. + opencode_inbox_task = asyncio.create_task(opencode_inbox_delivery_daemon(registry)) + # Start inbox watcher inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL) inbox_observer.schedule(LogFileHandler(registry), str(TERMINAL_LOG_DIR), recursive=False) @@ -158,6 +173,13 @@ async def lifespan(app: FastAPI): except asyncio.CancelledError: pass + # Cancel OpenCode inbox poller on shutdown + opencode_inbox_task.cancel() + try: + await opencode_inbox_task + except asyncio.CancelledError: + pass + await registry.teardown() logger.info("Shutting down CLI Agent Orchestrator server...") diff --git a/src/cli_agent_orchestrator/clients/database.py b/src/cli_agent_orchestrator/clients/database.py index cdd7ca2a7..b37541898 100644 --- a/src/cli_agent_orchestrator/clients/database.py +++ b/src/cli_agent_orchestrator/clients/database.py @@ -191,6 +191,22 @@ def list_all_terminals() -> List[Dict[str, Any]]: ] +def list_pending_receiver_ids_by_provider(provider: str) -> List[str]: + """List receiver terminal IDs with pending messages for a specific provider.""" + with SessionLocal() as db: + rows = ( + db.query(InboxModel.receiver_id) + .join(TerminalModel, TerminalModel.id == InboxModel.receiver_id) + .filter( + TerminalModel.provider == provider, + InboxModel.status == MessageStatus.PENDING.value, + ) + .distinct() + .all() + ) + return [row[0] for row in rows] + + def delete_terminal(terminal_id: str) -> bool: """Delete terminal metadata.""" with SessionLocal() as db: diff --git a/src/cli_agent_orchestrator/services/inbox_service.py b/src/cli_agent_orchestrator/services/inbox_service.py index 3c2a55cfd..6c5cfdec8 100644 --- a/src/cli_agent_orchestrator/services/inbox_service.py +++ b/src/cli_agent_orchestrator/services/inbox_service.py @@ -28,9 +28,14 @@ from watchdog.events import FileModifiedEvent, FileSystemEventHandler -from cli_agent_orchestrator.clients.database import get_pending_messages, update_message_status +from cli_agent_orchestrator.clients.database import ( + get_pending_messages, + list_pending_receiver_ids_by_provider, + update_message_status, +) from cli_agent_orchestrator.constants import TERMINAL_LOG_DIR from cli_agent_orchestrator.models.inbox import MessageStatus, OrchestrationType +from cli_agent_orchestrator.models.provider import ProviderType from cli_agent_orchestrator.models.terminal import TerminalStatus from cli_agent_orchestrator.plugins import PluginRegistry from cli_agent_orchestrator.providers.manager import provider_manager @@ -132,6 +137,24 @@ def check_and_send_pending_messages( raise +def poll_opencode_pending_messages(registry: PluginRegistry | None = None) -> None: + """Poll OpenCode terminals for pending inbox messages. + + This is a temporary OpenCode-specific wakeup path for providers whose + pipe-pane logs do not change after the TUI settles. It intentionally reuses + the existing delivery helper and inherits its known duplicate-wakeup race + with immediate and watchdog delivery paths. GH #115 tracks replacing these + paths with a single coordinated delivery engine. + """ + receiver_ids = list_pending_receiver_ids_by_provider(ProviderType.OPENCODE_CLI.value) + + for terminal_id in receiver_ids: + try: + check_and_send_pending_messages(terminal_id, registry=registry) + except Exception as e: + logger.debug(f"OpenCode inbox poll failed for {terminal_id}: {e}") + + class LogFileHandler(FileSystemEventHandler): """Handler for terminal log file changes.""" diff --git a/test/api/test_api_endpoints.py b/test/api/test_api_endpoints.py index 46daf9ba9..5882bd912 100644 --- a/test/api/test_api_endpoints.py +++ b/test/api/test_api_endpoints.py @@ -11,7 +11,7 @@ import pytest -from cli_agent_orchestrator.api.main import app, flow_daemon +from cli_agent_orchestrator.api.main import app, flow_daemon, opencode_inbox_delivery_daemon from cli_agent_orchestrator.models.terminal import Terminal from cli_agent_orchestrator.utils.skills import SkillNameError @@ -846,6 +846,33 @@ async def test_flow_daemon_multiple_flows(self): assert mock_svc.execute_flow.call_count == 2 +class TestOpenCodeInboxDeliveryDaemon: + """Tests for the OpenCode inbox delivery poller task.""" + + @pytest.mark.asyncio + async def test_poller_runs_one_iteration_then_cancels(self): + """Poller sleeps, runs the sync poll in a thread, then handles cancellation.""" + sleep_calls = 0 + registry = MagicMock() + mock_to_thread = AsyncMock() + + async def fake_sleep(_seconds): + nonlocal sleep_calls + sleep_calls += 1 + if sleep_calls > 1: + raise asyncio.CancelledError + + with ( + patch("asyncio.sleep", new=fake_sleep), + patch("asyncio.to_thread", mock_to_thread), + ): + with pytest.raises(asyncio.CancelledError): + await opencode_inbox_delivery_daemon(registry) + + mock_to_thread.assert_awaited_once() + assert mock_to_thread.await_args.args[1] is registry + + # ── lifespan ───────────────────────────────────────────────────────── @@ -859,6 +886,9 @@ async def test_lifespan_startup_and_shutdown(self): mock_observer = MagicMock() + async def fake_daemon(): + await asyncio.sleep(3600) + with ( patch("cli_agent_orchestrator.api.main.setup_logging"), patch("cli_agent_orchestrator.api.main.init_db"), @@ -867,10 +897,7 @@ async def test_lifespan_startup_and_shutdown(self): "cli_agent_orchestrator.api.main.PollingObserver", return_value=mock_observer, ), - patch( - "cli_agent_orchestrator.api.main.flow_daemon", - return_value=asyncio.sleep(0), - ), + patch("cli_agent_orchestrator.api.main.flow_daemon", fake_daemon), ): async with lifespan(app): # Inside the lifespan — startup completed diff --git a/test/clients/test_database.py b/test/clients/test_database.py index 121cb8095..4a4911e72 100644 --- a/test/clients/test_database.py +++ b/test/clients/test_database.py @@ -26,6 +26,7 @@ get_terminal_metadata, init_db, list_flows, + list_pending_receiver_ids_by_provider, list_terminals_by_session, update_flow_enabled, update_flow_run_times, @@ -178,6 +179,25 @@ def test_list_terminals_by_session(self, mock_session_class): assert len(result) == 1 assert result[0]["id"] == "test123" + @patch("cli_agent_orchestrator.clients.database.SessionLocal") + def test_list_pending_receiver_ids_by_provider(self, mock_session_class): + """Test listing pending receivers for a specific provider.""" + mock_session = MagicMock() + mock_session.__enter__ = MagicMock(return_value=mock_session) + mock_session.__exit__ = MagicMock(return_value=False) + + mock_query = MagicMock() + mock_query.join.return_value.filter.return_value.distinct.return_value.all.return_value = [ + ("receiver-1",), + ("receiver-2",), + ] + mock_session.query.return_value = mock_query + mock_session_class.return_value = mock_session + + result = list_pending_receiver_ids_by_provider("opencode_cli") + + assert result == ["receiver-1", "receiver-2"] + @patch("cli_agent_orchestrator.clients.database.SessionLocal") def test_delete_terminals_by_session(self, mock_session_class): """Test deleting all terminals in a session.""" diff --git a/test/services/test_inbox_service.py b/test/services/test_inbox_service.py index 7944eec16..53a3f93e2 100644 --- a/test/services/test_inbox_service.py +++ b/test/services/test_inbox_service.py @@ -1,7 +1,7 @@ """Tests for the inbox service.""" from pathlib import Path -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, call, patch import pytest @@ -12,6 +12,7 @@ _get_log_tail, _has_idle_pattern, check_and_send_pending_messages, + poll_opencode_pending_messages, ) @@ -174,6 +175,35 @@ def test_message_send_failure( mock_update_status.assert_called_once_with(1, MessageStatus.FAILED) +class TestPollOpenCodePendingMessages: + """Tests for the temporary OpenCode inbox poller.""" + + @patch("cli_agent_orchestrator.services.inbox_service.check_and_send_pending_messages") + @patch("cli_agent_orchestrator.services.inbox_service.list_pending_receiver_ids_by_provider") + def test_polls_pending_opencode_receivers(self, mock_list_receivers, mock_check_send): + """Test poller attempts delivery for each pending OpenCode receiver.""" + mock_list_receivers.return_value = ["receiver-1", "receiver-2"] + + poll_opencode_pending_messages() + + mock_list_receivers.assert_called_once_with("opencode_cli") + assert mock_check_send.call_args_list == [ + call("receiver-1", registry=None), + call("receiver-2", registry=None), + ] + + @patch("cli_agent_orchestrator.services.inbox_service.check_and_send_pending_messages") + @patch("cli_agent_orchestrator.services.inbox_service.list_pending_receiver_ids_by_provider") + def test_survives_per_receiver_failure(self, mock_list_receivers, mock_check_send): + """Test one failed receiver does not stop the poll loop.""" + mock_list_receivers.return_value = ["receiver-1", "receiver-2"] + mock_check_send.side_effect = [Exception("tmux busy"), False] + + poll_opencode_pending_messages() + + assert mock_check_send.call_count == 2 + + class TestLogFileHandler: """Tests for LogFileHandler class.""" From 29a9d8b0cbcf625f3a19f41a732740a5ace4f040 Mon Sep 17 00:00:00 2001 From: patricka3125 Date: Tue, 28 Apr 2026 11:20:16 -0700 Subject: [PATCH 2/2] docs: update opencode polling limitation --- README.md | 2 +- docs/opencode-cli.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3b542d7ca..974f40da8 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ Before using CAO, install at least one supported CLI agent tool: | **Gemini CLI** | [Provider docs](docs/gemini-cli.md) · [Installation](https://github.com/google-gemini/gemini-cli) | Google AI API key | | **Kimi CLI** | [Provider docs](docs/kimi-cli.md) · [Installation](https://platform.moonshot.cn/docs/kimi-cli) | Moonshot API key | | **GitHub Copilot CLI** | [Provider docs](docs/copilot-cli.md) · [Installation](https://github.com/features/copilot/cli) | GitHub auth | -| **OpenCode CLI** *(experimental — single-agent only, [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203))* | [Provider docs](docs/opencode-cli.md) · [Installation](https://opencode.ai) | Per-model API key | +| **OpenCode CLI** *(experimental — temporary inbox polling fallback for multi-agent callbacks, [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203))* | [Provider docs](docs/opencode-cli.md) · [Installation](https://opencode.ai) | Per-model API key | | **Q CLI** | [Installation](https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line.html) | AWS credentials | ## Quick Start diff --git a/docs/opencode-cli.md b/docs/opencode-cli.md index 736b1bdc7..5f5febb56 100644 --- a/docs/opencode-cli.md +++ b/docs/opencode-cli.md @@ -1,6 +1,6 @@ # OpenCode CLI Provider -> ⚠️ **Experimental — single-agent flows only.** Multi-agent orchestration (assign / send_message back to a supervisor) is **not yet reliable** on `opencode_cli`: the supervisor's inbox can deadlock with `pending` messages after its turn settles. Single-agent and pure handoff workflows are unaffected. Tracking: [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203). +> ⚠️ **Experimental.** Multi-agent orchestration (`assign` / `send_message` back to a supervisor) now uses a temporary OpenCode-specific inbox polling fallback for [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203). This prevents pending supervisor inbox messages from getting stuck after the OpenCode TUI settles, but delivery is still not fully unified with the immediate and watchdog paths until [#115](https://github.com/awslabs/cli-agent-orchestrator/pull/115) replaces them with a single coordinator. ## Overview @@ -209,7 +209,7 @@ cao install my_agent --provider opencode_cli cao launch --agents my_agent --provider opencode_cli ``` -CAO's tracking issue for providing a runtime bypass (either via the temp-agent workaround or by consuming an upstream TUI flag once it ships): see the README's *experimental — single-agent only* notice. +Runtime permission bypass support remains unavailable for OpenCode's TUI mode. CAO can revisit this either with a temp-agent workaround or by consuming an upstream TUI flag once one ships. ### Project-local `opencode.json` override