Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Before using CAO, install at least one supported CLI agent tool:
| **Gemini CLI** | [Provider docs](docs/gemini-cli.md) · [Installation](https://github.com/google-gemini/gemini-cli) | Google AI API key |
| **Kimi CLI** | [Provider docs](docs/kimi-cli.md) · [Installation](https://platform.moonshot.cn/docs/kimi-cli) | Moonshot API key |
| **GitHub Copilot CLI** | [Provider docs](docs/copilot-cli.md) · [Installation](https://github.com/features/copilot/cli) | GitHub auth |
| **OpenCode CLI** *(experimental — single-agent only, [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203))* | [Provider docs](docs/opencode-cli.md) · [Installation](https://opencode.ai) | Per-model API key |
| **OpenCode CLI** *(experimental — temporary inbox polling fallback for multi-agent callbacks, [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203))* | [Provider docs](docs/opencode-cli.md) · [Installation](https://opencode.ai) | Per-model API key |
| **Q CLI** | [Installation](https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line.html) | AWS credentials |

## Quick Start
Expand Down
4 changes: 2 additions & 2 deletions docs/opencode-cli.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# OpenCode CLI Provider

> ⚠️ **Experimental — single-agent flows only.** Multi-agent orchestration (assign / send_message back to a supervisor) is **not yet reliable** on `opencode_cli`: the supervisor's inbox can deadlock with `pending` messages after its turn settles. Single-agent and pure handoff workflows are unaffected. Tracking: [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203).
> ⚠️ **Experimental.** Multi-agent orchestration (`assign` / `send_message` back to a supervisor) now uses a temporary OpenCode-specific inbox polling fallback for [#203](https://github.com/awslabs/cli-agent-orchestrator/issues/203). This prevents pending supervisor inbox messages from getting stuck after the OpenCode TUI settles, but delivery is still not fully unified with the immediate and watchdog paths until [#115](https://github.com/awslabs/cli-agent-orchestrator/pull/115) replaces them with a single coordinator.

## Overview

Expand Down Expand Up @@ -209,7 +209,7 @@ cao install my_agent --provider opencode_cli
cao launch --agents my_agent --provider opencode_cli
```

CAO's tracking issue for providing a runtime bypass (either via the temp-agent workaround or by consuming an upstream TUI flag once it ships): see the README's *experimental — single-agent only* notice.
Runtime permission bypass support remains unavailable for OpenCode's TUI mode. CAO can revisit this either with a temp-agent workaround or by consuming an upstream TUI flag once one ships.

### Project-local `opencode.json` override

Expand Down
22 changes: 22 additions & 0 deletions src/cli_agent_orchestrator/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ async def flow_daemon():
await asyncio.sleep(60)


async def opencode_inbox_delivery_daemon(registry: PluginRegistry) -> None:
"""Background task to wake OpenCode inbox delivery for pending messages."""
logger.info("OpenCode inbox delivery poller started")
while True:
await asyncio.sleep(INBOX_POLLING_INTERVAL)
try:
await asyncio.to_thread(inbox_service.poll_opencode_pending_messages, registry)
except Exception:
logger.exception("OpenCode inbox delivery poller error")


# Response Models
class TerminalOutputResponse(BaseModel):
output: str
Expand Down Expand Up @@ -138,6 +149,10 @@ async def lifespan(app: FastAPI):
# Start flow daemon as background task
daemon_task = asyncio.create_task(flow_daemon())

# Start temporary OpenCode inbox poller. GH #115 tracks replacing this
# provider-specific wakeup path with a unified delivery engine.
opencode_inbox_task = asyncio.create_task(opencode_inbox_delivery_daemon(registry))

# Start inbox watcher
inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL)
inbox_observer.schedule(LogFileHandler(registry), str(TERMINAL_LOG_DIR), recursive=False)
Expand All @@ -158,6 +173,13 @@ async def lifespan(app: FastAPI):
except asyncio.CancelledError:
pass

# Cancel OpenCode inbox poller on shutdown
opencode_inbox_task.cancel()
try:
await opencode_inbox_task
except asyncio.CancelledError:
pass

await registry.teardown()
logger.info("Shutting down CLI Agent Orchestrator server...")

Expand Down
16 changes: 16 additions & 0 deletions src/cli_agent_orchestrator/clients/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,22 @@ def list_all_terminals() -> List[Dict[str, Any]]:
]


def list_pending_receiver_ids_by_provider(provider: str) -> List[str]:
"""List receiver terminal IDs with pending messages for a specific provider."""
with SessionLocal() as db:
rows = (
db.query(InboxModel.receiver_id)
.join(TerminalModel, TerminalModel.id == InboxModel.receiver_id)
.filter(
TerminalModel.provider == provider,
InboxModel.status == MessageStatus.PENDING.value,
)
.distinct()
.all()
)
return [row[0] for row in rows]


def delete_terminal(terminal_id: str) -> bool:
"""Delete terminal metadata."""
with SessionLocal() as db:
Expand Down
25 changes: 24 additions & 1 deletion src/cli_agent_orchestrator/services/inbox_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@

from watchdog.events import FileModifiedEvent, FileSystemEventHandler

from cli_agent_orchestrator.clients.database import get_pending_messages, update_message_status
from cli_agent_orchestrator.clients.database import (
get_pending_messages,
list_pending_receiver_ids_by_provider,
update_message_status,
)
from cli_agent_orchestrator.constants import TERMINAL_LOG_DIR
from cli_agent_orchestrator.models.inbox import MessageStatus, OrchestrationType
from cli_agent_orchestrator.models.provider import ProviderType
from cli_agent_orchestrator.models.terminal import TerminalStatus
from cli_agent_orchestrator.plugins import PluginRegistry
from cli_agent_orchestrator.providers.manager import provider_manager
Expand Down Expand Up @@ -132,6 +137,24 @@ def check_and_send_pending_messages(
raise


def poll_opencode_pending_messages(registry: PluginRegistry | None = None) -> None:
"""Poll OpenCode terminals for pending inbox messages.

This is a temporary OpenCode-specific wakeup path for providers whose
pipe-pane logs do not change after the TUI settles. It intentionally reuses
the existing delivery helper and inherits its known duplicate-wakeup race
with immediate and watchdog delivery paths. GH #115 tracks replacing these
paths with a single coordinated delivery engine.
"""
receiver_ids = list_pending_receiver_ids_by_provider(ProviderType.OPENCODE_CLI.value)

for terminal_id in receiver_ids:
try:
check_and_send_pending_messages(terminal_id, registry=registry)
except Exception as e:
logger.debug(f"OpenCode inbox poll failed for {terminal_id}: {e}")


class LogFileHandler(FileSystemEventHandler):
"""Handler for terminal log file changes."""

Expand Down
37 changes: 32 additions & 5 deletions test/api/test_api_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import pytest

from cli_agent_orchestrator.api.main import app, flow_daemon
from cli_agent_orchestrator.api.main import app, flow_daemon, opencode_inbox_delivery_daemon
from cli_agent_orchestrator.models.terminal import Terminal
from cli_agent_orchestrator.utils.skills import SkillNameError

Expand Down Expand Up @@ -849,6 +849,33 @@ async def test_flow_daemon_multiple_flows(self):
assert mock_svc.execute_flow.call_count == 2


class TestOpenCodeInboxDeliveryDaemon:
"""Tests for the OpenCode inbox delivery poller task."""

@pytest.mark.asyncio
async def test_poller_runs_one_iteration_then_cancels(self):
"""Poller sleeps, runs the sync poll in a thread, then handles cancellation."""
sleep_calls = 0
registry = MagicMock()
mock_to_thread = AsyncMock()

async def fake_sleep(_seconds):
nonlocal sleep_calls
sleep_calls += 1
if sleep_calls > 1:
raise asyncio.CancelledError

with (
patch("asyncio.sleep", new=fake_sleep),
patch("asyncio.to_thread", mock_to_thread),
):
with pytest.raises(asyncio.CancelledError):
await opencode_inbox_delivery_daemon(registry)

mock_to_thread.assert_awaited_once()
assert mock_to_thread.await_args.args[1] is registry


# ── lifespan ─────────────────────────────────────────────────────────


Expand All @@ -862,6 +889,9 @@ async def test_lifespan_startup_and_shutdown(self):

mock_observer = MagicMock()

async def fake_daemon():
await asyncio.sleep(3600)

with (
patch("cli_agent_orchestrator.api.main.setup_logging"),
patch("cli_agent_orchestrator.api.main.init_db"),
Expand All @@ -870,10 +900,7 @@ async def test_lifespan_startup_and_shutdown(self):
"cli_agent_orchestrator.api.main.PollingObserver",
return_value=mock_observer,
),
patch(
"cli_agent_orchestrator.api.main.flow_daemon",
return_value=asyncio.sleep(0),
),
patch("cli_agent_orchestrator.api.main.flow_daemon", fake_daemon),
):
async with lifespan(app):
# Inside the lifespan — startup completed
Expand Down
20 changes: 20 additions & 0 deletions test/clients/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
get_terminal_metadata,
init_db,
list_flows,
list_pending_receiver_ids_by_provider,
list_terminals_by_session,
update_flow_enabled,
update_flow_run_times,
Expand Down Expand Up @@ -178,6 +179,25 @@ def test_list_terminals_by_session(self, mock_session_class):
assert len(result) == 1
assert result[0]["id"] == "test123"

@patch("cli_agent_orchestrator.clients.database.SessionLocal")
def test_list_pending_receiver_ids_by_provider(self, mock_session_class):
"""Test listing pending receivers for a specific provider."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=False)

mock_query = MagicMock()
mock_query.join.return_value.filter.return_value.distinct.return_value.all.return_value = [
("receiver-1",),
("receiver-2",),
]
mock_session.query.return_value = mock_query
mock_session_class.return_value = mock_session

result = list_pending_receiver_ids_by_provider("opencode_cli")

assert result == ["receiver-1", "receiver-2"]

@patch("cli_agent_orchestrator.clients.database.SessionLocal")
def test_delete_terminals_by_session(self, mock_session_class):
"""Test deleting all terminals in a session."""
Expand Down
32 changes: 31 additions & 1 deletion test/services/test_inbox_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Tests for the inbox service."""

from pathlib import Path
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, call, patch

import pytest

Expand All @@ -12,6 +12,7 @@
_get_log_tail,
_has_idle_pattern,
check_and_send_pending_messages,
poll_opencode_pending_messages,
)


Expand Down Expand Up @@ -174,6 +175,35 @@ def test_message_send_failure(
mock_update_status.assert_called_once_with(1, MessageStatus.FAILED)


class TestPollOpenCodePendingMessages:
"""Tests for the temporary OpenCode inbox poller."""

@patch("cli_agent_orchestrator.services.inbox_service.check_and_send_pending_messages")
@patch("cli_agent_orchestrator.services.inbox_service.list_pending_receiver_ids_by_provider")
def test_polls_pending_opencode_receivers(self, mock_list_receivers, mock_check_send):
"""Test poller attempts delivery for each pending OpenCode receiver."""
mock_list_receivers.return_value = ["receiver-1", "receiver-2"]

poll_opencode_pending_messages()

mock_list_receivers.assert_called_once_with("opencode_cli")
assert mock_check_send.call_args_list == [
call("receiver-1", registry=None),
call("receiver-2", registry=None),
]

@patch("cli_agent_orchestrator.services.inbox_service.check_and_send_pending_messages")
@patch("cli_agent_orchestrator.services.inbox_service.list_pending_receiver_ids_by_provider")
def test_survives_per_receiver_failure(self, mock_list_receivers, mock_check_send):
"""Test one failed receiver does not stop the poll loop."""
mock_list_receivers.return_value = ["receiver-1", "receiver-2"]
mock_check_send.side_effect = [Exception("tmux busy"), False]

poll_opencode_pending_messages()

assert mock_check_send.call_count == 2


class TestLogFileHandler:
"""Tests for LogFileHandler class."""

Expand Down
Loading