Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
NTFY_URL=https://ntfy.example.com/your-channel
NATS_URL=nats://localhost:4222
AGENT_BUS_COMMS_DIR=
AGENT_BUS_CROSS_AGENT_RETENTION_DAYS=
AGENT_BUS_SESSION_RETENTION_DAYS=
AGENT_BUS_WEBHOOK_URL=
AGENT_BUS_WEBHOOK_EVENTS=
49 changes: 49 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: CI

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
test:
name: Test (Python ${{ matrix.python-version }})
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
python-version: ["3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: pip install -r requirements.txt

- name: Import smoke test
run: python -c "import server; import reconcile; import agent_bus_client"

audit:
name: Dependency audit
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1

- name: Set up Python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.12"

- name: Install pip-audit
run: pip install pip-audit

- name: Audit dependencies
run: pip-audit -r requirements.txt
2 changes: 1 addition & 1 deletion .gitleaks.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ useDefault = true
[[rules]]
id = "ntfy-real-channel"
description = "Real ntfy channel URL (internal homelab endpoints)"
regex = '''https?://ntfy\.(your-domain|tadmstr)\.me/'''
regex = '''https?://ntfy\.(your-domain|tadmstr|glitch42)\.(me|com)/'''
tags = ["homelab", "internal-domain"]

[[rules]]
Expand Down
30 changes: 30 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Changelog

All notable changes to this project will be documented in this file.

## [Unreleased]

### Added
- `AGENT_BUS_COMMS_DIR` env var — base directory for logs, artifacts, and cursors is now
configurable (default: `~/.claude/comms`). Propagated to `server.py`, `reconcile.py`,
`cleanup.sh`, and `ecosystem.config.js`.
- `AGENT_BUS_CROSS_AGENT_RETENTION_DAYS` and `AGENT_BUS_SESSION_RETENTION_DAYS` env vars —
log retention periods are now configurable in `cleanup.sh` (defaults: 90 and 30 days).
- `AGENT_BUS_WEBHOOK_URL` and `AGENT_BUS_WEBHOOK_EVENTS` env vars — fire-and-forget HTTP
webhook support; POSTs event JSON on matching event types (`*` fires on all events).
- `get_status` MCP tool — returns current server configuration and health: configured paths,
active integrations (NATS/ntfy/webhook), log date range, and today's event count.
- `agent_bus_client.py` — direct JSONL writer for non-MCP callers (PM2 cron jobs, task
dispatchers); uses the same event schema as the server.
- GitHub Actions CI workflow — import smoke test on Python 3.11/3.12/3.13 plus `pip-audit`
dependency security audit.
- CI badge in README.

### Changed
- README: added optional components table (NATS, ntfy, webhook), full environment variables
reference table, `get_status` tool documentation, real clone URL, updated storage layout
to reference `$AGENT_BUS_COMMS_DIR`.
- Removed Helm-specific language from code comments.

### Fixed
- Upgraded `fastmcp` from 3.1.0 to 3.2.4 to resolve CVE-2025-64340 and CVE-2026-27124.
54 changes: 44 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# agent-bus

A FastMCP server that provides a unified inter-agent event log for multi-agent Claude Code setups. Agents log communication events (task handoffs, audit requests, build completions) via MCP tools; events are written to local JSONL files and federated to NATS JetStream for real-time observability.
[![Built with Claude Code](https://img.shields.io/badge/Built_with-Claude_Code-6B57FF?logo=claude&logoColor=white)](https://claude.ai/code)
[![CI](https://github.com/TadMSTR/agent-bus/actions/workflows/ci.yml/badge.svg)](https://github.com/TadMSTR/agent-bus/actions/workflows/ci.yml)

A FastMCP server that provides a unified inter-agent event log for multi-agent Claude Code setups. Agents log communication events (task handoffs, audit requests, build completions) via MCP tools; events are written to local JSONL files and optionally federated to NATS JetStream for real-time observability.

## Why

Expand All @@ -12,6 +15,16 @@ When multiple Claude Code agents run concurrently — a dev agent, a security ag
- A background federation loop replays events to NATS JetStream for downstream consumers
- A reconciler catches artifacts (build plans, audit requests, handoffs) that were created without a corresponding log event

## Optional Components

| Component | What it adds | Required? |
|-----------|-------------|-----------|
| NATS JetStream | Real-time event federation; stream replay for downstream consumers | No — local JSONL log works standalone |
| ntfy | Push notifications for high-priority events (task failures, audit requests) | No — events are still logged without it |
| HTTP webhook | POST event JSON to any URL on matching events — integrates with n8n, Home Assistant, Make.com, or any custom API | No |

The server operates fully without NATS, ntfy, and webhooks. Add them when you want real-time observability or push alerts.

## Architecture

```
Expand All @@ -21,28 +34,29 @@ Claude Code Agent
server.py (FastMCP, stdio transport)
├── append to ~/.claude/comms/logs/YYYY-MM-DD-{scope}.jsonl
├── append to $AGENT_BUS_COMMS_DIR/logs/YYYY-MM-DD-{scope}.jsonl
├── emit_nats() — inline publish to agent-bus.{hostname}.events
└── emit_ntfy() — push notification for high-priority events
(audit.requested, task.failed, task.routing-failed, handoff.created)
├── emit_ntfy() — push notification for high-priority events
│ (audit.requested, task.failed, task.routing-failed, handoff.created)
└── emit_webhook() — fire-and-forget POST to AGENT_BUS_WEBHOOK_URL

Background federation loop (every 30s):
Read logs from file+offset cursor → publish unseen events to NATS
(gap-fill for NATS downtime; inline emit handles real-time)

reconcile.py (PM2 cron, every 5 min):
Scan ~/.claude/comms/artifacts/ for files newer than mtime cursor
Scan $AGENT_BUS_COMMS_DIR/artifacts/ for files newer than mtime cursor
→ log artifact.untracked events for each file not yet in today's log

cleanup.sh (PM2 cron, 3:50 AM daily):
Delete cross-agent logs older than 90 days
Delete session logs older than 30 days
Delete cross-agent logs older than AGENT_BUS_CROSS_AGENT_RETENTION_DAYS (default 90)
Delete session logs older than AGENT_BUS_SESSION_RETENTION_DAYS (default 30)
```

## Installation

```bash
git clone <repo> ~/repos/agent-bus
git clone https://github.com/TadMSTR/agent-bus ~/repos/agent-bus
cd ~/repos/agent-bus
python3 -m venv venv
venv/bin/pip install -r requirements.txt
Expand Down Expand Up @@ -74,6 +88,20 @@ Configure as an MCP server in your Claude Desktop or Claude Code settings:

`NATS_URL` and `NTFY_URL` are optional — the server operates without them (local JSONL log only).

## Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `AGENT_BUS_COMMS_DIR` | `~/.claude/comms` | Base directory for logs, artifacts, and cursors |
| `NATS_URL` | `nats://localhost:4222` | NATS server URL (optional) |
| `NTFY_URL` | — | ntfy topic URL for push notifications (optional) |
| `AGENT_BUS_CROSS_AGENT_RETENTION_DAYS` | `90` | Days to retain cross-agent log files |
| `AGENT_BUS_SESSION_RETENTION_DAYS` | `30` | Days to retain session log files |
| `AGENT_BUS_WEBHOOK_URL` | — | URL to POST event JSON to (optional) |
| `AGENT_BUS_WEBHOOK_EVENTS` | — | Comma-separated event types to fire on, or `*` for all (optional) |

Copy `.env.example` to `.env` and fill in the values you need. Blank values use the defaults shown above.

## MCP Tools

### `log_event`
Expand Down Expand Up @@ -111,6 +139,12 @@ Returns events most-recent-first.

Retrieve a single event by UUID.

### `get_status`

Returns the current server configuration and health: configured paths, which optional
integrations are active (NATS, ntfy, webhook), date range of available logs, and
event count for today. Use this to verify setup after installation.

## Event Vocabulary

Events that automatically route to `cross-agent` scope regardless of the `scope` parameter:
Expand Down Expand Up @@ -139,7 +173,7 @@ For session-scoped events (memory flushes, skill executions, etc.), use `scope="
## Storage Layout

```
~/.claude/comms/
$AGENT_BUS_COMMS_DIR/ (default: ~/.claude/comms)
├── logs/
│ ├── 2026-03-29-cross-agent.jsonl # inter-agent events
│ └── 2026-03-29-session.jsonl # session-scoped events
Expand Down Expand Up @@ -201,4 +235,4 @@ The client writes directly to the JSONL files using the same schema as the serve
- Python 3.11+
- `fastmcp==3.1.0`
- NATS CLI on PATH (optional, for federation)
- `curl` on PATH (optional, for ntfy notifications)
- `curl` on PATH (optional, for ntfy notifications and webhooks)
76 changes: 76 additions & 0 deletions agent_bus_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
agent_bus_client.py — direct JSONL writer for non-MCP callers.

For Python scripts that can't call MCP directly (e.g. PM2 cron jobs,
task dispatchers), this module writes events to the same JSONL files
as the server — no MCP round-trip, no external dependency.

Usage:
from agent_bus_client import log_event

log_event(
event_type="task.dispatched",
source="task-dispatcher",
target="claudebox",
summary="Build phase 1 dispatched",
)
"""
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path

COMMS_DIR = Path(os.environ.get("AGENT_BUS_COMMS_DIR") or str(Path.home() / ".claude" / "comms"))
LOGS_DIR = COMMS_DIR / "logs"

CROSS_AGENT_EVENTS = {
"task.dispatched", "task.approved", "task.completed", "task.failed",
"task.routing-failed", "handoff.created", "handoff.picked-up",
"handoff.completed", "audit.requested", "audit.completed",
"build-plan.created", "diagnose.started", "diagnose.completed",
"artifact.untracked",
}


def log_event(
event_type: str,
source: str,
summary: str,
scope: str = "cross-agent",
target: str | None = None,
artifact_path: str | None = None,
metadata: dict | None = None,
) -> dict:
"""
Write an event directly to the JSONL log. Returns the event dict with assigned id.
Uses the same schema as the MCP server — events written here are visible to
query_events and get_event tool calls.
"""
LOGS_DIR.mkdir(parents=True, exist_ok=True)

hostname = os.uname().nodename
scope_resolved = "cross-agent" if event_type in CROSS_AGENT_EVENTS else scope
date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
suffix = "cross-agent" if scope_resolved == "cross-agent" else "session"
log_path = LOGS_DIR / f"{date}-{suffix}.jsonl"

event = {
"id": str(uuid.uuid4()),
"ts": datetime.now(timezone.utc).isoformat(),
"event": event_type,
"scope": scope_resolved,
"source": source,
"target": target,
"artifact_path": str(artifact_path) if artifact_path else None,
"summary": summary,
"hostname": hostname,
"metadata": metadata or {},
}

with open(log_path, "a") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
f.flush()
os.fsync(f.fileno())

return event
9 changes: 6 additions & 3 deletions cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
# cleanup.sh — prune agent-bus logs past retention window
set -euo pipefail

LOGS_DIR="$HOME/.claude/comms/logs"
find "$LOGS_DIR" -name "*-cross-agent.jsonl" -mtime +90 -delete
find "$LOGS_DIR" -name "*-session.jsonl" -mtime +30 -delete
COMMS_DIR="${AGENT_BUS_COMMS_DIR:-$HOME/.claude/comms}"
LOGS_DIR="$COMMS_DIR/logs"
CROSS_AGENT_RETENTION="${AGENT_BUS_CROSS_AGENT_RETENTION_DAYS:-90}"
SESSION_RETENTION="${AGENT_BUS_SESSION_RETENTION_DAYS:-30}"
find "$LOGS_DIR" -name "*-cross-agent.jsonl" -mtime +"$CROSS_AGENT_RETENTION" -delete
find "$LOGS_DIR" -name "*-session.jsonl" -mtime +"$SESSION_RETENTION" -delete
echo "agent-bus-cleanup: done $(date -u +%Y-%m-%dT%H:%M:%SZ)"
12 changes: 12 additions & 0 deletions ecosystem.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ module.exports = {
PYTHONUNBUFFERED: '1',
NTFY_URL: envVars.NTFY_URL || '',
NATS_URL: envVars.NATS_URL || 'nats://localhost:4222',
AGENT_BUS_COMMS_DIR: envVars.AGENT_BUS_COMMS_DIR || '',
AGENT_BUS_WEBHOOK_URL: envVars.AGENT_BUS_WEBHOOK_URL || '',
AGENT_BUS_WEBHOOK_EVENTS: envVars.AGENT_BUS_WEBHOOK_EVENTS || '',
},
},
{
Expand All @@ -38,6 +41,10 @@ module.exports = {
cron_restart: '*/5 * * * *',
autorestart: false,
watch: false,
env: {
PYTHONUNBUFFERED: '1',
AGENT_BUS_COMMS_DIR: envVars.AGENT_BUS_COMMS_DIR || '',
},
},
{
name: 'agent-bus-cleanup',
Expand All @@ -47,6 +54,11 @@ module.exports = {
cron_restart: '50 3 * * *',
autorestart: false,
watch: false,
env: {
AGENT_BUS_COMMS_DIR: envVars.AGENT_BUS_COMMS_DIR || '',
AGENT_BUS_CROSS_AGENT_RETENTION_DAYS: envVars.AGENT_BUS_CROSS_AGENT_RETENTION_DAYS || '',
AGENT_BUS_SESSION_RETENTION_DAYS: envVars.AGENT_BUS_SESSION_RETENTION_DAYS || '',
},
},
],
};
4 changes: 2 additions & 2 deletions reconcile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from datetime import datetime, timezone
from pathlib import Path

COMMS_DIR = Path.home() / ".claude" / "comms"
COMMS_DIR = Path(os.environ.get("AGENT_BUS_COMMS_DIR") or str(Path.home() / ".claude" / "comms"))
ARTIFACTS_DIR = COMMS_DIR / "artifacts"
LOGS_DIR = COMMS_DIR / "logs"
CURSOR_FILE = COMMS_DIR / ".reconcile-cursor"
HOSTNAME = os.uname().nodename

# Self-healing: create log dir if missing (e.g. fresh Helm host)
# Self-healing: create log dir if missing on first run
LOGS_DIR.mkdir(parents=True, exist_ok=True)


Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
fastmcp==3.1.0
fastmcp==3.2.4
Loading
Loading