diff --git a/.gitignore b/.gitignore index fd246c0..8576fdb 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,8 @@ yarn-error.log* # typescript *.tsbuildinfo next-env.d.ts + +# memory sidecar +/memory-agent/venv/ +/memory-agent/inbox/*.json +/memory-agent/inbox/failed/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..449d030 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,80 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Commands + +```bash +npm run dev # Development (NODE_ENV=development tsx server.ts) +npm start # Production (tsx server.ts) +npm run build # Next.js build +npm run lint # ESLint +``` + +No test suite exists. There are no migration scripts — migrations run inline at startup. + +## Architecture + +ChatLocal is a self-hosted AI chat app. The entry point is `server.ts`, which runs this startup sequence: +1. Load `.env` via dotenv +2. Run DB migrations (`src/lib/db/index.ts` → inline `sqlite.exec()` calls) +3. Initialize MCP servers from `mcp-servers.json` +4. Create HTTP server → attach Socket.IO → mount Next.js handler +5. Register Socket.IO event handlers (`src/server/socket.ts`) +6. Auto-setup and spawn memory sidecar (`memory-agent/agent.py`) + +### Backend + +**`server.ts`** — Entry point. Owns the HTTP server and orchestrates all initialization. + +**`src/lib/agent/loop.ts`** — The core engine. 
`runAgentLoop()` handles:
- Memory recall: keyword extraction → FTS5 search → system prompt injection (when `memoryEnabled=true`)
- Streaming completions from vLLM via OpenAI SDK
- Extracting `<think>...</think>` blocks into separate `thinking` fields
- Tool call loop (max 10 iterations) using MCP tools
- Context compression when token usage exceeds `contextThreshold` (default 0.8)
- Auto-session naming after first exchange
- Saving partial messages on abort
- Writing completed exchanges to `memory-agent/inbox/` for sidecar ingestion

**`src/server/socket.ts`** — All Socket.IO event handlers. Listens for `send_message` (now with `memoryEnabled` field) and `cancel_generation`; emits `token`, `thinking_token`, `tool_call_start`, `tool_call_result`, `message_complete`, `generation_error`, `session_renamed`, `vllm_status`, `memory_recall_start`, `memory_recall_result`.

**`src/lib/db/memory.ts`** — Read-only `better-sqlite3` connection to `memory.db`. Exports `searchMemories()`, `listMemories()`, `getMemory()`, `deleteMemory()`, `getMemoryHealth()`.

**`memory-agent/agent.py`** — Python sidecar. Google ADK agents (IngestAgent, ConsolidateAgent) routed via LiteLLM to vLLM. Watches `inbox/` every 10s, consolidates every 30 min. Writes status to `sidecar_status` table in `memory.db`.

**`src/lib/db/index.ts`** — SQLite setup (WAL mode, FTS5 search). All migrations are inline SQL strings in `runMigrations()`. Schema: `sessions`, `messages`, `settings` tables plus FTS5 virtual table with auto-sync triggers.

**`src/lib/mcp/manager.ts`** — Singleton MCP client. Reads `mcp-servers.json`, spawns stdio processes, supports env var interpolation (`${VAR_NAME}`), and auto-restarts on disconnect.

**`src/lib/vllm/client.ts`** — OpenAI SDK instance pointed at `VLLM_BASE_URL`.

### Frontend

Next.js App Router (`src/app/`). 
Key routes: +- `/chat/[sessionId]` — main chat UI +- `/settings` — model and UI configuration +- `/memories` — browse, search, manage stored memories; manual text + file upload +- `/api/models`, `/api/health`, `/api/sessions`, `/api/settings`, `/api/youtube`, `/api/webpage` +- `/api/memories` (GET list/search, POST manual), `/api/memories/[id]` (GET, DELETE), `/api/memories/upload` (POST), `/api/memory/health` (GET) + +State: Zustand (`src/stores/settings.ts`) for client-side settings/theme. Socket.IO module-level singleton in `src/hooks/useSocket.ts` persists across navigations. + +### Environment Variables + +| Variable | Default | Purpose | +|---|---|---| +| `VLLM_BASE_URL` | `http://localhost:8000` | vLLM (or Ollama/LM Studio) base URL | +| `PORT` | `3000` | HTTP server port | +| `DATABASE_PATH` | `./data/chatlocal.db` | SQLite file location | +| `MEMORY_DB_PATH` | `./data/memory.db` | SQLite file for memory storage (read by server.ts, written by sidecar) | +| `MEMORY_INBOX_PATH` | `./memory-agent/inbox` | Inbox directory watched by sidecar | +| `VLLM_MODEL` | (empty) | Model ID passed to sidecar for LiteLLM routing | + +### Key Patterns + +- **Singletons**: MCP manager, vLLM client, and Socket.IO client are module-level singletons. +- **DB migrations**: Add new migrations as additional `sqlite.exec()` calls in `runMigrations()` — existing migrations are idempotent (`CREATE TABLE IF NOT EXISTS`, `ADD COLUMN IF NOT EXISTS`). +- **Streaming**: Agent loop accumulates tokens and emits `token`/`thinking_token` socket events per delta. +- **MCP tools**: Adding tools requires only updating `mcp-servers.json` and restarting — no code changes. +- **`jan/`**: Reference project directory excluded from tsconfig, do not modify. diff --git a/DEPLOY.md b/DEPLOY.md new file mode 100644 index 0000000..d3f97bd --- /dev/null +++ b/DEPLOY.md @@ -0,0 +1,76 @@ +# Deployment + +## 1. Build + +```bash +cd /home/nurbot/ws/chatlocal +npm run build +``` + +## 2. 
systemd service + +Create `/etc/systemd/system/chatlocal.service`: + +```ini +[Unit] +Description=ChatLocal +After=network.target + +[Service] +Type=simple +User=nurbot +WorkingDirectory=/home/nurbot/ws/chatlocal +EnvironmentFile=/home/nurbot/ws/chatlocal/.env + +# nvm-managed node — must use full paths; tsx is local to the project +ExecStart=/home/nurbot/.nvm/versions/node/v22.17.0/bin/node \ + /home/nurbot/ws/chatlocal/node_modules/.bin/tsx \ + server.ts + +Restart=on-failure +RestartSec=5 + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=chatlocal + +[Install] +WantedBy=multi-user.target +``` + +Enable and start: + +```bash +sudo systemctl daemon-reload +sudo systemctl enable --now chatlocal +sudo journalctl -u chatlocal -f +``` + +## 3. Deploy an update + +```bash +git pull +npm run build +sudo systemctl restart chatlocal +``` + +## 4. Development flow + +Stop the service to free the port, then run the dev server: + +```bash +sudo systemctl stop chatlocal +npm run dev +``` + +When done: + +```bash +npm run build +sudo systemctl start chatlocal +``` + +## Notes + +- **Node path**: systemd does not inherit the nvm `PATH`. If you upgrade Node via nvm, update the absolute paths in `ExecStart`. +- **Environment**: Variables are injected by systemd via `EnvironmentFile`. The `dotenv` call in `server.ts` is a harmless no-op in this case. 
diff --git a/chatlocal.service b/chatlocal.service new file mode 100644 index 0000000..f76da1a --- /dev/null +++ b/chatlocal.service @@ -0,0 +1,24 @@ +[Unit] +Description=ChatLocal +After=network.target + +[Service] +Type=simple +User=nurbot +WorkingDirectory=/home/nurbot/ws/chatlocal +EnvironmentFile=/home/nurbot/ws/chatlocal/.env + +# nvm-managed node — must use full paths; tsx is local to the project +ExecStart=/home/nurbot/.nvm/versions/node/v22.17.0/bin/node \ + /home/nurbot/ws/chatlocal/node_modules/.bin/tsx \ + server.ts + +Restart=on-failure +RestartSec=5 + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=chatlocal + +[Install] +WantedBy=multi-user.target diff --git a/docs/memory.md b/docs/memory.md new file mode 100644 index 0000000..b41dba1 --- /dev/null +++ b/docs/memory.md @@ -0,0 +1,360 @@ +# Always-On Memory Integration Spec + +## Overview + +Integrate an always-on memory system into ChatLocal. The memory system continuously extracts facts from conversations, consolidates them into connected knowledge over time, and injects relevant memories into chat context when enabled. The system runs as a Python sidecar process alongside the existing Node.js server, communicating exclusively through a shared SQLite database and an inbox directory. + +## Reference Project + +The reference implementation lives at `/home/nurbot/ws/generative-ai/gemini/agents/always-on-memory-agent`. It uses Google ADK with Gemini to run three specialized agents (ingest, consolidate, query) with a Streamlit dashboard. This integration adapts that architecture for ChatLocal. + +--- + +## Architecture + +### Hybrid Approach + +The existing chat loop (`src/lib/agent/loop.ts`) remains on the OpenAI SDK against vLLM. A Python sidecar (`memory-agent/`) handles all background memory processing using Google ADK, with LiteLLM used as an in-process library to route ADK's model calls to the local vLLM instance. 
+ +``` +┌──────────────────────────────────┐ ┌──────────────────────────────┐ +│ server.ts │ │ memory-agent/agent.py │ +│ ┌────────────┐ ┌─────────────┐ │ │ ┌───────────┐ │ +│ │ Agent Loop │ │ Socket.IO │ │ │ │ IngestAgent│ (ADK) │ +│ │ (OpenAI SDK)│ │ Handlers │ │ │ ├───────────┤ │ +│ └─────┬──────┘ └──────┬──────┘ │ │ │Consolidate│ (ADK) │ +│ │ │ │ │ │ Agent │ │ +│ │ writes JSON │ │ │ ├───────────┤ │ +│ ├───────────────►│ inbox/ │◄────│ │QueryAgent │ (ADK) │ +│ │ │ │ │ └─────┬─────┘ │ +│ reads │ │ │ │ │ LiteLLM (library) │ +│ ┌─────┴──────┐ │ │ │ ▼ │ +│ │ memory.db │◄───────┼─────────┼─────│ vLLM (OpenAI compat) │ +│ │ (SQLite) │ │ │ │ │ +│ └────────────┘ │ │ └──────────────────────────────┘ +└──────────────────────────────────┘ +``` + +### Communication: Shared DB + Inbox Directory + +No direct IPC between server.ts and the sidecar. All communication flows through: +- **`inbox/`** directory: server.ts writes exchange JSON files after each completed agent loop. The sidecar's file watcher picks them up for ingestion. +- **`memory.db`** (separate from `chatlocal.db`): The sidecar writes memories/consolidations. server.ts reads them for FTS5 recall. + +--- + +## Sidecar Process (`memory-agent/`) + +### Directory Structure + +``` +memory-agent/ +├── agent.py # Main sidecar entry point +├── requirements.txt # Python dependencies +├── setup.sh # Auto-setup script (venv + deps) +└── inbox/ # Watched directory for ingestion + └── failed/ # Files that failed processing +``` + +### Dependencies + +``` +google-adk>=1.0.0 +litellm>=1.0.0 +aiohttp>=3.9.0 +``` + +LiteLLM is used as an **in-process library** (not a proxy server). The ADK model parameter is configured to route through LiteLLM to the local vLLM instance. + +### Lifecycle + +1. **server.ts** spawns the sidecar Python process on startup (like MCP server spawning). +2. On first startup, if the Python venv doesn't exist, **server.ts auto-runs `setup.sh`** to create the venv and install dependencies. +3. 
The sidecar starts, initializes ADK agents, and begins: + - Watching `inbox/` for new files (poll every **10 seconds**) + - Running the consolidation timer (every **30 minutes**, fixed interval) +4. The sidecar lifecycle is tied to server.ts — starts and stops together. + +### Agents (Google ADK) + +Three specialized agents, matching the reference project: + +**IngestAgent** — Processes inbox files. For each file: +- Analyzes content (text, or stores non-text files for future multimodal processing) +- Generates a 1-2 sentence summary +- Extracts entities (people, companies, concepts) +- Assigns 2-4 topic tags +- Rates importance (0.0-1.0) +- Stores to `memory.db` via `store_memory` tool +- **Importance threshold: 0.3** — memories rated below this are discarded + +**ConsolidateAgent** — Runs on the 30-minute timer: +- Reads unconsolidated memories from `memory.db` +- **Minimum threshold: 3 memories** — skips if fewer than 3 unconsolidated memories exist +- Finds cross-cutting patterns and connections +- Creates synthesized summaries and insights +- Tracks connections between memories (relationship types: complements, contradicts, relates_to, depends_on) +- Marks processed memories as consolidated + +**QueryAgent** — Not directly used in the hybrid architecture (server.ts handles recall via FTS5), but available for the sidecar's internal use. + +### Inbox Processing + +- Files processed in **FIFO order** (by filesystem timestamp) +- Chat exchanges arrive as JSON: `{ userMessage, assistantResponse, sessionId, timestamp }` +- Uploaded files arrive as-is (text, images, PDFs, etc.) 
+- **Supported file types** (future-proofed to match reference): text (.txt, .md, .json, .csv, .log, .xml, .yaml, .yml), images (.png, .jpg, .jpeg, .gif, .webp, .bmp, .svg), audio (.mp3, .wav, .ogg, .flac, .m4a, .aac), video (.mp4, .webm, .mov, .avi, .mkv), documents (.pdf) + - Non-text files are stored but only fully processed when a multimodal model is available +- **On vLLM failure**: move file to `inbox/failed/`. User can manually move files back for reprocessing. + +### Health Endpoint + +The sidecar exposes health data via `memory.db` — a `sidecar_status` table with: +- Last heartbeat timestamp +- Last consolidation time +- Total memory count +- Pending (unconsolidated) count +- Sidecar version/status + +server.ts reads this table to serve `/api/memory/health`. + +--- + +## Database Schema (`memory.db`) + +Separate SQLite database file from `chatlocal.db`. Both processes access it (SQLite WAL mode for concurrency). + +```sql +-- Core memory storage +CREATE TABLE IF NOT EXISTS memories ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source TEXT DEFAULT '', -- 'chat', 'upload', filename + raw_text TEXT, -- Full exchange (user + assistant) or file content + summary TEXT, -- LLM-generated 1-2 sentence summary + entities TEXT DEFAULT '[]', -- JSON array of extracted entities + topics TEXT DEFAULT '[]', -- JSON array of topic tags + connections TEXT DEFAULT '[]', -- JSON array: [{linked_to: id, relationship: str}] + importance REAL DEFAULT 0.5, -- 0.0-1.0 scale + created_at TEXT, -- ISO 8601 + consolidated INTEGER DEFAULT 0 -- 0=pending, 1=consolidated +); + +-- FTS5 virtual table for full-text search +CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( + summary, + entities, + topics, + raw_text, + content='memories', + content_rowid='id' +); + +-- Auto-sync triggers for FTS5 +-- (INSERT, UPDATE, DELETE triggers to keep FTS5 in sync with memories table) + +-- Consolidation insights +CREATE TABLE IF NOT EXISTS consolidations ( + id INTEGER PRIMARY KEY 
AUTOINCREMENT, + source_ids TEXT, -- JSON array of memory IDs + summary TEXT, -- Synthesized summary + insight TEXT, -- Key cross-cutting insight + created_at TEXT -- ISO 8601 +); + +-- File processing tracking +CREATE TABLE IF NOT EXISTS processed_files ( + path TEXT PRIMARY KEY, + processed_at TEXT -- ISO 8601 +); + +-- Sidecar health status +CREATE TABLE IF NOT EXISTS sidecar_status ( + key TEXT PRIMARY KEY, + value TEXT +); +``` + +### Connection Cleanup + +When a memory is deleted, all references to its ID in other memories' `connections` arrays are removed (referential integrity maintained). + +--- + +## Server-Side Changes (`server.ts` / Node.js) + +### Startup Sequence (additions) + +After existing initialization (DB migrations, MCP init, Socket.IO, Next.js): +1. Check for `memory-agent/venv/`. If missing, run `memory-agent/setup.sh`. +2. Spawn `memory-agent/agent.py` as a child process. +3. Log memory sidecar status. + +### Agent Loop Changes (`src/lib/agent/loop.ts`) + +After each completed agent loop (final assistant response generated): +1. Write a JSON file to `memory-agent/inbox/` containing: + ```json + { + "type": "chat_exchange", + "userMessage": "...", + "assistantResponse": "...", + "sessionId": "...", + "timestamp": "2026-03-07T..." + } + ``` +2. File named with timestamp for FIFO ordering: `{timestamp}_{sessionId}.json` + +### Memory Recall (before vLLM completion) + +When the memory toggle is ON, before sending the user's message to vLLM: + +1. **Keyword extraction**: Send the user's message to vLLM with a fast extraction prompt ("Extract 3-5 search keywords from this message"). Uses the **same vLLM model** as chat. +2. **Emit socket event**: `memory_recall_start` with `{ query: extractedKeywords }` +3. **FTS5 search**: Query `memories_fts` in `memory.db` using extracted keywords. +4. **Rank results**: Combine FTS5 relevance score with importance rating. Return **top 10** memories. +5. 
**Emit socket event**: `memory_recall_result` with the matched memories (or empty array if none found). +6. **Inject into system prompt**: Append recalled memories to the system prompt as structured context: + ``` + ## Recalled Memories + [Memory #1] (importance: 0.85): Summary text here + [Memory #2] (importance: 0.72): Summary text here + ... + ``` +7. Proceed with normal vLLM completion. + +If FTS5 returns zero results, still emit `memory_recall_result` with an empty array (UI shows "no memories found"). + +### New API Routes + +| Route | Method | Purpose | +|---|---|---| +| `/api/memory/health` | GET | Reads `sidecar_status` table from memory.db. Returns last heartbeat, consolidation time, memory count, pending count. | +| `/api/memories` | GET | List all memories with pagination. Supports `?q=` for FTS5 search. | +| `/api/memories/:id` | GET | Get single memory with full details. | +| `/api/memories/:id` | DELETE | Delete memory. Clean up connections in other memories. | +| `/api/memories/upload` | POST | Accept file upload, save to `memory-agent/inbox/`. | +| `/api/memories/manual` | POST | Accept `{ text, source? }`, write as JSON to `memory-agent/inbox/`. | + +--- + +## Frontend Changes + +### Memory Toggle (Chat Input Bar) + +- **Icon**: Brain icon (from lucide-react) +- **Placement**: In the chat input bar, near the send button +- **Behavior**: Toggles memory recall ON/OFF. When ON (colored), recalled memories are injected into context before each message. When OFF (gray), no memory recall occurs. +- **State**: **Defaults to ON**. Does not persist across browser sessions — resets to ON on page load. +- **No count badge** on the icon. + +### Memory Recall Display (Chat Messages) + +Matches the existing `tool_call_start` / `tool_call_result` UI pattern: + +1. When `memory_recall_start` event received, show a collapsible block: + ``` + Searching memories for: "extracted keywords here" + ``` +2. 
When `memory_recall_result` event received, populate the block with results: + - Each memory shown with summary and importance score + - If no memories found, show "No relevant memories found" +3. Block is **collapsible/expandable**, defaulting to collapsed after the response completes. + +### Socket.IO Events (new) + +| Event | Direction | Payload | +|---|---|---| +| `memory_recall_start` | server → client | `{ query: string }` | +| `memory_recall_result` | server → client | `{ memories: Memory[] }` or `{ memories: [] }` | +| `memory_status` | server → client | `{ healthy: boolean, lastConsolidation: string, memoryCount: number }` | + +### `/memories` Page (New Route) + +A dedicated page for browsing and managing all stored memories. + +**Header**: Memory count and sidecar health indicator (online/offline, last consolidation time). + +**Search**: Single FTS5-powered search box. + +**Memory Cards**: Each memory displayed as a card showing: +- Memory ID and timestamp +- Summary text +- Topic tags (badges) +- Entity tags (badges) +- Importance indicator (color-coded: green >= 0.7, yellow >= 0.4, gray < 0.4) +- **Connection badges**: Show linked memory IDs with relationship type. Clicking a badge **scrolls to and highlights** the connected memory on the page. +- **Delete button**: Individual delete with confirmation. Cleans up connection references in other memories. + +**Manual Memory Creation**: +- Text input area for typing facts to remember +- File upload dropzone (accepts all supported file types) +- Both write to `memory-agent/inbox/` for sidecar processing + +**No bulk operations** in initial release (individual delete only). + +### Navigation + +Add "Memories" link to the existing navigation (sidebar/header), alongside Chat and Settings. + +--- + +## Memory Scope + +**Global single store**. All sessions contribute to and recall from the same memory pool. No per-session isolation or scoping. 
+ +--- + +## Auto-Setup Script (`memory-agent/setup.sh`) + +```bash +#!/bin/bash +# Creates Python venv and installs sidecar dependencies +cd "$(dirname "$0")" +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt +``` + +server.ts runs this automatically if `memory-agent/venv/` doesn't exist on startup. + +--- + +## Environment Variables (additions) + +| Variable | Default | Purpose | +|---|---|---| +| `MEMORY_DB_PATH` | `./data/memory.db` | SQLite file for memory storage | +| `MEMORY_INBOX_PATH` | `./memory-agent/inbox` | Inbox directory for ingestion | + +--- + +## Summary of Key Decisions + +| Decision | Choice | +|---|---| +| Framework | Hybrid: ADK sidecar + existing OpenAI SDK loop | +| LiteLLM | In-process library (not proxy server) | +| Memory scope | Global (single store) | +| Retrieval | FTS5 keyword search, top 10 importance-weighted | +| Keyword extraction | LLM-driven (same vLLM model), runs on every message | +| Memory creation | LLM-driven per exchange (after full agent loop) | +| Consolidation | Background timer, 30 min fixed interval, min 3 memories | +| Connection graph | Full tracking with relationship types | +| IPC | Shared DB + inbox directory (no direct communication) | +| DB location | Separate `memory.db` file | +| Sidecar lifecycle | Spawned by server.ts, auto-setup on first run | +| Toggle | Brain icon in chat input bar, defaults ON, no persist | +| Recall display | Matches tool_call UI pattern, shows keywords + results | +| Management UI | Dedicated /memories page with FTS5 search | +| Manual creation | Text input + file upload on /memories page | +| File types | All 27 types accepted (future-proofed), text processed now | +| Import threshold | 0.3 minimum (below discarded) | +| Consolidation threshold | 3 unconsolidated memories minimum | +| Processing order | FIFO (timestamp) | +| Inbox poll interval | 10 seconds | +| vLLM failure handling | Move to inbox/failed/ | +| Delete behavior | Clean up connection 
references | +| Privacy | None (self-hosted, user controls machine) | +| Empty recall | Show "no memories found" block | +| Python setup | Auto-run setup.sh if venv missing | +| Toggle persist | No persist, defaults ON each page load | diff --git a/docs/spec.md b/docs/project.md similarity index 100% rename from docs/spec.md rename to docs/project.md diff --git a/memory-agent/agent.py b/memory-agent/agent.py new file mode 100644 index 0000000..b161be3 --- /dev/null +++ b/memory-agent/agent.py @@ -0,0 +1,585 @@ +""" +Always-on memory sidecar for ChatLocal. + +Runs as a background process alongside server.ts. +Communicates exclusively through: + - inbox/ directory: receives JSON exchange files from server.ts + - memory.db: writes memories; server.ts reads for FTS5 recall + +Agents (Google ADK + LiteLLM → vLLM): + - IngestAgent: processes inbox files → stores memories + - ConsolidateAgent: runs every 30 min → finds patterns, links memories +""" + +import asyncio +import json +import logging +import os +import signal +import sqlite3 +import time +from datetime import datetime, timezone +from pathlib import Path + +import litellm +from google.adk.agents import Agent +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.genai import types + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000") +MEMORY_DB_PATH = os.environ.get("MEMORY_DB_PATH", "./data/memory.db") +MEMORY_INBOX_PATH = os.environ.get("MEMORY_INBOX_PATH", "./memory-agent/inbox") +CONSOLIDATION_INTERVAL = 30 * 60 # 30 minutes +INBOX_POLL_INTERVAL = 10 # seconds +IMPORTANCE_THRESHOLD = 0.3 +CONSOLIDATION_MIN_MEMORIES = 3 +VERSION = "1.0.0" + +# LiteLLM routes ADK model calls to local vLLM +litellm.api_base = f"{VLLM_BASE_URL}/v1" +litellm.api_key = "dummy" # vLLM 
doesn't require auth; LiteLLM's OpenAI provider still needs a non-empty key +litellm.drop_params = True # ignore unsupported params silently + +# Model name — use openai/ prefix so LiteLLM routes to the OpenAI-compatible endpoint +_raw_model = os.environ.get("VLLM_MODEL", "") +MODEL = f"openai/{_raw_model}" if _raw_model and not _raw_model.startswith("openai/") else ( + _raw_model or "openai/default" +) + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(message)s", + datefmt="[%H:%M]", +) +log = logging.getLogger("memory-agent") + +# --------------------------------------------------------------------------- +# Database +# --------------------------------------------------------------------------- + +def get_db() -> sqlite3.Connection: + db_path = Path(MEMORY_DB_PATH).resolve() + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA foreign_keys = ON") + return conn + + +def init_db(conn: sqlite3.Connection) -> None: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS memories ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source TEXT DEFAULT '', + raw_text TEXT, + summary TEXT, + entities TEXT DEFAULT '[]', + topics TEXT DEFAULT '[]', + connections TEXT DEFAULT '[]', + importance REAL DEFAULT 0.5, + created_at TEXT, + consolidated INTEGER DEFAULT 0 + ); + + CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( + summary, + entities, + topics, + raw_text, + content='memories', + content_rowid='id' + ); + + CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN + INSERT INTO memories_fts(rowid, summary, entities, topics, raw_text) + VALUES (new.id, new.summary, new.entities, new.topics, new.raw_text); 
+ END; + + CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN + INSERT INTO memories_fts(memories_fts, rowid, summary, entities, topics, raw_text) + VALUES ('delete', old.id, old.summary, old.entities, old.topics, old.raw_text); + END; + + CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN + INSERT INTO memories_fts(memories_fts, rowid, summary, entities, topics, raw_text) + VALUES ('delete', old.id, old.summary, old.entities, old.topics, old.raw_text); + INSERT INTO memories_fts(rowid, summary, entities, topics, raw_text) + VALUES (new.id, new.summary, new.entities, new.topics, new.raw_text); + END; + + CREATE TABLE IF NOT EXISTS consolidations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source_ids TEXT, + summary TEXT, + insight TEXT, + created_at TEXT + ); + + CREATE TABLE IF NOT EXISTS processed_files ( + path TEXT PRIMARY KEY, + processed_at TEXT + ); + + CREATE TABLE IF NOT EXISTS sidecar_status ( + key TEXT PRIMARY KEY, + value TEXT + ); + """) + conn.commit() + + +def set_status(conn: sqlite3.Connection, key: str, value: str) -> None: + conn.execute( + "INSERT OR REPLACE INTO sidecar_status (key, value) VALUES (?, ?)", + (key, value), + ) + conn.commit() + + +def update_heartbeat(conn: sqlite3.Connection) -> None: + now = datetime.now(timezone.utc).isoformat() + row = conn.execute("SELECT COUNT(*) as n FROM memories").fetchone() + pending = conn.execute( + "SELECT COUNT(*) as n FROM memories WHERE consolidated = 0" + ).fetchone() + set_status(conn, "last_heartbeat", now) + set_status(conn, "memory_count", str(row["n"])) + set_status(conn, "pending_count", str(pending["n"])) + set_status(conn, "version", VERSION) + set_status(conn, "status", "online") + + +# --------------------------------------------------------------------------- +# Agent tools +# --------------------------------------------------------------------------- + +# Module-level db connection shared by tool functions +_db: sqlite3.Connection | None = 
None + + +def _get_db() -> sqlite3.Connection: + global _db + if _db is None: + raise RuntimeError("DB not initialized") + return _db + + +def store_memory( + summary: str, + entities: str, + topics: str, + importance: float, + raw_text: str = "", + source: str = "chat", +) -> str: + """Store a new memory in the database. importance must be 0.0–1.0.""" + if importance < IMPORTANCE_THRESHOLD: + return f"Discarded: importance {importance:.2f} below threshold {IMPORTANCE_THRESHOLD}" + db = _get_db() + now = datetime.now(timezone.utc).isoformat() + # Normalize JSON strings + try: + json.loads(entities) + except (json.JSONDecodeError, TypeError): + entities = json.dumps([e.strip() for e in str(entities).split(",") if e.strip()]) + try: + json.loads(topics) + except (json.JSONDecodeError, TypeError): + topics = json.dumps([t.strip() for t in str(topics).split(",") if t.strip()]) + + cur = db.execute( + """INSERT INTO memories (source, raw_text, summary, entities, topics, importance, created_at, consolidated) + VALUES (?, ?, ?, ?, ?, ?, ?, 0)""", + (source, raw_text[:50000], summary, entities, topics, float(importance), now), + ) + db.commit() + mem_id = cur.lastrowid + log.info("🧠 Stored memory #%d (importance: %.2f): %s", mem_id, importance, summary[:80]) + return f"Stored memory #{mem_id}" + + +def read_unconsolidated_memories() -> str: + """Return up to 10 unconsolidated memories as JSON for the consolidation agent.""" + db = _get_db() + rows = db.execute( + "SELECT id, summary, entities, topics, importance, raw_text FROM memories WHERE consolidated = 0 ORDER BY id ASC LIMIT 10" + ).fetchall() + if not rows: + return json.dumps([]) + return json.dumps([dict(r) for r in rows]) + + +def store_consolidation( + source_ids: str, + summary: str, + insight: str, + connections: str = "[]", +) -> str: + """Store a consolidation result and update source memories with connections.""" + db = _get_db() + now = datetime.now(timezone.utc).isoformat() + + # Parse and validate 
source_ids + try: + ids: list[int] = json.loads(source_ids) + except (json.JSONDecodeError, TypeError): + ids = [] + + # Store consolidation record + db.execute( + "INSERT INTO consolidations (source_ids, summary, insight, created_at) VALUES (?, ?, ?, ?)", + (json.dumps(ids), summary, insight, now), + ) + + # Parse connections + try: + conn_list = json.loads(connections) + except (json.JSONDecodeError, TypeError): + conn_list = [] + + # Update each source memory: mark as consolidated and set connections + for mem_id in ids: + mem_connections = [c for c in conn_list if c.get("from_id") == mem_id] + # Build connection list: links to other memories in this consolidation + mem_conn = [ + {"linked_to": c["linked_to"], "relationship": c.get("relationship", "relates_to")} + for c in mem_connections + ] + if not mem_conn: + # Default: connect to all other memories in this batch + mem_conn = [ + {"linked_to": other_id, "relationship": "relates_to"} + for other_id in ids + if other_id != mem_id + ] + db.execute( + "UPDATE memories SET consolidated = 1, connections = ? WHERE id = ?", + (json.dumps(mem_conn), mem_id), + ) + + db.commit() + log.info("🔮 Consolidated %d memories → insight: %s", len(ids), insight[:80]) + return f"Consolidated {len(ids)} memories" + + +# --------------------------------------------------------------------------- +# ADK agent setup +# --------------------------------------------------------------------------- + +INGEST_INSTRUCTION = """You are a Memory Ingest Agent for a personal AI chat assistant. +When you receive content (a chat exchange or text), you must: +1. Thoroughly analyze what the content is about +2. Write a concise 1-2 sentence summary capturing the key information +3. Extract key entities (people, companies, products, concepts, locations) as a JSON array of strings +4. Assign 2-4 topic tags as a JSON array of strings +5. 
Rate importance from 0.0 to 1.0: + - 0.9-1.0: crucial personal information, important decisions, key facts to always remember + - 0.7-0.8: useful context, preferences, notable events + - 0.4-0.6: interesting but not critical information + - 0.0-0.3: trivial conversation, greetings, simple confirmations (these will be discarded) +6. Call store_memory with all extracted information + +For chat exchanges, focus on what the USER shared or what was decided/learned, not just pleasantries. +Be selective — only store information that would be valuable to recall in future conversations.""" + +CONSOLIDATE_INSTRUCTION = """You are a Memory Consolidation Agent. Your job is to find patterns and connections across memories. +When called: +1. Call read_unconsolidated_memories to get pending memories +2. If fewer than 3 memories returned, respond with "Nothing to consolidate yet." +3. Analyze the memories to find: + - Common themes and patterns + - Complementary information (memories that reinforce each other) + - Contradictions (memories that conflict) + - Dependencies (one memory provides context for another) +4. Create a synthesized summary that captures the overall pattern across these memories +5. Identify one key insight — the most important cross-cutting finding +6. Build a connections array where each item has: from_id, linked_to, relationship + (relationship must be one of: complements, contradicts, relates_to, depends_on) +7. Call store_consolidation with source_ids (JSON array of memory IDs), summary, insight, and connections + +Focus on genuine patterns, not forced connections. 
If memories are truly unrelated, note that in the summary.""" + + +def _make_runner(agent: Agent) -> tuple[Runner, InMemorySessionService]: + session_service = InMemorySessionService() + runner = Runner(agent=agent, app_name=agent.name, session_service=session_service) + return runner, session_service + + +ingest_agent = Agent( + name="ingest_agent", + model=MODEL, + description="Analyzes content and stores important information as memories", + instruction=INGEST_INSTRUCTION, + tools=[store_memory], +) + +consolidate_agent = Agent( + name="consolidate_agent", + model=MODEL, + description="Finds patterns across memories and creates consolidations", + instruction=CONSOLIDATE_INSTRUCTION, + tools=[read_unconsolidated_memories, store_consolidation], +) + +ingest_runner, ingest_sessions = _make_runner(ingest_agent) +consolidate_runner, consolidate_sessions = _make_runner(consolidate_agent) + + +async def _run_agent(runner: Runner, sessions: InMemorySessionService, prompt: str, session_id: str) -> str: + """Run an ADK agent with a prompt, return the final text response.""" + await sessions.create_session(app_name=runner.agent.name, user_id="system", session_id=session_id) + content = types.Content(role="user", parts=[types.Part(text=prompt)]) + response_text = "" + async for event in runner.run_async( + user_id="system", session_id=session_id, new_message=content + ): + if event.is_final_response() and event.content and event.content.parts: + for part in event.content.parts: + if hasattr(part, "text") and part.text: + response_text += part.text + return response_text + + +# --------------------------------------------------------------------------- +# File type classification +# --------------------------------------------------------------------------- + +TEXT_EXTENSIONS = {".txt", ".md", ".json", ".csv", ".log", ".xml", ".yaml", ".yml"} +IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".svg"} +AUDIO_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", 
".m4a", ".aac"} +VIDEO_EXTENSIONS = {".mp4", ".webm", ".mov", ".avi", ".mkv"} +DOC_EXTENSIONS = {".pdf"} + +ALL_SUPPORTED = TEXT_EXTENSIONS | IMAGE_EXTENSIONS | AUDIO_EXTENSIONS | VIDEO_EXTENSIONS | DOC_EXTENSIONS + + +# --------------------------------------------------------------------------- +# Ingest logic +# --------------------------------------------------------------------------- + +async def ingest_file(file_path: Path, db: sqlite3.Connection) -> None: + """Process a single inbox file through the IngestAgent.""" + suffix = file_path.suffix.lower() + source = file_path.name + + if suffix not in ALL_SUPPORTED: + log.info("⏭️ Skipping unsupported file type: %s", file_path.name) + return + + # Check if already processed + already = db.execute( + "SELECT 1 FROM processed_files WHERE path = ?", (str(file_path),) + ).fetchone() + if already: + return + + log.info("📥 Ingesting: %s", file_path.name) + + try: + if suffix == ".json": + # Might be a chat exchange from server.ts + try: + data = json.loads(file_path.read_text(encoding="utf-8")) + if data.get("type") == "chat_exchange": + user_msg = data.get("userMessage", "") + assistant_msg = data.get("assistantResponse", "") + session_id = data.get("sessionId", "unknown") + raw = f"[User]: {user_msg}\n\n[Assistant]: {assistant_msg}" + prompt = f"Analyze this chat exchange and store any valuable information as a memory:\n\n{raw[:10000]}" + source = f"chat:{session_id}" + elif data.get("type") == "manual": + raw = data.get("text", "") + prompt = f"Analyze this text and store any valuable information as a memory:\n\n{raw[:10000]}" + source = data.get("source", "manual") + else: + raw = file_path.read_text(encoding="utf-8", errors="replace")[:10000] + prompt = f"Analyze this content and store any valuable information as a memory:\n\n{raw}" + except json.JSONDecodeError: + raw = file_path.read_text(encoding="utf-8", errors="replace")[:10000] + prompt = f"Analyze this content and store any valuable information as a 
memory:\n\n{raw}" + + elif suffix in TEXT_EXTENSIONS: + raw = file_path.read_text(encoding="utf-8", errors="replace")[:10000] + prompt = f"Analyze this content and store any valuable information as a memory:\n\n{raw}" + + elif suffix in (IMAGE_EXTENSIONS | AUDIO_EXTENSIONS | VIDEO_EXTENSIONS | DOC_EXTENSIONS): + # Non-text: store a placeholder record (future multimodal support) + db.execute( + """INSERT INTO memories (source, raw_text, summary, entities, topics, importance, created_at, consolidated) + VALUES (?, ?, ?, ?, ?, ?, ?, 0)""", + ( + source, + f"[Binary file: {file_path.name}]", + f"Binary file stored for future multimodal processing: {file_path.name}", + "[]", + "[]", + 0.4, + datetime.now(timezone.utc).isoformat(), + ), + ) + db.commit() + log.info("📄 Stored placeholder for binary file: %s", file_path.name) + _mark_processed(db, file_path) + return + else: + return + + # Run IngestAgent + session_id = f"ingest_{int(time.time() * 1000)}" + await _run_agent(ingest_runner, ingest_sessions, prompt, session_id) + + _mark_processed(db, file_path) + file_path.unlink(missing_ok=True) + + except Exception as exc: + log.error("❌ Ingest failed for %s: %s", file_path.name, exc) + failed_dir = file_path.parent / "failed" + failed_dir.mkdir(exist_ok=True) + target = failed_dir / file_path.name + file_path.rename(target) + log.info("🗑️ Moved to failed/: %s", file_path.name) + + +def _mark_processed(db: sqlite3.Connection, file_path: Path) -> None: + db.execute( + "INSERT OR REPLACE INTO processed_files (path, processed_at) VALUES (?, ?)", + (str(file_path), datetime.now(timezone.utc).isoformat()), + ) + db.commit() + + +# --------------------------------------------------------------------------- +# Consolidation +# --------------------------------------------------------------------------- + +async def run_consolidation(db: sqlite3.Connection) -> None: + """Run the ConsolidateAgent if enough unconsolidated memories exist.""" + count = db.execute( + "SELECT COUNT(*) as 
n FROM memories WHERE consolidated = 0" + ).fetchone()["n"] + + if count < CONSOLIDATION_MIN_MEMORIES: + log.info("🔄 Consolidation skipped: only %d unconsolidated memories (need %d)", count, CONSOLIDATION_MIN_MEMORIES) + return + + log.info("🔄 Running consolidation (%d memories pending)...", count) + session_id = f"consolidate_{int(time.time() * 1000)}" + try: + await _run_agent( + consolidate_runner, + consolidate_sessions, + "Please consolidate the unconsolidated memories now.", + session_id, + ) + now = datetime.now(timezone.utc).isoformat() + set_status(db, "last_consolidation", now) + log.info("✅ Consolidation complete") + except Exception as exc: + log.error("❌ Consolidation failed: %s", exc) + + +# --------------------------------------------------------------------------- +# Background tasks +# --------------------------------------------------------------------------- + +async def watch_inbox(inbox_dir: Path, db: sqlite3.Connection) -> None: + """Poll inbox/ directory every INBOX_POLL_INTERVAL seconds and process files.""" + log.info("👁️ Watching inbox: %s (every %ds)", inbox_dir, INBOX_POLL_INTERVAL) + while True: + try: + # Collect files sorted by mtime for FIFO ordering + files = sorted( + (f for f in inbox_dir.iterdir() if f.is_file() and f.suffix.lower() != ".gitkeep"), + key=lambda f: f.stat().st_mtime, + ) + for file_path in files: + await ingest_file(file_path, db) + update_heartbeat(db) + except Exception as exc: + log.error("❌ Inbox watch error: %s", exc) + + await asyncio.sleep(INBOX_POLL_INTERVAL) + + +async def consolidation_loop(db: sqlite3.Connection) -> None: + """Run consolidation every CONSOLIDATION_INTERVAL seconds.""" + # Wait before first run to let inbox warm up + await asyncio.sleep(60) + while True: + await run_consolidation(db) + await asyncio.sleep(CONSOLIDATION_INTERVAL) + + +async def heartbeat_loop(db: sqlite3.Connection) -> None: + """Update sidecar_status every 30 seconds.""" + while True: + try: + update_heartbeat(db) + 
except Exception as exc: + log.error("❌ Heartbeat error: %s", exc) + await asyncio.sleep(30) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +async def main() -> None: + inbox_dir = Path(MEMORY_INBOX_PATH).resolve() + inbox_dir.mkdir(parents=True, exist_ok=True) + (inbox_dir / "failed").mkdir(exist_ok=True) + + log.info("🚀 Memory sidecar starting (model: %s)", MODEL) + log.info("📦 DB: %s", MEMORY_DB_PATH) + log.info("📬 Inbox: %s", inbox_dir) + + db = get_db() + init_db(db) + + # Inject db into tool closure + global _db + _db = db + + set_status(db, "status", "online") + update_heartbeat(db) + + # Graceful shutdown + loop = asyncio.get_running_loop() + stop_event = asyncio.Event() + + def _signal_handler() -> None: + log.info("👋 Shutting down memory sidecar...") + set_status(db, "status", "offline") + db.commit() + stop_event.set() + + loop.add_signal_handler(signal.SIGINT, _signal_handler) + loop.add_signal_handler(signal.SIGTERM, _signal_handler) + + tasks = [ + asyncio.create_task(watch_inbox(inbox_dir, db)), + asyncio.create_task(consolidation_loop(db)), + asyncio.create_task(heartbeat_loop(db)), + ] + + await stop_event.wait() + + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + log.info("✅ Memory sidecar stopped") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/memory-agent/requirements.txt b/memory-agent/requirements.txt new file mode 100644 index 0000000..a362b46 --- /dev/null +++ b/memory-agent/requirements.txt @@ -0,0 +1,3 @@ +google-adk>=1.0.0 +litellm>=1.0.0 +aiohttp>=3.9.0 diff --git a/memory-agent/setup.sh b/memory-agent/setup.sh new file mode 100755 index 0000000..da02583 --- /dev/null +++ b/memory-agent/setup.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e +cd "$(dirname "$0")" +echo "[memory-agent] Creating Python virtual environment..." 
+python3 -m venv venv +echo "[memory-agent] Installing dependencies..." +venv/bin/pip install --quiet -r requirements.txt +echo "[memory-agent] Setup complete." diff --git a/server.ts b/server.ts index 4bf42ea..a5c0498 100644 --- a/server.ts +++ b/server.ts @@ -2,6 +2,9 @@ import "dotenv/config"; import { createServer } from "http"; import { Server as SocketIOServer } from "socket.io"; import next from "next"; +import { execSync, spawn, type ChildProcess } from "child_process"; +import path from "path"; +import fs from "fs"; import { runMigrations } from "./src/lib/db/index"; import { mcpManager } from "./src/lib/mcp/manager"; import { registerSocketHandlers } from "./src/server/socket"; @@ -48,12 +51,60 @@ async function main() { console.log(`[Server] Mode: ${dev ? "development" : "production"}`); }); + // Start memory sidecar + let sidecarProcess: ChildProcess | null = null; + const sidecarDir = path.resolve("memory-agent"); + const venvPython = path.join(sidecarDir, "venv", "bin", "python"); + const agentScript = path.join(sidecarDir, "agent.py"); + + if (fs.existsSync(agentScript)) { + if (!fs.existsSync(venvPython)) { + console.log("[Memory] venv not found — running setup.sh..."); + try { + execSync(`bash ${path.join(sidecarDir, "setup.sh")}`, { stdio: "inherit" }); + } catch (err) { + console.error("[Memory] setup.sh failed:", err); + } + } + if (fs.existsSync(venvPython)) { + const vllmModel = process.env.VLLM_MODEL ?? ""; + sidecarProcess = spawn(venvPython, [agentScript], { + env: { + ...process.env, + VLLM_BASE_URL: process.env.VLLM_BASE_URL ?? "http://localhost:8000", + MEMORY_DB_PATH: process.env.MEMORY_DB_PATH ?? "./data/memory.db", + MEMORY_INBOX_PATH: process.env.MEMORY_INBOX_PATH ?? 
"./memory-agent/inbox", + VLLM_MODEL: vllmModel, + }, + stdio: ["ignore", "pipe", "pipe"], + }); + sidecarProcess.stdout?.on("data", (d: Buffer) => + process.stdout.write(`[memory] ${d.toString()}`) + ); + sidecarProcess.stderr?.on("data", (d: Buffer) => + process.stderr.write(`[memory] ${d.toString()}`) + ); + sidecarProcess.on("exit", (code) => { + if (code !== 0 && code !== null) + console.error(`[Memory] Sidecar exited with code ${code}`); + }); + console.log("[Memory] Sidecar started"); + } else { + console.warn("[Memory] Sidecar venv unavailable — memory features disabled"); + } + } else { + console.warn("[Memory] memory-agent/agent.py not found — memory features disabled"); + } + // Graceful shutdown — force exit after 3s so open Socket.IO connections don't hang let shuttingDown = false; const shutdown = async () => { if (shuttingDown) return; shuttingDown = true; console.log("[Server] Shutting down..."); + if (sidecarProcess) { + sidecarProcess.kill("SIGTERM"); + } setTimeout(() => process.exit(0), 3000).unref(); await mcpManager.shutdown(); io.close(); diff --git a/src/app/api/memories/[id]/route.ts b/src/app/api/memories/[id]/route.ts new file mode 100644 index 0000000..e1c40e2 --- /dev/null +++ b/src/app/api/memories/[id]/route.ts @@ -0,0 +1,72 @@ +import { NextRequest, NextResponse } from "next/server"; +import { getMemory } from "@/lib/db/memory"; +import Database from "better-sqlite3"; +import path from "path"; +import fs from "fs"; + +const memoryDbPath = process.env.MEMORY_DB_PATH ?? 
"./data/memory.db"; + +function getWriteDb(): InstanceType | null { + const resolved = path.resolve(memoryDbPath); + if (!fs.existsSync(resolved)) return null; + try { + const db = new Database(resolved); + db.pragma("journal_mode = WAL"); + db.pragma("foreign_keys = ON"); + return db; + } catch { + return null; + } +} + +export async function GET(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { + const { id } = await params; + const memory = getMemory(parseInt(id, 10)); + if (!memory) return NextResponse.json({ error: "Not found" }, { status: 404 }); + return NextResponse.json(memory); +} + +export async function DELETE(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { + const { id } = await params; + const memoryId = parseInt(id, 10); + + const db = getWriteDb(); + if (!db) return NextResponse.json({ error: "Memory database unavailable" }, { status: 503 }); + + try { + const existing = db.prepare("SELECT id FROM memories WHERE id = ?").get(memoryId); + if (!existing) { + db.close(); + return NextResponse.json({ error: "Not found" }, { status: 404 }); + } + + // Clean up connection references in other memories + const others = db + .prepare("SELECT id, connections FROM memories WHERE connections != '[]' AND id != ?") + .all(memoryId) as { id: number; connections: string }[]; + + for (const other of others) { + try { + const conns: Array<{ linked_to: number; relationship: string }> = JSON.parse(other.connections); + const cleaned = conns.filter((c) => c.linked_to !== memoryId); + if (cleaned.length !== conns.length) { + db.prepare("UPDATE memories SET connections = ? WHERE id = ?").run( + JSON.stringify(cleaned), + other.id + ); + } + } catch { + // skip malformed connections + } + } + + db.prepare("DELETE FROM memories WHERE id = ?").run(memoryId); + db.close(); + + return NextResponse.json({ success: true }); + } catch (err) { + db.close(); + const msg = err instanceof Error ? 
err.message : String(err); + return NextResponse.json({ error: msg }, { status: 500 }); + } +} diff --git a/src/app/api/memories/manual/route.ts b/src/app/api/memories/manual/route.ts new file mode 100644 index 0000000..bccb936 --- /dev/null +++ b/src/app/api/memories/manual/route.ts @@ -0,0 +1,37 @@ +import { NextRequest, NextResponse } from "next/server"; +import fs from "fs"; +import path from "path"; + +const inboxPath = process.env.MEMORY_INBOX_PATH ?? "./memory-agent/inbox"; + +export async function POST(req: NextRequest) { + try { + const body = await req.json(); + const { text, source = "manual" } = body as { text: string; source?: string }; + + if (!text?.trim()) { + return NextResponse.json({ error: "text is required" }, { status: 400 }); + } + + const inboxDir = path.resolve(inboxPath); + fs.mkdirSync(inboxDir, { recursive: true }); + + const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + const filename = `${timestamp}_manual.json`; + const filePath = path.join(inboxDir, filename); + + const payload = { + type: "manual", + text: text.trim(), + source, + timestamp: new Date().toISOString(), + }; + + fs.writeFileSync(filePath, JSON.stringify(payload, null, 2), "utf-8"); + + return NextResponse.json({ success: true, filename }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return NextResponse.json({ error: msg }, { status: 500 }); + } +} diff --git a/src/app/api/memories/route.ts b/src/app/api/memories/route.ts new file mode 100644 index 0000000..61eda02 --- /dev/null +++ b/src/app/api/memories/route.ts @@ -0,0 +1,12 @@ +import { NextRequest, NextResponse } from "next/server"; +import { listMemories } from "@/lib/db/memory"; + +export async function GET(req: NextRequest) { + const { searchParams } = req.nextUrl; + const q = searchParams.get("q") ?? undefined; + const page = parseInt(searchParams.get("page") ?? "1", 10); + const limit = Math.min(parseInt(searchParams.get("limit") ?? 
"20", 10), 100); + + const result = listMemories({ q, page, limit }); + return NextResponse.json(result); +} diff --git a/src/app/api/memories/upload/route.ts b/src/app/api/memories/upload/route.ts new file mode 100644 index 0000000..3c1be9d --- /dev/null +++ b/src/app/api/memories/upload/route.ts @@ -0,0 +1,44 @@ +import { NextRequest, NextResponse } from "next/server"; +import fs from "fs"; +import path from "path"; + +const inboxPath = process.env.MEMORY_INBOX_PATH ?? "./memory-agent/inbox"; + +const SUPPORTED_EXTENSIONS = new Set([ + ".txt", ".md", ".json", ".csv", ".log", ".xml", ".yaml", ".yml", + ".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".svg", + ".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac", + ".mp4", ".webm", ".mov", ".avi", ".mkv", + ".pdf", +]); + +export async function POST(req: NextRequest) { + try { + const formData = await req.formData(); + const file = formData.get("file") as File | null; + + if (!file) { + return NextResponse.json({ error: "No file provided" }, { status: 400 }); + } + + const ext = path.extname(file.name).toLowerCase(); + if (!SUPPORTED_EXTENSIONS.has(ext)) { + return NextResponse.json({ error: `Unsupported file type: ${ext}` }, { status: 400 }); + } + + const inboxDir = path.resolve(inboxPath); + fs.mkdirSync(inboxDir, { recursive: true }); + + const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + const safeFilename = `${timestamp}_${file.name.replace(/[^a-zA-Z0-9._-]/g, "_")}`; + const filePath = path.join(inboxDir, safeFilename); + + const arrayBuffer = await file.arrayBuffer(); + fs.writeFileSync(filePath, Buffer.from(arrayBuffer)); + + return NextResponse.json({ success: true, filename: safeFilename }); + } catch (err) { + const msg = err instanceof Error ? 
err.message : String(err); + return NextResponse.json({ error: msg }, { status: 500 }); + } +} diff --git a/src/app/api/memory/health/route.ts b/src/app/api/memory/health/route.ts new file mode 100644 index 0000000..18114cd --- /dev/null +++ b/src/app/api/memory/health/route.ts @@ -0,0 +1,7 @@ +import { NextResponse } from "next/server"; +import { getMemoryHealth } from "@/lib/db/memory"; + +export async function GET() { + const health = getMemoryHealth(); + return NextResponse.json(health); +} diff --git a/src/app/memories/page.tsx b/src/app/memories/page.tsx new file mode 100644 index 0000000..26155ae --- /dev/null +++ b/src/app/memories/page.tsx @@ -0,0 +1,302 @@ +"use client"; + +import { useState, useEffect, useCallback, useRef } from "react"; +import { Input } from "@/components/ui/input"; +import { Button } from "@/components/ui/button"; +import { Badge } from "@/components/ui/badge"; +import { Textarea } from "@/components/ui/textarea"; +import { Brain, Search, Upload, Save, Loader2 } from "lucide-react"; +import { cn } from "@/lib/utils"; +import { MemoryCard } from "@/components/memories/MemoryCard"; +import type { Memory } from "@/lib/db/memory"; + +interface HealthData { + healthy: boolean; + lastHeartbeat: string | null; + lastConsolidation: string | null; + memoryCount: number; + pendingCount: number; + status: string; +} + +interface MemoriesResponse { + memories: Memory[]; + total: number; +} + +const PAGE_SIZE = 20; + +function useDebounce(value: T, delay: number): T { + const [debounced, setDebounced] = useState(value); + useEffect(() => { + const t = setTimeout(() => setDebounced(value), delay); + return () => clearTimeout(t); + }, [value, delay]); + return debounced; +} + +export default function MemoriesPage() { + const [memories, setMemories] = useState([]); + const [total, setTotal] = useState(0); + const [page, setPage] = useState(1); + const [search, setSearch] = useState(""); + const [health, setHealth] = useState(null); + const [loading, 
setLoading] = useState(false); + + const [manualText, setManualText] = useState(""); + const [manualSaving, setManualSaving] = useState(false); + const [manualError, setManualError] = useState(null); + const [manualSuccess, setManualSuccess] = useState(false); + + const [uploading, setUploading] = useState(false); + const [uploadError, setUploadError] = useState(null); + const [uploadSuccess, setUploadSuccess] = useState(false); + + const fileInputRef = useRef(null); + const debouncedSearch = useDebounce(search, 300); + + const fetchHealth = useCallback(async () => { + try { + const res = await fetch("/api/memory/health"); + const data: HealthData = await res.json(); + setHealth(data); + } catch { + // health unavailable + } + }, []); + + const fetchMemories = useCallback(async (q: string, p: number) => { + setLoading(true); + try { + const params = new URLSearchParams({ page: String(p), limit: String(PAGE_SIZE) }); + if (q.trim()) params.set("q", q.trim()); + const res = await fetch(`/api/memories?${params}`); + const data: MemoriesResponse = await res.json(); + setMemories(data.memories ?? []); + setTotal(data.total ?? 
0); + } catch { + setMemories([]); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + fetchHealth(); + const interval = setInterval(fetchHealth, 60_000); + return () => clearInterval(interval); + }, [fetchHealth]); + + useEffect(() => { + setPage(1); + fetchMemories(debouncedSearch, 1); + }, [debouncedSearch, fetchMemories]); + + useEffect(() => { + fetchMemories(debouncedSearch, page); + }, [page]); // eslint-disable-line react-hooks/exhaustive-deps + + const handleDelete = useCallback(async (id: number) => { + try { + await fetch(`/api/memories/${id}`, { method: "DELETE" }); + setMemories((prev) => prev.filter((m) => m.id !== id)); + setTotal((prev) => Math.max(0, prev - 1)); + } catch { + // ignore + } + }, []); + + const handleScrollTo = useCallback((id: number) => { + const el = document.getElementById(`memory-${id}`); + if (el) { + el.scrollIntoView({ behavior: "smooth", block: "center" }); + el.classList.add("ring-2", "ring-primary", "ring-offset-2"); + setTimeout(() => el.classList.remove("ring-2", "ring-primary", "ring-offset-2"), 2000); + } + }, []); + + const handleManualSave = async () => { + if (!manualText.trim()) return; + setManualSaving(true); + setManualError(null); + setManualSuccess(false); + try { + const res = await fetch("/api/memories/manual", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: manualText }), + }); + if (!res.ok) throw new Error((await res.json()).error ?? "Failed"); + setManualText(""); + setManualSuccess(true); + setTimeout(() => setManualSuccess(false), 3000); + } catch (err) { + setManualError(err instanceof Error ? 
err.message : String(err)); + } finally { + setManualSaving(false); + } + }; + + const handleFileUpload = async (file: File) => { + setUploading(true); + setUploadError(null); + setUploadSuccess(false); + try { + const formData = new FormData(); + formData.append("file", file); + const res = await fetch("/api/memories/upload", { method: "POST", body: formData }); + if (!res.ok) throw new Error((await res.json()).error ?? "Upload failed"); + setUploadSuccess(true); + setTimeout(() => setUploadSuccess(false), 3000); + } catch (err) { + setUploadError(err instanceof Error ? err.message : String(err)); + } finally { + setUploading(false); + } + }; + + const totalPages = Math.ceil(total / PAGE_SIZE); + + return ( +
+
+ + {/* Header */} +
+
+ +

Memories

+ {health && ( + + {health.memoryCount} stored + + )} +
+ {health && ( +
+
+ {health.healthy ? "Sidecar online" : "Sidecar offline"} + {health.lastConsolidation && ( + + · Last consolidation: {new Intl.DateTimeFormat(undefined, { dateStyle: "short", timeStyle: "short" }).format(new Date(health.lastConsolidation))} + + )} +
+ )} +
+ + {/* Add memory */} +
+

Add to memory

+