diff --git a/.env.example b/.env.example index 8d8eec5..90a7651 100644 --- a/.env.example +++ b/.env.example @@ -1,2 +1,7 @@ NTFY_URL=https://ntfy.example.com/your-channel NATS_URL=nats://localhost:4222 +AGENT_BUS_COMMS_DIR= +AGENT_BUS_CROSS_AGENT_RETENTION_DAYS= +AGENT_BUS_SESSION_RETENTION_DAYS= +AGENT_BUS_WEBHOOK_URL= +AGENT_BUS_WEBHOOK_EVENTS= diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..d11b0ae --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,49 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + name: Test (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + python-version: ["3.11", "3.12", "3.13"] + + steps: + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Import smoke test + run: python -c "import server; import reconcile; import agent_bus_client" + + audit: + name: Dependency audit + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 + + - name: Set up Python + uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0 + with: + python-version: "3.12" + + - name: Install pip-audit + run: pip install pip-audit + + - name: Audit dependencies + run: pip-audit -r requirements.txt diff --git a/.gitleaks.toml b/.gitleaks.toml index 01c7e43..7c90631 100644 --- a/.gitleaks.toml +++ b/.gitleaks.toml @@ -7,7 +7,7 @@ useDefault = true [[rules]] id = "ntfy-real-channel" description = "Real ntfy channel URL (internal homelab endpoints)" -regex = '''https?://ntfy\.(your-domain|tadmstr)\.me/''' +regex = 
'''https?://ntfy\.(your-domain|tadmstr|glitch42)\.(me|com)/''' tags = ["homelab", "internal-domain"] [[rules]] diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b2f30e2 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,30 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased] + +### Added +- `AGENT_BUS_COMMS_DIR` env var — base directory for logs, artifacts, and cursors is now + configurable (default: `~/.claude/comms`). Propagated to `server.py`, `reconcile.py`, + `cleanup.sh`, and `ecosystem.config.js`. +- `AGENT_BUS_CROSS_AGENT_RETENTION_DAYS` and `AGENT_BUS_SESSION_RETENTION_DAYS` env vars — + log retention periods are now configurable in `cleanup.sh` (defaults: 90 and 30 days). +- `AGENT_BUS_WEBHOOK_URL` and `AGENT_BUS_WEBHOOK_EVENTS` env vars — fire-and-forget HTTP + webhook support; POSTs event JSON on matching event types (`*` fires on all events). +- `get_status` MCP tool — returns current server configuration and health: configured paths, + active integrations (NATS/ntfy/webhook), log date range, and today's event count. +- `agent_bus_client.py` — direct JSONL writer for non-MCP callers (PM2 cron jobs, task + dispatchers); uses the same event schema as the server. +- GitHub Actions CI workflow — import smoke test on Python 3.11/3.12/3.13 plus `pip-audit` + dependency security audit. +- CI badge in README. + +### Changed +- README: added optional components table (NATS, ntfy, webhook), full environment variables + reference table, `get_status` tool documentation, real clone URL, updated storage layout + to reference `$AGENT_BUS_COMMS_DIR`. +- Removed Helm-specific language from code comments. + +### Fixed +- Upgraded `fastmcp` from 3.1.0 to 3.2.4 to resolve CVE-2025-64340 and CVE-2026-27124. 
diff --git a/README.md b/README.md index ee51fd0..c890406 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ # agent-bus -A FastMCP server that provides a unified inter-agent event log for multi-agent Claude Code setups. Agents log communication events (task handoffs, audit requests, build completions) via MCP tools; events are written to local JSONL files and federated to NATS JetStream for real-time observability. +[![Built with Claude Code](https://img.shields.io/badge/Built_with-Claude_Code-6B57FF?logo=claude&logoColor=white)](https://claude.ai/code) +[![CI](https://github.com/TadMSTR/agent-bus/actions/workflows/ci.yml/badge.svg)](https://github.com/TadMSTR/agent-bus/actions/workflows/ci.yml) + +A FastMCP server that provides a unified inter-agent event log for multi-agent Claude Code setups. Agents log communication events (task handoffs, audit requests, build completions) via MCP tools; events are written to local JSONL files and optionally federated to NATS JetStream for real-time observability. ## Why @@ -12,6 +15,16 @@ When multiple Claude Code agents run concurrently — a dev agent, a security ag - A background federation loop replays events to NATS JetStream for downstream consumers - A reconciler catches artifacts (build plans, audit requests, handoffs) that were created without a corresponding log event +## Optional Components + +| Component | What it adds | Required? | +|-----------|-------------|-----------| +| NATS JetStream | Real-time event federation; stream replay for downstream consumers | No — local JSONL log works standalone | +| ntfy | Push notifications for high-priority events (task failures, audit requests) | No — events are still logged without it | +| HTTP webhook | POST event JSON to any URL on matching events — integrates with n8n, Home Assistant, Make.com, or any custom API | No | + +The server operates fully without NATS, ntfy, and webhooks. Add them when you want real-time observability or push alerts. 
+ ## Architecture ``` @@ -21,28 +34,29 @@ Claude Code Agent ▼ server.py (FastMCP, stdio transport) │ - ├── append to ~/.claude/comms/logs/YYYY-MM-DD-{scope}.jsonl + ├── append to $AGENT_BUS_COMMS_DIR/logs/YYYY-MM-DD-{scope}.jsonl ├── emit_nats() — inline publish to agent-bus.{hostname}.events - └── emit_ntfy() — push notification for high-priority events - (audit.requested, task.failed, task.routing-failed, handoff.created) + ├── emit_ntfy() — push notification for high-priority events + │ (audit.requested, task.failed, task.routing-failed, handoff.created) + └── emit_webhook() — fire-and-forget POST to AGENT_BUS_WEBHOOK_URL Background federation loop (every 30s): Read logs from file+offset cursor → publish unseen events to NATS (gap-fill for NATS downtime; inline emit handles real-time) reconcile.py (PM2 cron, every 5 min): - Scan ~/.claude/comms/artifacts/ for files newer than mtime cursor + Scan $AGENT_BUS_COMMS_DIR/artifacts/ for files newer than mtime cursor → log artifact.untracked events for each file not yet in today's log cleanup.sh (PM2 cron, 3:50 AM daily): - Delete cross-agent logs older than 90 days - Delete session logs older than 30 days + Delete cross-agent logs older than AGENT_BUS_CROSS_AGENT_RETENTION_DAYS (default 90) + Delete session logs older than AGENT_BUS_SESSION_RETENTION_DAYS (default 30) ``` ## Installation ```bash -git clone ~/repos/agent-bus +git clone https://github.com/TadMSTR/agent-bus ~/repos/agent-bus cd ~/repos/agent-bus python3 -m venv venv venv/bin/pip install -r requirements.txt @@ -74,6 +88,20 @@ Configure as an MCP server in your Claude Desktop or Claude Code settings: `NATS_URL` and `NTFY_URL` are optional — the server operates without them (local JSONL log only). 
+## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `AGENT_BUS_COMMS_DIR` | `~/.claude/comms` | Base directory for logs, artifacts, and cursors | +| `NATS_URL` | `nats://localhost:4222` | NATS server URL (optional) | +| `NTFY_URL` | — | ntfy topic URL for push notifications (optional) | +| `AGENT_BUS_CROSS_AGENT_RETENTION_DAYS` | `90` | Days to retain cross-agent log files | +| `AGENT_BUS_SESSION_RETENTION_DAYS` | `30` | Days to retain session log files | +| `AGENT_BUS_WEBHOOK_URL` | — | URL to POST event JSON to (optional) | +| `AGENT_BUS_WEBHOOK_EVENTS` | — | Comma-separated event types to fire on; blank or `*` fires on all events (optional) | + +Copy `.env.example` to `.env` and fill in the values you need. Blank values use the defaults shown above. + ## MCP Tools ### `log_event` @@ -111,6 +139,12 @@ Returns events most-recent-first. Retrieve a single event by UUID. +### `get_status` + +Returns the current server configuration and health: configured paths, which optional +integrations are active (NATS, ntfy, webhook), date range of available logs, and +event count for today. Use this to verify setup after installation. 
+ ## Event Vocabulary Events that automatically route to `cross-agent` scope regardless of the `scope` parameter: @@ -139,7 +173,7 @@ For session-scoped events (memory flushes, skill executions, etc.), use `scope=" ## Storage Layout ``` -~/.claude/comms/ +$AGENT_BUS_COMMS_DIR/ (default: ~/.claude/comms) ├── logs/ │ ├── 2026-03-29-cross-agent.jsonl # inter-agent events │ └── 2026-03-29-session.jsonl # session-scoped events @@ -201,4 +235,4 @@ The client writes directly to the JSONL files using the same schema as the serve - Python 3.11+ -- `fastmcp==3.1.0` +- `fastmcp==3.2.4` - NATS CLI on PATH (optional, for federation) -- `curl` on PATH (optional, for ntfy notifications) +- `curl` on PATH (optional, for ntfy notifications and webhooks) diff --git a/agent_bus_client.py b/agent_bus_client.py new file mode 100644 index 0000000..1e79276 --- /dev/null +++ b/agent_bus_client.py @@ -0,0 +1,76 @@ +""" +agent_bus_client.py — direct JSONL writer for non-MCP callers. + +For Python scripts that can't call MCP directly (e.g. PM2 cron jobs, +task dispatchers), this module writes events to the same JSONL files +as the server — no MCP round-trip, no external dependency. 
def log_event(
    event_type: str,
    source: str,
    summary: str,
    scope: str = "cross-agent",
    target: str | None = None,
    artifact_path: str | None = None,
    metadata: dict | None = None,
) -> dict:
    """
    Write an event directly to the JSONL log and return the event dict.

    The record is appended to ``LOGS_DIR/<UTC date>-<suffix>.jsonl`` where the
    suffix is ``cross-agent`` or ``session``. Events written here are meant to
    be visible to the server's query_events and get_event tool calls.

    Args:
        event_type: Event name. Any name listed in CROSS_AGENT_EVENTS is
            forced to the ``cross-agent`` scope regardless of ``scope``.
        source: Identifier of the component emitting the event.
        summary: Human-readable one-line description.
        scope: Requested scope; any value other than ``"cross-agent"`` is
            written to the session log file.
        target: Optional recipient identifier.
        artifact_path: Optional path of a related artifact (stored as str).
        metadata: Optional extra fields; must be JSON-serializable.

    Returns:
        The event dict that was written, including its generated ``id``.

    Raises:
        TypeError: If ``metadata`` contains values ``json.dumps`` cannot encode.
        OSError: If the log directory or file cannot be created or written.
    """
    LOGS_DIR.mkdir(parents=True, exist_ok=True)

    # Take a single timestamp so the event's "ts" and the dated file it lands
    # in can never disagree when the call straddles UTC midnight.
    now = datetime.now(timezone.utc)

    scope_resolved = "cross-agent" if event_type in CROSS_AGENT_EVENTS else scope
    suffix = "cross-agent" if scope_resolved == "cross-agent" else "session"
    log_path = LOGS_DIR / f"{now.strftime('%Y-%m-%d')}-{suffix}.jsonl"

    event = {
        "id": str(uuid.uuid4()),
        "ts": now.isoformat(),
        "event": event_type,
        "scope": scope_resolved,
        "source": source,
        "target": target,
        "artifact_path": str(artifact_path) if artifact_path else None,
        "summary": summary,
        "hostname": os.uname().nodename,
        "metadata": metadata or {},
    }

    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned to UTF-8 instead of inheriting the process locale.
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(event, ensure_ascii=False) + "\n")
        # Push the record to disk before returning so a crash right after the
        # call does not lose it.
        f.flush()
        os.fsync(f.fileno())

    return event
envVars.AGENT_BUS_WEBHOOK_URL || '', + AGENT_BUS_WEBHOOK_EVENTS: envVars.AGENT_BUS_WEBHOOK_EVENTS || '', }, }, { @@ -38,6 +41,10 @@ module.exports = { cron_restart: '*/5 * * * *', autorestart: false, watch: false, + env: { + PYTHONUNBUFFERED: '1', + AGENT_BUS_COMMS_DIR: envVars.AGENT_BUS_COMMS_DIR || '', + }, }, { name: 'agent-bus-cleanup', @@ -47,6 +54,11 @@ module.exports = { cron_restart: '50 3 * * *', autorestart: false, watch: false, + env: { + AGENT_BUS_COMMS_DIR: envVars.AGENT_BUS_COMMS_DIR || '', + AGENT_BUS_CROSS_AGENT_RETENTION_DAYS: envVars.AGENT_BUS_CROSS_AGENT_RETENTION_DAYS || '', + AGENT_BUS_SESSION_RETENTION_DAYS: envVars.AGENT_BUS_SESSION_RETENTION_DAYS || '', + }, }, ], }; diff --git a/reconcile.py b/reconcile.py index 98f0c92..6ee4d85 100644 --- a/reconcile.py +++ b/reconcile.py @@ -10,13 +10,13 @@ from datetime import datetime, timezone from pathlib import Path -COMMS_DIR = Path.home() / ".claude" / "comms" +COMMS_DIR = Path(os.environ.get("AGENT_BUS_COMMS_DIR") or str(Path.home() / ".claude" / "comms")) ARTIFACTS_DIR = COMMS_DIR / "artifacts" LOGS_DIR = COMMS_DIR / "logs" CURSOR_FILE = COMMS_DIR / ".reconcile-cursor" HOSTNAME = os.uname().nodename -# Self-healing: create log dir if missing (e.g. 
fresh Helm host) +# Self-healing: create log dir if missing on first run LOGS_DIR.mkdir(parents=True, exist_ok=True) diff --git a/requirements.txt b/requirements.txt index b54577a..69b2c6c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -fastmcp==3.1.0 +fastmcp==3.2.4 diff --git a/server.py b/server.py index 4e35b7a..483858a 100644 --- a/server.py +++ b/server.py @@ -9,10 +9,10 @@ from fastmcp import FastMCP -COMMS_DIR = Path.home() / ".claude" / "comms" +COMMS_DIR = Path(os.environ.get("AGENT_BUS_COMMS_DIR") or str(Path.home() / ".claude" / "comms")) LOGS_DIR = COMMS_DIR / "logs" -# Ensure log directory exists on first run (self-healing for fresh Helm deployments) +# Ensure log directory exists on first run LOGS_DIR.mkdir(parents=True, exist_ok=True) @@ -29,6 +29,10 @@ async def lifespan(server): HOSTNAME = os.uname().nodename NTFY_URL = os.environ.get("NTFY_URL", "") NATS_URL = os.environ.get("NATS_URL", "nats://localhost:4222") +WEBHOOK_URL = os.environ.get("AGENT_BUS_WEBHOOK_URL", "") +WEBHOOK_EVENTS = set( + e.strip() for e in os.environ.get("AGENT_BUS_WEBHOOK_EVENTS", "").split(",") if e.strip() +) CROSS_AGENT_EVENTS = { "task.dispatched", "task.approved", "task.completed", "task.failed", @@ -63,13 +67,17 @@ def emit_ntfy(event: dict) -> None: if not NTFY_URL: return try: + # Strip \r\n from interpolated fields to prevent header injection + def _clean(s: str) -> str: + return s.replace("\r", "").replace("\n", " ") + subprocess.run( [ "curl", "-s", "-o", "/dev/null", "-X", "POST", NTFY_URL, - "-H", f"Title: agent-bus: {event['event']}", + "-H", f"Title: agent-bus: {_clean(event['event'])}", "-H", "Priority: default", "-H", "Tags: agent", - "-d", f"{event['source']} → {event.get('target') or 'n/a'}: {event['summary']}", + "-d", f"{_clean(event['source'])} → {_clean(event.get('target') or 'n/a')}: {_clean(event['summary'])}", ], timeout=5, capture_output=True, @@ -90,6 +98,26 @@ def emit_nats(event: dict) -> None: pass # NATS unavailable — local 
def emit_webhook(event: dict) -> None:
    """
    POST the event as JSON to WEBHOOK_URL via curl, best-effort.

    Does nothing when WEBHOOK_URL is empty. When WEBHOOK_EVENTS is empty or
    contains "*", every event is sent; otherwise only events whose "event"
    value is listed in WEBHOOK_EVENTS are sent.

    Args:
        event: Event dict; must contain an "event" key and be JSON-serializable.

    Returns:
        None. Delivery failures are swallowed on purpose.
    """
    if not WEBHOOK_URL:
        return

    # An empty filter set and an explicit "*" both mean "fire on everything".
    fire_on_all = not WEBHOOK_EVENTS or "*" in WEBHOOK_EVENTS
    if not fire_on_all and event["event"] not in WEBHOOK_EVENTS:
        return

    try:
        # NOTE: subprocess.run waits for curl, so a slow endpoint can delay the
        # caller by up to the 5 second timeout.
        subprocess.run(
            [
                "curl", "-s", "-o", "/dev/null", "-X", "POST", WEBHOOK_URL,
                "-H", "Content-Type: application/json",
                "-d", json.dumps(event),
            ],
            timeout=5,
            capture_output=True,
        )
    except Exception:
        pass  # webhook failure never blocks event logging


@mcp.tool()
def get_status() -> dict:
    """
    Return the current configuration and health of the agent-bus server.
    Useful for verifying setup after installation.

    Returns:
        A dict with:
          - "comms_dir", "logs_dir", "hostname": configured paths and host name.
          - "integrations": per-integration "enabled" flag and "url" for nats,
            ntfy and webhook, plus the webhook event filter. "enabled" only
            reflects that a URL is configured, not that the service is reachable.
          - "logs": None when no *.jsonl files exist, otherwise the date prefix
            of the first and last file (sorted by name) and the file count.
          - "events_today": number of non-blank lines in today's (UTC) log files.
    """
    def _date_prefix(path) -> str:
        # File stems look like "<date>-cross-agent" / "<date>-session"; drop the
        # scope suffix and keep whatever precedes it.
        return path.stem.split("-cross-agent")[0].split("-session")[0]

    # Collect log file info
    log_files = sorted(LOGS_DIR.glob("*.jsonl")) if LOGS_DIR.exists() else []
    date_range = None
    if log_files:
        date_range = {
            "first": _date_prefix(log_files[0]),
            "last": _date_prefix(log_files[-1]),
            "files": len(log_files),
        }

    # Count today's events. Stream each file line by line with an explicit
    # UTF-8 decode: this avoids loading whole logs into memory, does not depend
    # on the process locale, and splits records only at real newlines (unlike
    # str.splitlines(), which also breaks on U+2028/U+2029/U+0085 that may
    # appear unescaped inside a JSON string).
    today_prefix = datetime.now(timezone.utc).strftime("%Y-%m-%d") + "-"
    today_count = 0
    for path in log_files:
        if not path.name.startswith(today_prefix):
            continue
        try:
            with path.open(encoding="utf-8", errors="replace") as fh:
                today_count += sum(1 for line in fh if line.strip())
        except OSError:
            pass  # unreadable or rotated-away file: leave it out of the count

    if WEBHOOK_EVENTS:
        # Sorted so repeated calls return the filter in a stable order.
        webhook_events = sorted(WEBHOOK_EVENTS)
    elif WEBHOOK_URL:
        webhook_events = ["*"]
    else:
        webhook_events = []

    # NOTE(review): the ntfy and webhook URLs are returned verbatim. If those
    # URLs embed a private topic name or token, this exposes them to every MCP
    # caller — confirm that is acceptable or redact here.
    return {
        "comms_dir": str(COMMS_DIR),
        "logs_dir": str(LOGS_DIR),
        "hostname": HOSTNAME,
        "integrations": {
            "nats": {"enabled": bool(NATS_URL), "url": NATS_URL or None},
            "ntfy": {"enabled": bool(NTFY_URL), "url": NTFY_URL or None},
            "webhook": {
                "enabled": bool(WEBHOOK_URL),
                "url": WEBHOOK_URL or None,
                "events": webhook_events,
            },
        },
        "logs": date_range,
        "events_today": today_count,
    }