From 28b78ea43b266ed735434e3e7cefd57ef4dd2e1c Mon Sep 17 00:00:00 2001 From: fitz123 Date: Sun, 19 Apr 2026 15:17:12 +0300 Subject: [PATCH] feat: persist downloaded media for session lifetime (#99) Photos/documents/animations downloaded for an agent session persist across turns (no missing-file error on follow-up). Voice files keep current immediate-cleanup behavior (only transcript enters context). - New media-store.ts manages per-chat dirs under /tmp/bot-media/ with global byte cap (default 200 MB, oldest-first eviction) - Files cleaned up on session close (idle timeout, explicit close, restart) - Per-message cleanup callbacks removed from photo/document handlers - enforceMediaCap is best-effort (IO/permission errors logged, not thrown) - Media dirs created with mode 0o700 - Added example keyword to maxMediaBytes default comment to avoid gitleaks telegram-user-ids false positive on the 9-digit byte value Closes #99 Co-Authored-By: Claude Opus 4.7 --- bot/src/__tests__/config-defaults.test.ts | 26 + bot/src/__tests__/hot-reload.test.ts | 1 + bot/src/__tests__/media-store.test.ts | 519 ++++++++++++++++++ bot/src/__tests__/message-queue.test.ts | 266 +++++++++ bot/src/__tests__/session-manager.test.ts | 79 ++- bot/src/__tests__/telegram-bot.test.ts | 1 + bot/src/config.ts | 13 +- bot/src/discord-bot.ts | 4 +- bot/src/main.ts | 5 + bot/src/media-store.ts | 282 ++++++++++ bot/src/message-queue.ts | 107 +++- bot/src/session-manager.ts | 45 +- bot/src/stream-relay.ts | 10 + bot/src/telegram-bot.ts | 82 ++- bot/src/types.ts | 1 + config.yaml | 5 + .../2026-04-18-issue-99-media-persistence.md | 148 +++++ 17 files changed, 1557 insertions(+), 37 deletions(-) create mode 100644 bot/src/__tests__/media-store.test.ts create mode 100644 bot/src/media-store.ts create mode 100644 docs/plans/completed/2026-04-18-issue-99-media-persistence.md diff --git a/bot/src/__tests__/config-defaults.test.ts b/bot/src/__tests__/config-defaults.test.ts index 2a5eaec..a2120a8 100644 --- 
a/bot/src/__tests__/config-defaults.test.ts +++ b/bot/src/__tests__/config-defaults.test.ts @@ -3,6 +3,7 @@ import assert from "node:assert/strict"; import { writeFileSync, mkdirSync, rmSync } from "node:fs"; import { join } from "node:path"; import { validateSessionDefaults, validateAgent, loadConfig } from "../config.js"; +import { DEFAULT_MAX_MEDIA_BYTES } from "../media-store.js"; const TEST_DIR = join("/tmp", "config-defaults-test-" + Date.now()); @@ -13,6 +14,7 @@ describe("validateSessionDefaults", () => { assert.strictEqual(defaults.maxConcurrentSessions, 12); assert.strictEqual(defaults.maxMessageAgeMs, 600000); assert.strictEqual(defaults.requireMention, true); + assert.strictEqual(defaults.maxMediaBytes, DEFAULT_MAX_MEDIA_BYTES); }); it("returns production defaults when input is undefined", () => { @@ -118,6 +120,30 @@ describe("validateSessionDefaults", () => { /Invalid requireMention/, ); }); + + it("throws on invalid maxMediaBytes", () => { + assert.throws( + () => validateSessionDefaults({ maxMediaBytes: 0 }), + /Invalid maxMediaBytes/, + ); + assert.throws( + () => validateSessionDefaults({ maxMediaBytes: -1 }), + /Invalid maxMediaBytes/, + ); + assert.throws( + () => validateSessionDefaults({ maxMediaBytes: Infinity }), + /Invalid maxMediaBytes/, + ); + assert.throws( + () => validateSessionDefaults({ maxMediaBytes: "big" }), + /Invalid maxMediaBytes/, + ); + }); + + it("allows overriding maxMediaBytes", () => { + const defaults = validateSessionDefaults({ maxMediaBytes: 1024 }); + assert.strictEqual(defaults.maxMediaBytes, 1024); + }); }); describe("validateAgent defaultModel inheritance", () => { diff --git a/bot/src/__tests__/hot-reload.test.ts b/bot/src/__tests__/hot-reload.test.ts index 62805ad..e982286 100644 --- a/bot/src/__tests__/hot-reload.test.ts +++ b/bot/src/__tests__/hot-reload.test.ts @@ -97,6 +97,7 @@ function makeConfig(model: string): BotConfig { maxConcurrentSessions: 5, maxMessageAgeMs: 300_000, requireMention: false, + 
maxMediaBytes: 209715200, }, }; } diff --git a/bot/src/__tests__/media-store.test.ts b/bot/src/__tests__/media-store.test.ts new file mode 100644 index 0000000..eec548b --- /dev/null +++ b/bot/src/__tests__/media-store.test.ts @@ -0,0 +1,519 @@ +import { describe, it, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { existsSync, chmodSync, mkdirSync, symlinkSync, rmSync, statSync, writeFileSync, utimesSync } from "node:fs"; +import { join } from "node:path"; +import { + MEDIA_BASE, + sessionMediaDir, + ensureSessionMediaDir, + cleanupSessionMediaDir, + cleanupStaleSessionMedia, + cleanupAllMedia, + allocateMediaPath, + releaseMediaPath, + discardMediaPath, + enforceMediaCap, +} from "../media-store.js"; + +function resetMediaBase(): void { + rmSync(MEDIA_BASE, { recursive: true, force: true }); +} + +describe("sessionMediaDir", () => { + it("returns deterministic path under /tmp/bot-media", () => { + assert.strictEqual(sessionMediaDir("chat123"), "/tmp/bot-media/chat123"); + }); + + it("sanitizes unsafe characters in chatId", () => { + assert.strictEqual(sessionMediaDir("tg:12345"), "/tmp/bot-media/tg_12345"); + assert.strictEqual(sessionMediaDir("../evil"), "/tmp/bot-media/___evil"); + }); + + it("returns same path for same chatId", () => { + assert.strictEqual(sessionMediaDir("abc"), sessionMediaDir("abc")); + }); +}); + +describe("ensureSessionMediaDir", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("creates the session dir when absent and returns its path", () => { + const dir = ensureSessionMediaDir("chat-a"); + assert.ok(existsSync(dir), "dir should exist after ensure"); + assert.strictEqual(dir, "/tmp/bot-media/chat-a"); + }); + + it("does NOT wipe existing files (protects early downloads)", () => { + const dir = ensureSessionMediaDir("chat-b"); + const filePath = join(dir, "photo.jpg"); + writeFileSync(filePath, "content"); + + ensureSessionMediaDir("chat-b"); + + 
assert.ok(existsSync(filePath), "pre-existing file must survive ensure"); + }); +}); + +describe("allocateMediaPath", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("returns a UUID path inside the session dir", () => { + const path = allocateMediaPath("chat-x", "photo", ".jpg"); + assert.ok(path.startsWith("/tmp/bot-media/chat-x/photo-")); + assert.ok(path.endsWith(".jpg")); + assert.ok(existsSync(sessionMediaDir("chat-x")), "session dir should exist"); + }); + + it("generates unique paths on each call", () => { + const a = allocateMediaPath("chat-x", "doc", ".pdf"); + const b = allocateMediaPath("chat-x", "doc", ".pdf"); + assert.notStrictEqual(a, b); + }); + + it("isolates sessions: paths differ per chatId", () => { + const a = allocateMediaPath("chat-1", "photo", ".jpg"); + const b = allocateMediaPath("chat-2", "photo", ".jpg"); + assert.ok(a.startsWith("/tmp/bot-media/chat-1/")); + assert.ok(b.startsWith("/tmp/bot-media/chat-2/")); + }); +}); + +describe("cleanupSessionMediaDir", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("removes the session dir and all its files", () => { + const p1 = allocateMediaPath("chat-a", "photo", ".jpg"); + const p2 = allocateMediaPath("chat-a", "doc", ".pdf"); + writeFileSync(p1, "x"); + writeFileSync(p2, "y"); + + cleanupSessionMediaDir("chat-a"); + + assert.ok(!existsSync(p1)); + assert.ok(!existsSync(p2)); + assert.ok(!existsSync(sessionMediaDir("chat-a"))); + }); + + it("is a no-op for missing session dir", () => { + assert.doesNotThrow(() => cleanupSessionMediaDir("nonexistent")); + }); + + it("leaves other sessions' files untouched", () => { + const p1 = allocateMediaPath("chat-a", "photo", ".jpg"); + const p2 = allocateMediaPath("chat-b", "photo", ".jpg"); + writeFileSync(p1, "x"); + writeFileSync(p2, "y"); + + cleanupSessionMediaDir("chat-a"); + + assert.ok(!existsSync(p1), "chat-a file removed"); + assert.ok(existsSync(p2), "chat-b file preserved"); + }); +}); + 
+describe("enforceMediaCap", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + function writeSized(path: string, size: number, mtime: number): void { + writeFileSync(path, Buffer.alloc(size)); + utimesSync(path, mtime / 1000, mtime / 1000); + } + + it("is a no-op when MEDIA_BASE does not exist", () => { + assert.doesNotThrow(() => enforceMediaCap(100)); + }); + + it("is a no-op when under cap", () => { + const p = allocateMediaPath("chat-a", "doc", ".bin"); + writeFileSync(p, Buffer.alloc(100)); + + enforceMediaCap(1000); + + assert.ok(existsSync(p)); + }); + + it("evicts oldest files first until total ≤ cap (across sessions)", () => { + const now = Date.now(); + const pOld = allocateMediaPath("chat-a", "doc", ".bin"); + const pMid = allocateMediaPath("chat-b", "doc", ".bin"); + const pNew = allocateMediaPath("chat-a", "doc", ".bin"); + // Release so they're evictable (simulates files already delivered to a session). + releaseMediaPath(pOld); + releaseMediaPath(pMid); + releaseMediaPath(pNew); + writeSized(pOld, 100, now - 3000); + writeSized(pMid, 100, now - 2000); + writeSized(pNew, 100, now - 1000); + + // Total = 300, cap = 150 → evict oldest two (200 bytes removed, 100 remain) + enforceMediaCap(150); + + assert.ok(!existsSync(pOld), "oldest evicted"); + assert.ok(!existsSync(pMid), "second-oldest evicted"); + assert.ok(existsSync(pNew), "newest preserved"); + }); + + it("stops evicting as soon as under cap", () => { + const now = Date.now(); + const p1 = allocateMediaPath("chat-a", "doc", ".bin"); + const p2 = allocateMediaPath("chat-a", "doc", ".bin"); + const p3 = allocateMediaPath("chat-a", "doc", ".bin"); + releaseMediaPath(p1); + releaseMediaPath(p2); + releaseMediaPath(p3); + writeSized(p1, 100, now - 3000); + writeSized(p2, 100, now - 2000); + writeSized(p3, 100, now - 1000); + + // Total = 300, cap = 250 → evict only oldest (50 bytes over) + enforceMediaCap(250); + + assert.ok(!existsSync(p1), "oldest evicted"); + 
assert.ok(existsSync(p2), "sufficient eviction — p2 preserved"); + assert.ok(existsSync(p3), "p3 preserved"); + }); + + it("handles sessions with no files gracefully", () => { + ensureSessionMediaDir("empty-chat"); + const p = allocateMediaPath("chat-a", "doc", ".bin"); + writeFileSync(p, Buffer.alloc(100)); + + assert.doesNotThrow(() => enforceMediaCap(1000)); + assert.ok(existsSync(p)); + }); +}); + +describe("enforceMediaCap in-flight protection", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + function writeSized(path: string, size: number, mtime: number): void { + writeFileSync(path, Buffer.alloc(size)); + utimesSync(path, mtime / 1000, mtime / 1000); + } + + it("never evicts a path that is currently in-flight", () => { + const now = Date.now(); + // In-flight path is the OLDEST — normally would be evicted first. + const inflight = allocateMediaPath("chat-inflight", "photo", ".jpg"); + const olderNonInflight = allocateMediaPath("chat-other", "doc", ".bin"); + releaseMediaPath(olderNonInflight); // release so it's evictable + const newer = allocateMediaPath("chat-other", "doc", ".bin"); + releaseMediaPath(newer); + + writeSized(inflight, 100, now - 5000); + writeSized(olderNonInflight, 100, now - 3000); + writeSized(newer, 100, now - 1000); + + // Total = 300, cap = 150 → must evict 200 bytes of non-inflight. 
+ enforceMediaCap(150); + + assert.ok(existsSync(inflight), "in-flight file must be preserved even though it's oldest"); + assert.ok(!existsSync(olderNonInflight), "non-inflight older file evicted"); + assert.ok(!existsSync(newer), "newer non-inflight file evicted (still over cap)"); + + releaseMediaPath(inflight); + }); + + it("counts in-flight bytes toward total but does not evict them", () => { + const now = Date.now(); + const inflight = allocateMediaPath("chat-a", "photo", ".jpg"); + const evictable = allocateMediaPath("chat-b", "doc", ".bin"); + releaseMediaPath(evictable); + + writeSized(inflight, 200, now - 1000); + writeSized(evictable, 100, now - 500); + + // Total = 300, cap = 150 → evict the 100-byte evictable. Still over cap + // (200 > 150) but nothing else can be evicted; warn-and-return. + enforceMediaCap(150); + + assert.ok(existsSync(inflight), "in-flight preserved"); + assert.ok(!existsSync(evictable), "evictable removed"); + + releaseMediaPath(inflight); + }); +}); + +describe("cleanupSessionMediaDir symlink protection", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("refuses to follow MEDIA_BASE if it is a symlink", () => { + // Set up a decoy target that must NOT be touched. 
+ const decoy = "/tmp/bot-media-victim-target"; + rmSync(decoy, { recursive: true, force: true }); + mkdirSync(decoy, { recursive: true, mode: 0o700 }); + const decoyChatDir = join(decoy, "attacker"); + mkdirSync(decoyChatDir); + const decoyFile = join(decoyChatDir, "important.txt"); + writeFileSync(decoyFile, "must survive"); + + rmSync(MEDIA_BASE, { recursive: true, force: true }); + symlinkSync(decoy, MEDIA_BASE); + + try { + assert.doesNotThrow(() => cleanupSessionMediaDir("attacker")); + assert.ok(existsSync(decoyFile), "decoy file must still exist — cleanup refused to follow symlink"); + } finally { + rmSync(MEDIA_BASE, { force: true }); + rmSync(decoy, { recursive: true, force: true }); + } + }); +}); + +describe("cleanupStaleSessionMedia symlink protection", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("refuses to follow MEDIA_BASE if it is a symlink", () => { + const decoy = "/tmp/bot-media-victim-stale"; + rmSync(decoy, { recursive: true, force: true }); + mkdirSync(decoy, { recursive: true, mode: 0o700 }); + const decoyChatDir = join(decoy, "attacker"); + mkdirSync(decoyChatDir); + const decoyFile = join(decoyChatDir, "important.txt"); + writeFileSync(decoyFile, "must survive"); + + rmSync(MEDIA_BASE, { recursive: true, force: true }); + symlinkSync(decoy, MEDIA_BASE); + + try { + assert.doesNotThrow(() => cleanupStaleSessionMedia("attacker")); + assert.ok(existsSync(decoyFile), "decoy file must survive"); + } finally { + rmSync(MEDIA_BASE, { force: true }); + rmSync(decoy, { recursive: true, force: true }); + } + }); + + it("refuses to follow a per-session dir that is a symlink", () => { + const decoy = "/tmp/bot-media-victim-child"; + rmSync(decoy, { recursive: true, force: true }); + mkdirSync(decoy, { recursive: true, mode: 0o700 }); + const decoyFile = join(decoy, "important.txt"); + writeFileSync(decoyFile, "must survive"); + + mkdirSync(MEDIA_BASE, { recursive: true, mode: 0o700 }); + const childDir = join(MEDIA_BASE, 
"attacker-child"); + rmSync(childDir, { recursive: true, force: true }); + symlinkSync(decoy, childDir); + + try { + assert.doesNotThrow(() => cleanupStaleSessionMedia("attacker-child")); + assert.ok(existsSync(decoyFile), "decoy target file must survive"); + } finally { + rmSync(childDir, { force: true }); + rmSync(decoy, { recursive: true, force: true }); + } + }); +}); + +describe("cleanupAllMedia symlink protection", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("unlinks a MEDIA_BASE symlink without recursing into its target", () => { + const decoy = "/tmp/bot-media-victim-all"; + rmSync(decoy, { recursive: true, force: true }); + mkdirSync(decoy, { recursive: true, mode: 0o700 }); + const decoyFile = join(decoy, "important.txt"); + writeFileSync(decoyFile, "must survive"); + + rmSync(MEDIA_BASE, { recursive: true, force: true }); + symlinkSync(decoy, MEDIA_BASE); + + try { + assert.doesNotThrow(() => cleanupAllMedia()); + assert.ok(!existsSync(MEDIA_BASE), "symlink itself removed"); + assert.ok(existsSync(decoyFile), "decoy target file must survive"); + } finally { + rmSync(MEDIA_BASE, { force: true }); + rmSync(decoy, { recursive: true, force: true }); + } + }); +}); + +describe("ensureSessionMediaDir permissions", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("creates MEDIA_BASE and session dir with mode 0o700", () => { + const dir = ensureSessionMediaDir("chat-perm"); + // Mask off file-type bits; check only permission bits. + assert.strictEqual(statSync(MEDIA_BASE).mode & 0o777, 0o700); + assert.strictEqual(statSync(dir).mode & 0o777, 0o700); + }); + + it("chmods an existing loose-permission MEDIA_BASE to 0o700", () => { + // Simulate pre-squat: another process created the dir with loose perms. 
+ mkdirSync(MEDIA_BASE, { recursive: true, mode: 0o755 }); + assert.strictEqual(statSync(MEDIA_BASE).mode & 0o777, 0o755); + + ensureSessionMediaDir("chat-tighten"); + + assert.strictEqual(statSync(MEDIA_BASE).mode & 0o777, 0o700); + }); + + it("chmods an existing loose-permission session dir to 0o700", () => { + mkdirSync(sessionMediaDir("chat-loose"), { recursive: true, mode: 0o755 }); + assert.strictEqual(statSync(sessionMediaDir("chat-loose")).mode & 0o777, 0o755); + + ensureSessionMediaDir("chat-loose"); + + assert.strictEqual(statSync(sessionMediaDir("chat-loose")).mode & 0o777, 0o700); + }); + + it("refuses to use MEDIA_BASE if it is a symlink", () => { + const decoy = "/tmp/bot-media-decoy-target"; + rmSync(decoy, { recursive: true, force: true }); + mkdirSync(decoy, { recursive: true, mode: 0o700 }); + rmSync(MEDIA_BASE, { recursive: true, force: true }); + symlinkSync(decoy, MEDIA_BASE); + + try { + assert.throws(() => ensureSessionMediaDir("chat-symlink"), /symlink/); + } finally { + rmSync(MEDIA_BASE, { force: true }); + rmSync(decoy, { recursive: true, force: true }); + } + }); +}); + +describe("cleanupStaleSessionMedia", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("is a no-op when session dir does not exist", () => { + assert.doesNotThrow(() => cleanupStaleSessionMedia("missing-chat")); + }); + + it("removes files not registered as in-flight (orphans from a crashed prior process)", () => { + const dir = ensureSessionMediaDir("chat-orphan"); + const orphan = join(dir, "leftover.jpg"); + writeFileSync(orphan, "orphan"); + + cleanupStaleSessionMedia("chat-orphan"); + + assert.ok(!existsSync(orphan), "untracked orphan must be removed"); + }); + + it("preserves files currently registered as in-flight", () => { + const tracked = allocateMediaPath("chat-inflight", "photo", ".jpg"); + writeFileSync(tracked, "tracked"); + + cleanupStaleSessionMedia("chat-inflight"); + + assert.ok(existsSync(tracked), "in-flight file must survive 
stale cleanup"); + releaseMediaPath(tracked); + }); + + it("wipes orphan alongside in-flight file in the same dir (crash + rotation race)", () => { + const tracked = allocateMediaPath("chat-mixed", "photo", ".jpg"); + writeFileSync(tracked, "tracked"); + + const orphan = join(sessionMediaDir("chat-mixed"), "prior-process.jpg"); + writeFileSync(orphan, "orphan"); + + cleanupStaleSessionMedia("chat-mixed"); + + assert.ok(existsSync(tracked), "in-flight file must survive"); + assert.ok(!existsSync(orphan), "orphan next to in-flight must be removed"); + releaseMediaPath(tracked); + }); + + it("removes files after release (file is no longer in-flight)", () => { + const path = allocateMediaPath("chat-released", "photo", ".jpg"); + writeFileSync(path, "content"); + releaseMediaPath(path); + + cleanupStaleSessionMedia("chat-released"); + + assert.ok(!existsSync(path), "released file should not be preserved"); + }); +}); + +describe("discardMediaPath", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("unlinks the file and releases tracking", () => { + const path = allocateMediaPath("chat-discard", "photo", ".jpg"); + writeFileSync(path, "content"); + + discardMediaPath(path); + + assert.ok(!existsSync(path), "file should be removed"); + + // Writing the file back and running stale cleanup proves tracking was released. 
+ writeFileSync(path, "respawn"); + cleanupStaleSessionMedia("chat-discard"); + assert.ok(!existsSync(path), "respawned file with released tracking is cleaned"); + }); + + it("is a no-op when the file does not exist", () => { + assert.doesNotThrow(() => discardMediaPath("/tmp/bot-media/chat-gone/never-existed.jpg")); + }); +}); + +describe("cleanupAllMedia", () => { + beforeEach(resetMediaBase); + afterEach(resetMediaBase); + + it("removes the entire media root and every session's files", () => { + const a = allocateMediaPath("chat-a", "photo", ".jpg"); + const b = allocateMediaPath("chat-b", "doc", ".pdf"); + writeFileSync(a, "a"); + writeFileSync(b, "b"); + assert.ok(existsSync(a) && existsSync(b)); + + cleanupAllMedia(); + + assert.ok(!existsSync(MEDIA_BASE), "media root removed"); + assert.ok(!existsSync(a)); + assert.ok(!existsSync(b)); + }); + + it("is a no-op when the media root is absent", () => { + rmSync(MEDIA_BASE, { recursive: true, force: true }); + assert.doesNotThrow(() => cleanupAllMedia()); + }); +}); + +describe("enforceMediaCap error handling", () => { + const blockedDir = sessionMediaDir("chat-blocked"); + + beforeEach(resetMediaBase); + afterEach(() => { + // Restore permissions so resetMediaBase can traverse/remove the tree. + try { chmodSync(blockedDir, 0o700); } catch { /* ignore */ } + resetMediaBase(); + }); + + it("does not throw when a session dir is unreadable", (t) => { + // Skip on root: root bypasses permission checks so chmod 0 has no effect. + if (process.getuid?.() === 0) { + t.skip("cannot simulate EACCES as root"); + return; + } + + const p = allocateMediaPath("chat-readable", "doc", ".bin"); + releaseMediaPath(p); // make evictable + writeFileSync(p, Buffer.alloc(100)); + + ensureSessionMediaDir("chat-blocked"); + writeFileSync(join(blockedDir, "file.bin"), Buffer.alloc(100)); + chmodSync(blockedDir, 0o000); + + // Must not throw — best-effort eviction. 
+ assert.doesNotThrow(() => enforceMediaCap(50)); + + // Files in the unreadable dir were not counted/evicted, but the readable + // one may have been (total known = 100, cap = 50). + assert.ok(!existsSync(p), "readable file was evicted"); + }); +}); diff --git a/bot/src/__tests__/message-queue.test.ts b/bot/src/__tests__/message-queue.test.ts index fe138bd..1d61553 100644 --- a/bot/src/__tests__/message-queue.test.ts +++ b/bot/src/__tests__/message-queue.test.ts @@ -40,8 +40,14 @@ function createMockProcess() { agentId: string, text: string, _platform: PlatformContext, + onAgentOwnership?: () => void, ) => { calls.push({ chatId, agentId, text }); + // Mimic real bot: agent accepts the prompt as soon as the call begins + // (in production this fires when the first stream event arrives). The + // queue ignores ownership signals fired after the queue was cleared, so + // clear-mid-process tests still see drop cleanups fire correctly. + onAgentOwnership?.(); if (shouldBlock) { await new Promise((resolve) => { blockResolve = resolve; @@ -448,6 +454,266 @@ describe("MessageQueue error handling", () => { }); }); +// ------------------------------------------------------------------- +// MessageQueue — drop cleanups (persistent media reclamation) +// ------------------------------------------------------------------- + +describe("MessageQueue drop cleanups", () => { + it("runs pendingDropCleanups when processFn throws", async () => { + let dropFired = 0; + let cleanupFired = 0; + const failProcess = async () => { throw new Error("send failed"); }; + + const queue = new MessageQueue(failProcess, { debounceMs: 20 }); + const platform = mockPlatform(); + + queue.enqueue( + "chat1", "main", "hello", platform, + () => { cleanupFired++; }, + () => { dropFired++; }, + ); + + await wait(80); + + assert.strictEqual(cleanupFired, 1, "turn cleanup fires on error"); + assert.strictEqual(dropFired, 1, "drop cleanup MUST fire when delivery fails"); + + queue.clearAll(); + }); + + 
it("does NOT run pendingDropCleanups on successful delivery", async () => { + let dropFired = 0; + const { processFn } = createMockProcess(); + const queue = new MessageQueue(processFn, { debounceMs: 20 }); + const platform = mockPlatform(); + + queue.enqueue( + "chat1", "main", "hello", platform, + undefined, + () => { dropFired++; }, + ); + + await wait(80); + + assert.strictEqual(dropFired, 0, "drop cleanup must not fire on success — session owns file"); + + queue.clearAll(); + }); + + it("runs pendingDropCleanups exactly once when queue is cleared mid-process before ownership transfer", async () => { + let dropFired = 0; + let unblock!: () => void; + // processFn that blocks WITHOUT signaling ownership — mimics a real + // session that hasn't yet received any stream events from Claude. + const blockBeforeOwnership = async () => { + await new Promise((resolve) => { unblock = resolve; }); + }; + const queue = new MessageQueue(blockBeforeOwnership, { debounceMs: 20 }); + const platform = mockPlatform(); + + queue.enqueue( + "chat1", "main", "hello", platform, + undefined, + () => { dropFired++; }, + ); + + // Wait for flush to start + await wait(50); + assert.ok(queue.isBusy("chat1")); + + // Clear while processFn is still blocked (and ownership not yet signaled) + queue.clear("chat1"); + + // Unblock — post-await code notices queue was cleared + unblock(); + await wait(50); + + assert.strictEqual(dropFired, 1, "drop cleanup must fire exactly once on clear-while-busy when ownership hasn't transferred"); + + queue.clearAll(); + }); + + it("runs collectDropCleanups when drain processFn throws", async () => { + let dropFired = 0; + let callCount = 0; + const processFn = async () => { + callCount++; + if (callCount === 1) { + // block first call so a mid-turn message lands in collect buffer + await new Promise((r) => setTimeout(r, 30)); + } else { + throw new Error("drain exploded"); + } + }; + const queue = new MessageQueue(processFn, { debounceMs: 20 }); + const 
platform = mockPlatform(); + + queue.enqueue("chat1", "main", "initial", platform); + + // Wait for flush to start processing (callCount goes to 1) + await wait(40); + + // Enqueue a mid-turn message with a drop cleanup + queue.enqueue( + "chat1", "main", "mid-turn", platform, + undefined, + () => { dropFired++; }, + ); + + // Wait for flush + drain to complete (drain will throw) + await wait(120); + + assert.strictEqual(callCount, 2, "both flush and drain ran"); + assert.strictEqual(dropFired, 1, "collect drop cleanup fires on drain failure"); + + queue.clearAll(); + }); + + it("does NOT run collectDropCleanups on successful drain", async () => { + let dropFired = 0; + const mock = createMockProcess(); + const queue = new MessageQueue(mock.processFn, { debounceMs: 20 }); + const platform = mockPlatform(); + + mock.setBlocking(true); + queue.enqueue("chat1", "main", "initial", platform); + await wait(40); + + queue.enqueue( + "chat1", "main", "mid-turn", platform, + undefined, + () => { dropFired++; }, + ); + + mock.setBlocking(false); + mock.unblock(); + await wait(80); + + assert.strictEqual(dropFired, 0, "drop cleanup must not fire on successful drain"); + + queue.clearAll(); + }); + + it("runs collectDropCleanups when queue is cleared mid-drain before ownership transfer", async () => { + let dropFired = 0; + const unblockers: Array<() => void> = []; + // processFn that blocks WITHOUT signaling ownership on each call. 
+ const blockBeforeOwnership = async () => { + await new Promise((resolve) => { unblockers.push(resolve); }); + }; + const queue = new MessageQueue(blockBeforeOwnership, { debounceMs: 20 }); + const platform = mockPlatform(); + + queue.enqueue("chat1", "main", "initial", platform); + await wait(40); // flush is now blocked on initial + + // Mid-turn message with drop cleanup — buffered in collect + queue.enqueue( + "chat1", "main", "mid-turn", platform, + undefined, + () => { dropFired++; }, + ); + + // Unblock the FIRST call; drain begins and blocks on the NEXT call. + unblockers.shift()?.(); + await wait(40); + + // Clear the queue while drain is blocked on the collect message. + queue.clear("chat1"); + + // Unblock drain so it returns and notices queue was cleared. + unblockers.shift()?.(); + await wait(40); + + assert.strictEqual(dropFired, 1, "drop cleanup fires exactly once on clear-mid-drain"); + + queue.clearAll(); + }); + + it("does NOT run pendingDropCleanups when processFn signals ownership then throws (issue #99 regression)", async () => { + let dropFired = 0; + let cleanupFired = 0; + // Simulate: agent accepted prompt (ownership signaled), then response + // relay failed (e.g. Telegram sendMessage failed for the first chunk). 
+ const ownThenFail = async ( + _chatId: string, _agentId: string, _text: string, + _platform: PlatformContext, onAgentOwnership: () => void, + ) => { + onAgentOwnership(); + throw new Error("response relay failed after agent committed turn"); + }; + + const queue = new MessageQueue(ownThenFail, { debounceMs: 20 }); + const platform = mockPlatform(); + + queue.enqueue( + "chat1", "main", "hello", platform, + () => { cleanupFired++; }, + () => { dropFired++; }, + ); + + await wait(80); + + assert.strictEqual(cleanupFired, 1, "turn cleanup still fires on relay failure"); + assert.strictEqual(dropFired, 0, "drop cleanup MUST NOT fire after ownership transferred — session owns media"); + + queue.clearAll(); + }); + + it("does NOT run collectDropCleanups when drain processFn signals ownership then throws", async () => { + let dropFired = 0; + let callCount = 0; + const processFn = async ( + _chatId: string, _agentId: string, _text: string, + _platform: PlatformContext, onAgentOwnership: () => void, + ) => { + callCount++; + if (callCount === 1) { + await new Promise((r) => setTimeout(r, 30)); + } else { + onAgentOwnership(); + throw new Error("drain relay failed after agent committed turn"); + } + }; + const queue = new MessageQueue(processFn, { debounceMs: 20 }); + const platform = mockPlatform(); + + queue.enqueue("chat1", "main", "initial", platform); + await wait(40); + + queue.enqueue( + "chat1", "main", "mid-turn", platform, + undefined, + () => { dropFired++; }, + ); + + await wait(120); + + assert.strictEqual(callCount, 2, "both flush and drain ran"); + assert.strictEqual(dropFired, 0, "drop cleanup MUST NOT fire after ownership transferred mid-drain"); + + queue.clearAll(); + }); + + it("runs drop cleanups when message is dropped by cap", () => { + let dropFired = 0; + const { processFn } = createMockProcess(); + const queue = new MessageQueue(processFn, { debounceMs: 1000, queueCap: 1 }); + const platform = mockPlatform(); + + queue.enqueue("chat1", "main", 
"first", platform); + queue.enqueue( + "chat1", "main", "second", platform, + undefined, + () => { dropFired++; }, + ); + + assert.strictEqual(dropFired, 1, "cap-dropped message runs its drop cleanup"); + + queue.clearAll(); + }); +}); + // ------------------------------------------------------------------- // MessageQueue — inject file writing // ------------------------------------------------------------------- diff --git a/bot/src/__tests__/session-manager.test.ts b/bot/src/__tests__/session-manager.test.ts index ffee42e..b6e6c65 100644 --- a/bot/src/__tests__/session-manager.test.ts +++ b/bot/src/__tests__/session-manager.test.ts @@ -1,6 +1,6 @@ import { describe, it, beforeEach, afterEach, mock } from "node:test"; import assert from "node:assert/strict"; -import { mkdirSync, rmSync, existsSync, readFileSync } from "node:fs"; +import { mkdirSync, rmSync, existsSync, readFileSync, writeFileSync } from "node:fs"; import { EventEmitter } from "node:events"; import { Readable, Writable, PassThrough } from "node:stream"; import type { ChildProcess } from "node:child_process"; @@ -40,6 +40,7 @@ const testConfig: BotConfig = { maxConcurrentSessions: 2, maxMessageAgeMs: 300000, requireMention: false, + maxMediaBytes: 209715200, }, }; @@ -133,6 +134,82 @@ describe("SessionManager", () => { await manager.destroySession("nonexistent"); }); + it("destroySession wipes media dir even when no active session exists", async () => { + const { SessionManager } = await import("../session-manager.js"); + const { sessionMediaDir, ensureSessionMediaDir } = await import("../media-store.js"); + + // Simulate post-crash state: media dir populated but no in-memory session + const dir = ensureSessionMediaDir("chat-orphan-media"); + const filePath = `${dir}/leftover.jpg`; + writeFileSync(filePath, "stale"); + assert.ok(existsSync(filePath), "precondition: leftover file exists"); + + const manager = new SessionManager(() => testConfig, TEST_STORE_PATH); + await 
manager.destroySession("chat-orphan-media");
+
+    assert.ok(!existsSync(filePath), "destroySession must remove leftover media");
+    assert.ok(!existsSync(sessionMediaDir("chat-orphan-media")), "media dir should be gone");
+  });
+
+  it("resolveStoredSession purges stale media on agent mismatch but preserves current-turn download", async () => {
+    const { SessionManager } = await import("../session-manager.js");
+    const { SessionStore } = await import("../session-store.js");
+    const { ensureSessionMediaDir, allocateMediaPath, releaseMediaPath } = await import("../media-store.js");
+
+    // Pre-populate store with a session using "main" agent
+    const store = new SessionStore(TEST_STORE_PATH);
+    store.setSession("chat-race", {
+      sessionId: "old-session-id",
+      chatId: "chat-race",
+      agentId: "main", // the stored agent; the test later resolves this chat with a different agent id
+      lastActivity: Date.now(),
+    });
+
+    // Stale leftover from the prior agent — written directly, not tracked as in-flight.
+    // Simulates both aged orphans and just-crashed-process leftovers.
+    const dir = ensureSessionMediaDir("chat-race");
+    const stale = `${dir}/photo-prior-session.jpg`;
+    writeFileSync(stale, "stale");
+
+    // Current-turn download — registered as in-flight via allocateMediaPath.
+    const justDownloaded = allocateMediaPath("chat-race", "photo", ".jpg");
+    writeFileSync(justDownloaded, "current");
+
+    const manager = new SessionManager(() => testConfig, TEST_STORE_PATH);
+    const result = manager.resolveStoredSession("chat-race", "agent-b"); // "agent-b" differs from the stored agent, forcing the discard path
+
+    assert.strictEqual(result.resume, false, "mismatched agent should not resume");
+    assert.ok(existsSync(justDownloaded), "current-turn download must survive mismatch resolution");
+    assert.ok(!existsSync(stale), "prior-agent leftover must be purged on rotation");
+
+    releaseMediaPath(justDownloaded); // drop the in-flight registration made by allocateMediaPath above
+  });
+
+  it("resolveStoredSession preserves session media when agent matches (same-session resume)", async () => {
+    const { SessionManager } = await import("../session-manager.js");
+    const { SessionStore } = await import("../session-store.js");
+    const { ensureSessionMediaDir } = await import("../media-store.js");
+
+    const store = new SessionStore(TEST_STORE_PATH);
+    store.setSession("chat-resume", {
+      sessionId: "resume-session-id",
+      chatId: "chat-resume",
+      agentId: "main",
+      lastActivity: Date.now(),
+    });
+
+    // A file from the prior turn of the SAME logical session — legitimate context.
+ const dir = ensureSessionMediaDir("chat-resume"); + const priorTurn = `${dir}/photo-prior-turn.jpg`; + writeFileSync(priorTurn, "prior"); + + const manager = new SessionManager(() => testConfig, TEST_STORE_PATH); + const result = manager.resolveStoredSession("chat-resume", "main"); + + assert.strictEqual(result.resume, true, "matching agent must allow resume"); + assert.ok(existsSync(priorTurn), "prior-turn media must survive resume of same session"); + }); + it("closeSession preserves stored state (reconnect can resume)", async () => { const { SessionManager } = await import("../session-manager.js"); const { SessionStore } = await import("../session-store.js"); diff --git a/bot/src/__tests__/telegram-bot.test.ts b/bot/src/__tests__/telegram-bot.test.ts index 9fcf1b1..8610321 100644 --- a/bot/src/__tests__/telegram-bot.test.ts +++ b/bot/src/__tests__/telegram-bot.test.ts @@ -1275,6 +1275,7 @@ describe("command handler wiring", () => { maxConcurrentSessions: 2, maxMessageAgeMs: 300000, requireMention: false, + maxMediaBytes: 209715200, }, }; diff --git a/bot/src/config.ts b/bot/src/config.ts index 806255c..43d32da 100644 --- a/bot/src/config.ts +++ b/bot/src/config.ts @@ -5,6 +5,7 @@ import { fileURLToPath } from "node:url"; import { parse as parseYaml } from "yaml"; import type { BotConfig, AgentConfig, TelegramBinding, TopicOverride, SessionDefaults, DiscordBinding, DiscordChannelOverride, DiscordConfig } from "./types.js"; import { log, parseLogLevel } from "./logger.js"; +import { DEFAULT_MAX_MEDIA_BYTES } from "./media-store.js"; const __dirname = dirname(fileURLToPath(import.meta.url)); const DEFAULT_CONFIG_PATH = resolve(__dirname, "..", "..", "config.yaml"); @@ -268,7 +269,7 @@ function validateDiscordConfig(raw: RawConfig["discord"], agents: Record; @@ -304,7 +305,15 @@ export function validateSessionDefaults(raw: unknown): SessionDefaults { requireMention = obj.requireMention; } - return { idleTimeoutMs, maxConcurrentSessions, maxMessageAgeMs, 
requireMention }; + let maxMediaBytes = DEFAULT_MAX_MEDIA_BYTES; + if (obj.maxMediaBytes !== undefined) { + if (typeof obj.maxMediaBytes !== "number" || !Number.isFinite(obj.maxMediaBytes) || obj.maxMediaBytes <= 0) { + throw new Error(`Invalid maxMediaBytes: ${obj.maxMediaBytes} (must be a finite positive number)`); + } + maxMediaBytes = obj.maxMediaBytes; + } + + return { idleTimeoutMs, maxConcurrentSessions, maxMessageAgeMs, requireMention, maxMediaBytes }; } export function loadConfig(configPath?: string): BotConfig { diff --git a/bot/src/discord-bot.ts b/bot/src/discord-bot.ts index c36ba74..404703f 100644 --- a/bot/src/discord-bot.ts +++ b/bot/src/discord-bot.ts @@ -177,9 +177,9 @@ export async function createDiscordBot( const maxMessageAgeMs = config.sessionDefaults.maxMessageAgeMs; const messageQueue = new MessageQueue( - async (chatId, agentId, text, platform) => { + async (chatId, agentId, text, platform, onAgentOwnership) => { const stream = sessionManager.sendSessionMessage(chatId, agentId, text); - await relayStream(stream, platform, outboxDir(chatId)); + await relayStream(stream, platform, outboxDir(chatId), onAgentOwnership); }, ); diff --git a/bot/src/main.ts b/bot/src/main.ts index 3cc581e..c5b818e 100644 --- a/bot/src/main.ts +++ b/bot/src/main.ts @@ -142,6 +142,11 @@ async function main(): Promise { clearTimeout(startupTimeout); setBotUsername(botInfo.username); log.info("main", `Telegram bot @${botInfo.username} is running (id: ${botInfo.id})`); + // No global media wipe on startup: grammY invokes onStart before the + // first getUpdates, so polling ownership isn't proven yet. A blanket + // wipe here can clobber files that an overlapping old instance is + // still serving. Orphans from prior runs are reclaimed via per-session + // cleanupSessionMediaDir on close and enforceMediaCap eviction. 
if (watchdog) watchdog.start(); try { await bot.api.setMyCommands(BOT_COMMANDS);
diff --git a/bot/src/media-store.ts b/bot/src/media-store.ts
new file mode 100644
index 0000000..87c82f3
--- /dev/null
+++ b/bot/src/media-store.ts
@@ -0,0 +1,282 @@
+import { mkdirSync, rmSync, readdirSync, statSync, unlinkSync, existsSync, lstatSync, chmodSync } from "node:fs";
+import { join } from "node:path";
+import { randomUUID } from "node:crypto";
+import { log } from "./logger.js";
+
+export const MEDIA_BASE = "/tmp/bot-media"; // root under which every per-chat media dir lives
+export const DEFAULT_MAX_MEDIA_BYTES = 200 * 1024 * 1024; // 200 MiB
+
+/**
+ * Paths downloaded by a handler that have NOT yet been released to a
+ * session. On successful delivery (queue consumes the message) the handler
+ * calls `releaseMediaPath` and the path leaves this set — the session owns
+ * the file from that point and it's reclaimed on session close. On drop
+ * paths (queue cap exceeded, /reconnect, /clean, handler error before
+ * enqueue) the handler calls `discardMediaPath` which removes from the set
+ * and unlinks. Consulted by `cleanupStaleSessionMedia` and
+ * `enforceMediaCap` so they never delete files that are pre-delivery
+ * in-flight.
+ */
+const inflightMediaPaths = new Set<string>();
+
+function safeChatId(chatId: string): string { // maps a chat/session key to one filesystem-safe path segment
+  return chatId.replace(/[^a-zA-Z0-9_-]/g, "_") || "_"; // never "": join(MEDIA_BASE, "") is MEDIA_BASE itself, which a per-chat cleanup would then wipe
+}
+
+export function sessionMediaDir(chatId: string): string { // path only; does not create the directory
+  return join(MEDIA_BASE, safeChatId(chatId));
+}
+
+/**
+ * Create `path` with mode 0o700 if missing, otherwise verify it's a real dir
+ * (not a symlink) and force permissions to 0o700. mkdirSync's `mode` option is
+ * ignored when the dir already exists, so a pre-squatted `/tmp/bot-media` with
+ * loose perms would otherwise leak filenames to other local users.
+ */
+function ensureSecureDir(path: string): void {
+  mkdirSync(path, { recursive: true, mode: 0o700 }); // recursive: no error if the dir already exists
+  const stat = lstatSync(path); // lstat, not stat: inspect the entry itself rather than a symlink's target
+  if (stat.isSymbolicLink()) {
+    throw new Error(`Refusing to use ${path}: it is a symlink`);
+  }
+  if (!stat.isDirectory()) {
+    throw new Error(`Refusing to use ${path}: not a directory`);
+  }
+  if ((stat.mode & 0o777) !== 0o700) { // compare the permission bits only
+    chmodSync(path, 0o700);
+  }
+}
+
+export function ensureSessionMediaDir(chatId: string): string {
+  const dir = sessionMediaDir(chatId);
+  // mode 0o700: only the bot user can traverse/list. On shared hosts this
+  // prevents other local users from enumerating filenames of downloaded media.
+  ensureSecureDir(MEDIA_BASE); // root first, then the per-chat dir; either call throws on a symlink or non-directory
+  ensureSecureDir(dir);
+  return dir; // path of the per-chat media dir, which now exists
+}
+
+/**
+ * Verify MEDIA_BASE exists and is a real directory (not a symlink). Returns
+ * false when it is a symlink (refuse to follow — a pre-squatted link could
+ * redirect deletions outside the intended tree), not a directory, missing,
+ * or cannot be stat'ed. Callers treat false as "nothing safe to clean; bail out".
+ */
+function mediaBaseSafeToTouch(): boolean {
+  if (!existsSync(MEDIA_BASE)) return false;
+  try {
+    const stat = lstatSync(MEDIA_BASE);
+    if (stat.isSymbolicLink()) {
+      log.warn("media-store", `Refusing to touch ${MEDIA_BASE}: it is a symlink`);
+      return false;
+    }
+    if (!stat.isDirectory()) {
+      log.warn("media-store", `Refusing to touch ${MEDIA_BASE}: not a directory`);
+      return false;
+    }
+  } catch (err) {
+    if (!isMissingErr(err)) { // ENOENT (root vanished after the existsSync check) is not worth a warning
+      log.warn("media-store", `Failed to stat ${MEDIA_BASE}: ${(err as Error).message}`);
+    }
+    return false;
+  }
+  return true;
+}
+
+export function cleanupSessionMediaDir(chatId: string): void { // removes the whole per-chat dir, in-flight files included
+  if (!mediaBaseSafeToTouch()) return;
+  rmSync(sessionMediaDir(chatId), { recursive: true, force: true }); // force: a missing dir is not an error
+}
+
+/**
+ * Wipe the entire media root.
Not invoked automatically — see the startup
+ * comment in main.ts for why a blanket wipe at boot is unsafe (polling
+ * ownership isn't proven yet, could clobber an overlapping old instance's
+ * files). Kept as a manual escape hatch; per-session `cleanupSessionMediaDir`
+ * on close and `enforceMediaCap` eviction reclaim orphans during normal
+ * operation.
+ */
+export function cleanupAllMedia(): void {
+  if (!existsSync(MEDIA_BASE)) return;
+  try {
+    const stat = lstatSync(MEDIA_BASE);
+    if (stat.isSymbolicLink()) {
+      // Pre-squatted symlink: unlink the link itself, do not recurse into target.
+      log.warn("media-store", `${MEDIA_BASE} is a symlink; unlinking the link only`);
+      unlinkSync(MEDIA_BASE);
+      return;
+    }
+  } catch (err) {
+    // NOTE(review): this also catches a failing unlinkSync above, which is then reported as "Failed to stat".
+    if (!isMissingErr(err)) {
+      log.warn("media-store", `Failed to stat ${MEDIA_BASE}: ${(err as Error).message}`);
+    }
+    return;
+  }
+  rmSync(MEDIA_BASE, { recursive: true, force: true }); // does not clear inflightMediaPaths; callers still release/discard their own paths
+}
+
+/**
+ * Remove files in this session's media dir that are not currently tracked as
+ * in-flight. Used when a stored session is discarded (agent changed or
+ * deleted) to wipe leftovers from the prior logical session — including
+ * orphans from a crashed prior process — without deleting the file the
+ * current handler just downloaded for the next session's turn.
+ */
+export function cleanupStaleSessionMedia(chatId: string): void {
+  if (!mediaBaseSafeToTouch()) return;
+  const dir = sessionMediaDir(chatId);
+  if (!existsSync(dir)) return;
+  // Verify the per-session dir is a real dir, not a symlink. A pre-squatted
+  // symlink at /tmp/bot-media/<chatId> would otherwise let unlinkSync resolve
+  // through it and delete files in the target tree. MEDIA_BASE is normally
+  // 0o700 once `ensureSecureDir` has run, but this can fire on session
+  // rotation before any download has tightened perms.
+  try {
+    const stat = lstatSync(dir); // lstat so a symlinked session dir is detected instead of followed
+    if (stat.isSymbolicLink()) {
+      log.warn("media-store", `Refusing to clean ${dir}: it is a symlink`);
+      return;
+    }
+    if (!stat.isDirectory()) {
+      log.warn("media-store", `Refusing to clean ${dir}: not a directory`);
+      return;
+    }
+  } catch (err) {
+    if (!isMissingErr(err)) {
+      log.warn("media-store", `Failed to stat ${dir}: ${(err as Error).message}`);
+    }
+    return;
+  }
+  let entries;
+  try {
+    entries = readdirSync(dir, { withFileTypes: true });
+  } catch (err) {
+    if (!isMissingErr(err)) {
+      log.warn("media-store", `Failed to scan ${dir} for stale cleanup: ${(err as Error).message}`);
+    }
+    return;
+  }
+  for (const entry of entries) {
+    if (!entry.isFile()) continue; // regular files only; subdirectories and symlinks are left alone
+    const path = join(dir, entry.name);
+    if (inflightMediaPaths.has(path)) continue; // allocated but not yet released/discarded: keep it
+    try {
+      unlinkSync(path);
+      log.debug("media-store", `Removed stale media ${path} on session rotation`);
+    } catch (err) {
+      // Best-effort: one failed unlink is logged and the sweep continues.
+      if (!isMissingErr(err)) {
+        log.warn("media-store", `Failed to clean ${path}: ${(err as Error).message}`);
+      }
+    }
+  }
+}
+
+export function allocateMediaPath(chatId: string, prefix: string, extension: string): string { // reserves a path; the file itself is not created here
+  const dir = ensureSessionMediaDir(chatId);
+  const path = join(dir, `${prefix}-${randomUUID()}${extension}`); // `extension` is appended verbatim (callers pass e.g. ".jpg")
+  inflightMediaPaths.add(path); // tracked until releaseMediaPath or discardMediaPath is called with this exact string
+  return path;
+}
+
+/**
+ * Mark a media file as no longer in-flight (delivered to a session, which now
+ * owns its lifetime). Does not touch the file.
+ */
+export function releaseMediaPath(path: string): void {
+  inflightMediaPaths.delete(path);
+}
+
+/**
+ * Release tracking AND unlink the file. Used when a media file is dropped
+ * (queue cap exceeded, /reconnect, /clean) or the handler hits an error
+ * before enqueue.
+ */
+export function discardMediaPath(path: string): void {
+  inflightMediaPaths.delete(path);
+  try {
+    unlinkSync(path);
+  } catch (err) {
+    if (!isMissingErr(err)) { // already gone (e.g. the session dir was wiped) is fine
+      log.warn("media-store", `Failed to discard ${path}: ${(err as Error).message}`);
+    }
+  }
+}
+
+function isMissingErr(err: unknown): boolean { // true only for errors carrying code "ENOENT"
+  return typeof err === "object" && err !== null && (err as { code?: string }).code === "ENOENT";
+}
+
+/**
+ * Evict oldest files (by mtime) across all session media dirs until total bytes ≤ maxBytes.
+ * Empty session dirs are left in place; they're reclaimed on session close.
+ */
+export function enforceMediaCap(maxBytes: number): void {
+  // Best-effort housekeeping: never throw. An unrelated permission/IO error
+  // in another chat's dir must not fail the current download-enqueue path.
+  if (!mediaBaseSafeToTouch()) return; // same root guard as the cleanup helpers: never sweep through a symlinked or non-directory root
+
+  const candidates: { path: string; size: number; mtime: number }[] = [];
+  let total = 0; // bytes of every file seen, in-flight ones included
+  let chatEntries;
+  try {
+    chatEntries = readdirSync(MEDIA_BASE, { withFileTypes: true });
+  } catch (err) {
+    if (!isMissingErr(err)) {
+      log.warn("media-store", `Failed to scan ${MEDIA_BASE}: ${(err as Error).message}`);
+    }
+    return;
+  }
+  for (const chatEntry of chatEntries) {
+    if (!chatEntry.isDirectory()) continue; // real directories only; symlinked session dirs are skipped
+    const dir = join(MEDIA_BASE, chatEntry.name);
+    let fileEntries;
+    try {
+      fileEntries = readdirSync(dir, { withFileTypes: true });
+    } catch (err) {
+      // Session dir may have been removed concurrently (cleanup on close).
+      if (!isMissingErr(err)) {
+        log.warn("media-store", `Failed to scan ${dir}: ${(err as Error).message}`);
+      }
+      continue;
+    }
+    for (const fileEntry of fileEntries) {
+      if (!fileEntry.isFile()) continue;
+      const path = join(dir, fileEntry.name);
+      try {
+        const stat = statSync(path);
+        total += stat.size;
+        // In-flight files (downloaded, not yet handed to a session) must not
+        // be evicted — the handler is about to enqueue them and the agent
+        // would get a path that no longer exists.
+        if (!inflightMediaPaths.has(path)) {
+          candidates.push({ path, size: stat.size, mtime: stat.mtimeMs });
+        }
+      } catch (err) {
+        if (!isMissingErr(err)) {
+          log.warn("media-store", `Failed to stat ${path}: ${(err as Error).message}`);
+        }
+      }
+    }
+  }
+
+  if (total <= maxBytes) return;
+
+  candidates.sort((a, b) => a.mtime - b.mtime); // oldest first
+
+  for (const f of candidates) {
+    if (total <= maxBytes) break;
+    try {
+      unlinkSync(f.path);
+      total -= f.size; // only subtracted when the unlink actually succeeded
+      log.debug("media-store", `Evicted ${f.path} (${f.size} bytes) to stay under cap`);
+    } catch (err) {
+      if (!isMissingErr(err)) {
+        log.warn("media-store", `Failed to evict ${f.path}: ${(err as Error).message}`);
+      }
+    }
+  }
+
+  if (total > maxBytes) { // remaining bytes are in-flight files or files that could not be unlinked
+    log.warn("media-store", `Media cap ${maxBytes} exceeded: ${total} bytes remain after eviction sweep`);
+  }
+}
diff --git a/bot/src/message-queue.ts b/bot/src/message-queue.ts
index 4cf7607..1ad9fc8 100644
--- a/bot/src/message-queue.ts
+++ b/bot/src/message-queue.ts
@@ -8,12 +8,18 @@ export const DEFAULT_QUEUE_CAP = 20;
 /**
  * Callback that sends combined text to Claude and relays the response.
  * Called by the queue when debounce expires or collect buffer drains.
+ *
+ * `onAgentOwnership` MUST be invoked once the agent has accepted the prompt
+ * (the conversation history now references any media paths in the text). After
+ * that point, even if response relay fails, persistent media must NOT be
+ * discarded — the agent owns it for the rest of the session.
  */
 export type ProcessFn = (
   chatId: string,
   agentId: string,
   text: string,
   platform: PlatformContext,
+  onAgentOwnership: () => void,
 ) => Promise<void>;
 
 /** Fire-and-forget cleanup callback (e.g. delete a temp file after processing).
*/ @@ -22,14 +28,23 @@ export type CleanupFn = () => void; interface ChatQueueState { /** Messages pending debounce timer (pre-send) */ pendingTexts: string[]; - /** Cleanup callbacks for pending messages */ + /** Cleanup callbacks for pending messages (fire on successful delivery) */ pendingCleanups: CleanupFn[]; + /** + * Drop-only cleanup callbacks for pending messages. Fire when the message + * is dropped (cap exceeded) or the queue is cleared (/reconnect, /clean). + * Discarded on successful flush — the session will own the file and clean + * it up on close. Used for persistent media that must outlive the turn. + */ + pendingDropCleanups: CleanupFn[]; debounceTimer: ReturnType | null; /** Messages collected during active processing (mid-turn) */ collectBuffer: string[]; - /** Cleanup callbacks for collected messages */ + /** Cleanup callbacks for collected messages (fire on successful delivery) */ collectCleanups: CleanupFn[]; + /** Drop-only cleanup callbacks for collected messages (see pendingDropCleanups). */ + collectDropCleanups: CleanupFn[]; /** Deferred cleanups for messages consumed by hook mid-turn (temp files still in use) */ deferredCleanups: CleanupFn[]; @@ -95,9 +110,11 @@ export class MessageQueue { state = { pendingTexts: [], pendingCleanups: [], + pendingDropCleanups: [], debounceTimer: null, collectBuffer: [], collectCleanups: [], + collectDropCleanups: [], deferredCleanups: [], busy: false, latestPlatform: null, @@ -114,8 +131,24 @@ export class MessageQueue { /** * Enqueue a message for a chat. Handles debouncing and mid-turn collect. * Fire-and-forget: returns immediately, processing happens in background. + * + * `cleanup` runs when the message is consumed (successful delivery or drop) + * and is the right hook for turn-scoped temp files. + * + * `dropCleanup` runs only on drop/clear paths (cap exceeded, /reconnect, + * /clean). 
It is discarded on successful delivery so the callee can own the + * file for the session lifetime (persistent media). Use this for downloads + * that must survive the turn but be reclaimed if the message never reaches + * an agent. */ - enqueue(chatId: string, agentId: string, text: string, platform: PlatformContext, cleanup?: CleanupFn): void { + enqueue( + chatId: string, + agentId: string, + text: string, + platform: PlatformContext, + cleanup?: CleanupFn, + dropCleanup?: CleanupFn, + ): void { const state = this.getState(chatId, agentId); state.latestPlatform = platform; @@ -131,6 +164,8 @@ export class MessageQueue { const consumed = state.injectConsumed; state.collectBuffer.splice(0, consumed); const consumedCleanups = state.collectCleanups.splice(0, consumed); + // Drop cleanups for hook-consumed messages are dropped: the agent owns the file. + state.collectDropCleanups.splice(0, consumed); state.deferredCleanups.push(...consumedCleanups); state.injectConsumed = 0; } @@ -139,6 +174,7 @@ export class MessageQueue { if (state.collectBuffer.length < this.queueCap) { state.collectBuffer.push(text); state.collectCleanups.push(cleanup ?? (() => {})); + state.collectDropCleanups.push(dropCleanup ?? (() => {})); // Write inject file so PreToolUse hook can deliver mid-turn this.writeInject(chatId, state); @@ -149,6 +185,7 @@ export class MessageQueue { ); } else { if (cleanup) cleanup(); + if (dropCleanup) dropCleanup(); log.warn( "message-queue", `Collect buffer full for ${chatId}, dropping message`, @@ -160,6 +197,7 @@ export class MessageQueue { // Pre-send debounce: add to pending and reset timer if (state.pendingTexts.length >= this.queueCap) { if (cleanup) cleanup(); + if (dropCleanup) dropCleanup(); log.warn( "message-queue", `Debounce buffer full for ${chatId}, dropping message`, @@ -167,7 +205,8 @@ export class MessageQueue { return; } state.pendingTexts.push(text); - if (cleanup) state.pendingCleanups.push(cleanup); + state.pendingCleanups.push(cleanup ?? 
(() => {})); + state.pendingDropCleanups.push(dropCleanup ?? (() => {})); if (state.debounceTimer) { clearTimeout(state.debounceTimer); @@ -186,6 +225,11 @@ export class MessageQueue { const texts = state.pendingTexts.splice(0); const cleanups = state.pendingCleanups.splice(0); + // Hold drop cleanups locally during processing. If processFn throws, or + // the queue is cleared mid-process, we must run them so persistent media + // doesn't leak on disk. Splicing out of state now also means clear()'s + // own drop-cleanup loop won't double-fire them. + const dropCleanups = state.pendingDropCleanups.splice(0); state.debounceTimer = null; state.busy = true; @@ -195,9 +239,22 @@ export class MessageQueue { // relayStream() will clear this timer on handoff and start its own this.startPreStreamTyping(state.latestPlatform); + // Mutable holder so onAgentOwnership can drop the cleanups: once the + // agent has accepted the prompt, the conversation references any media + // paths and we must never reclaim them via drop cleanup, even if the + // response relay fails afterward (issue #99 regression vector). If the + // queue was cleared before ownership transferred (/reconnect, /clean), + // ignore the signal — the session is being torn down and drop cleanups + // must still run. 
+ let liveDropCleanups: CleanupFn[] | null = dropCleanups; + const transferOwnership = () => { + if (this.queues.get(chatId) !== state) return; + liveDropCleanups = null; + }; + try { if (state.latestPlatform) { - await this.processFn(chatId, state.agentId, combinedText, state.latestPlatform); + await this.processFn(chatId, state.agentId, combinedText, state.latestPlatform, transferOwnership); } } catch (err) { log.error("message-queue", `Send error for ${chatId}:`, err); @@ -211,8 +268,16 @@ export class MessageQueue { for (const fn of cleanups) fn(); } - // If queue was cleared during processing (e.g., /reconnect), stop here - if (this.queues.get(chatId) !== state) return; + // If transferOwnership() fired, liveDropCleanups is null and we skip — the + // session now owns the media for its full lifetime, even if response relay + // failed afterward. Otherwise (queue cleared, processFn threw before + // ownership, or processFn returned without ever taking ownership): the + // agent never claimed the media, reclaim it. 
+ const queueCleared = this.queues.get(chatId) !== state; + if (liveDropCleanups) { + for (const fn of liveDropCleanups) fn(); + } + if (queueCleared) return; // Run deferred cleanups from mid-turn compaction (temp files safe to delete now) for (const fn of state.deferredCleanups) fn(); @@ -236,6 +301,7 @@ export class MessageQueue { if (consumed > 0) { state.collectBuffer.splice(0, consumed); const consumedCleanups = state.collectCleanups.splice(0, consumed); + state.collectDropCleanups.splice(0, consumed); for (const fn of consumedCleanups) fn(); log.debug("message-queue", `Deduped ${consumed} inject-consumed message(s) for ${chatId}`); } @@ -243,6 +309,7 @@ export class MessageQueue { // If all messages were consumed by hook, just run cleanups and return if (state.collectBuffer.length === 0) { const cleanups = state.collectCleanups.splice(0); + state.collectDropCleanups.splice(0); for (const fn of cleanups) fn(); return; } @@ -254,6 +321,7 @@ export class MessageQueue { if (loopConsumed > 0) { state.collectBuffer.splice(0, loopConsumed); const loopCleanups = state.collectCleanups.splice(0, loopConsumed); + state.collectDropCleanups.splice(0, loopConsumed); for (const fn of loopCleanups) fn(); log.debug("message-queue", `Deduped ${loopConsumed} inject-consumed message(s) for ${chatId} (drain loop)`); } @@ -261,6 +329,12 @@ export class MessageQueue { const collected = state.collectBuffer.splice(0); const cleanups = state.collectCleanups.splice(0); + // Hold drop cleanups locally for exactly this batch. If processFn + // throws or the queue is cleared mid-drain, we must run them. Any + // drop cleanups added during processing (new mid-turn collect) stay + // in state — they'll be processed on the next loop iteration, or + // handled by clear(). 
+ const dropCleanups = state.collectDropCleanups.splice(0, collected.length); const prompt = buildCollectPrompt(collected); state.busy = true; @@ -271,9 +345,15 @@ export class MessageQueue { this.startPreStreamTyping(state.latestPlatform); + let liveDropCleanups: CleanupFn[] | null = dropCleanups; + const transferOwnership = () => { + if (this.queues.get(chatId) !== state) return; + liveDropCleanups = null; + }; + try { if (state.latestPlatform) { - await this.processFn(chatId, state.agentId, prompt, state.latestPlatform); + await this.processFn(chatId, state.agentId, prompt, state.latestPlatform, transferOwnership); } } catch (err) { log.error("message-queue", `Collect drain error for ${chatId}:`, err); @@ -287,8 +367,11 @@ export class MessageQueue { for (const fn of cleanups) fn(); } - // If queue was cleared during processing, stop draining - if (this.queues.get(chatId) !== state) return; + const queueCleared = this.queues.get(chatId) !== state; + if (liveDropCleanups) { + for (const fn of liveDropCleanups) fn(); + } + if (queueCleared) return; // Run deferred cleanups from mid-turn compaction (temp files safe to delete now) for (const fn of state.deferredCleanups) fn(); @@ -321,7 +404,9 @@ export class MessageQueue { clearTimeout(state.debounceTimer); } for (const fn of state.pendingCleanups) fn(); + for (const fn of state.pendingDropCleanups) fn(); for (const fn of state.collectCleanups) fn(); + for (const fn of state.collectDropCleanups) fn(); for (const fn of state.deferredCleanups) fn(); this.queues.delete(chatId); } @@ -350,7 +435,9 @@ export class MessageQueue { clearTimeout(state.debounceTimer); } for (const fn of state.pendingCleanups) fn(); + for (const fn of state.pendingDropCleanups) fn(); for (const fn of state.collectCleanups) fn(); + for (const fn of state.collectDropCleanups) fn(); for (const fn of state.deferredCleanups) fn(); try { cleanupInjectDir(injectDirForChat(chatId)); } catch { /* ignore */ } } diff --git a/bot/src/session-manager.ts 
b/bot/src/session-manager.ts index 7f51a7f..421e848 100644 --- a/bot/src/session-manager.ts +++ b/bot/src/session-manager.ts @@ -10,6 +10,7 @@ import { SessionStore } from "./session-store.js"; import { log } from "./logger.js"; import { recordResultMetrics, sessionsActive, sessionCrashes } from "./metrics.js"; import { injectDirForChat, cleanupInjectDir, writeInjectFile } from "./inject-file.js"; +import { ensureSessionMediaDir, cleanupSessionMediaDir, cleanupStaleSessionMedia } from "./media-store.js"; const LOG_DIR = process.env.LOG_DIR ?? join(homedir(), ".minime", "logs"); const OUTBOX_BASE = "/tmp/bot-outbox"; @@ -214,6 +215,11 @@ export class SessionManager { cleanupInjectDir(injectPath); mkdirSync(injectPath, { recursive: true }); + // Ensure media directory exists (do NOT wipe: a photo may have been + // downloaded into it moments before this spawn was triggered). + // Cleanup happens on session close, crash recovery, and via the global cap. + ensureSessionMediaDir(chatId); + // Spawn the claude subprocess const child = spawnClaudeSession({ agent, @@ -232,6 +238,14 @@ export class SessionManager { if (!hasExited(child) && !child.killed) { child.kill("SIGKILL"); } + // No session will be created to own files just downloaded for this turn; + // wipe the dir so they don't sit around until the next startup/cap eviction. + // Skip when resuming: the stored session record stays intact, so a later + // successful resume will continue the same conversation history — and that + // history may reference files already in this dir from prior turns. + if (!resume) { + try { cleanupSessionMediaDir(chatId); } catch { /* ignore */ } + } // Increment crash count so startup failures contribute to backoff const count = (this.restartCounts.get(chatId) ?? 0) + 1; this.restartCounts.set(chatId, count); @@ -412,6 +426,19 @@ export class SessionManager { } } + /** + * Extend the idle window for an active session without creating one. 
+ * Called by message handlers while staging incoming payloads (e.g. media + * downloads) so the idle timer cannot fire mid-download and wipe the + * session media dir before the queued message is consumed. + */ + touchActivity(chatId: string): void { + const session = this.active.get(chatId); + if (!session) return; + session.lastActivity = Date.now(); + this.resetIdleTimer(chatId); + } + /** Reset the idle timer for a session. After timeout, session is closed. */ resetIdleTimer(chatId: string): void { const session = this.active.get(chatId); @@ -447,7 +474,7 @@ export class SessionManager { this.active.delete(chatId); sessionsActive.dec(); - // Clean up outbox and inject directories + // Clean up outbox, inject, and media directories try { rmSync(session.outboxPath, { recursive: true, force: true }); } catch { @@ -458,6 +485,11 @@ export class SessionManager { } catch { // Ignore cleanup errors } + try { + cleanupSessionMediaDir(chatId); + } catch { + // Ignore cleanup errors + } // Gracefully terminate (even if SIGTERM was already sent elsewhere) if (!hasExited(session.child)) { @@ -539,6 +571,9 @@ export class SessionManager { async destroySession(chatId: string): Promise { await this.closeSession(chatId); this.store.deleteSession(chatId); + // closeSession only touches the media dir when an in-memory session exists; + // /clean after a bot restart/crash (or before any spawn) must still wipe it. + try { cleanupSessionMediaDir(chatId); } catch { /* ignore */ } } /** Close all sessions gracefully. For shutdown. */ @@ -597,6 +632,12 @@ export class SessionManager { : `agentId changed from "${stored.agentId}" to "${agentId}"`; log.warn("session-manager", `Discarding stale session for chat ${chatId}: ${reason}`); this.store.deleteSession(chatId); + // Purge leftover media belonging to the discarded session so the new + // agent cannot read the prior agent's files. 
Files currently tracked + // as in-flight (the download the active handler just enqueued) are + // preserved; anything else — including orphans from a crashed prior + // process — is wiped. + try { cleanupStaleSessionMedia(chatId); } catch { /* ignore */ } return { resume: false, sessionId: randomUUID() }; } @@ -638,6 +679,8 @@ export class SessionManager { // Clean up inject directory (stale files would confuse next spawn) try { cleanupInjectDir(session.injectDir); } catch { /* ignore */ } + // Clean up media directory — files are scoped to this session's lifetime + try { cleanupSessionMediaDir(chatId); } catch { /* ignore */ } if (code !== 0 && signal !== "SIGTERM" && signal !== "SIGKILL") { sessionCrashes.inc({ agent_id: session.agentId }); diff --git a/bot/src/stream-relay.ts b/bot/src/stream-relay.ts index cb8c2ba..a1894b8 100644 --- a/bot/src/stream-relay.ts +++ b/bot/src/stream-relay.ts @@ -141,6 +141,7 @@ export async function relayStream( stream: AsyncGenerator, platform: PlatformContext, outboxPath?: string, + onAgentOwnership?: () => void, ): Promise { let accumulated = ""; let typingTimer: ReturnType | null = null; @@ -195,8 +196,17 @@ export async function relayStream( try { let resultText: string | null = null; + let ownershipSignaled = false; for await (const msg of stream) { + // First event from the stream means Claude received the prompt over + // stdin and started processing — the conversation history now references + // any media paths in the prompt. Signal ownership so the queue won't + // reclaim media if response delivery fails afterward (issue #99). + if (!ownershipSignaled) { + ownershipSignaled = true; + onAgentOwnership?.(); + } // Detect non-text content blocks (tool_use, etc.) so we can insert a // paragraph break when the next text block starts. Without this, // "plan:" + [Edit tool] + "Done!" would become "plan:Done!". 
diff --git a/bot/src/telegram-bot.ts b/bot/src/telegram-bot.ts index de60138..92bf1cf 100644 --- a/bot/src/telegram-bot.ts +++ b/bot/src/telegram-bot.ts @@ -6,6 +6,7 @@ import { relayStream } from "./stream-relay.js"; import { MessageQueue } from "./message-queue.js"; import { createTelegramAdapter } from "./telegram-adapter.js"; import { tempFilePath, downloadFile, transcribeAudio, cleanupTempFile } from "./voice.js"; +import { allocateMediaPath, enforceMediaCap, releaseMediaPath, discardMediaPath } from "./media-store.js"; import { isImageMimeType, imageExtensionForMime } from "./mime.js"; import { log } from "./logger.js"; import { recordTelegramApiError, messagesReceived, messagesSent } from "./metrics.js"; @@ -539,9 +540,9 @@ export function createTelegramBot( // Message queue: debounce rapid messages and collect mid-turn messages const messageQueue = new MessageQueue( - async (chatId, agentId, text, platform) => { + async (chatId, agentId, text, platform, onAgentOwnership) => { const stream = sessionManager.sendSessionMessage(chatId, agentId, text); - await relayStream(stream, platform, outboxDir(chatId)); + await relayStream(stream, platform, outboxDir(chatId), onAgentOwnership); }, ); @@ -788,6 +789,10 @@ export function createTelegramBot( const key = sessionKey(chatId, topicId); let tempPath: string | null = null; + // Keep any active session alive across the download+debounce window so the + // idle timer cannot fire and wipe the media dir before the agent reads it. 
+ sessionManager.touchActivity(key); + try { // Get largest photo size (last element in array) const photos = ctx.msg.photo; @@ -795,8 +800,9 @@ export function createTelegramBot( const file = await ctx.api.getFile(largest.file_id); if (!file.file_path) throw new Error("Telegram did not return a file path"); const url = `https://api.telegram.org/file/bot${token}/${file.file_path}`; - tempPath = tempFilePath("photo", ".jpg"); + tempPath = allocateMediaPath(key, "photo", ".jpg"); await downloadFile(url, tempPath); + enforceMediaCap(config.sessionDefaults.maxMediaBytes); // Build message: caption (if any) + image file path const prefix = buildSourcePrefix(binding, ctx.from, ctx.message.date); @@ -808,17 +814,25 @@ export function createTelegramBot( ? `${context}${caption.trimEnd()}\n\n${tempPath}` : `${context}${tempPath}`; - // Cleanup callback runs after the queue finishes processing this message - const pathToClean = tempPath; + // File persists for the session lifetime so follow-up turns can reference it. + // `cleanup` releases in-flight tracking when the message is delivered; the + // active session then owns the file. `dropCleanup` reclaims the file if + // the message never reaches an agent (cap exceeded, /reconnect, /clean). + const trackedPath = tempPath; tempPath = null; - messageQueue.enqueue(key, binding.agentId, messageText, createTelegramAdapter(ctx, binding, undefined, config.sessionDefaults), () => { - cleanupTempFile(pathToClean); - }); + messageQueue.enqueue( + key, + binding.agentId, + messageText, + createTelegramAdapter(ctx, binding, undefined, config.sessionDefaults), + () => { releaseMediaPath(trackedPath); }, + () => { discardMediaPath(trackedPath); }, + ); } catch (err) { log.error("telegram-bot", `Photo handling error for chat ${chatId}:`, err); await ctx.reply("Failed to process photo. 
Please try again.").catch(() => {}); if (tempPath) { - cleanupTempFile(tempPath); + discardMediaPath(tempPath); } } }); @@ -859,6 +873,9 @@ export function createTelegramBot( const key = sessionKey(chatId, topicId); let tempPath: string | null = null; + // Keep any active session alive across the download+debounce window. + sessionManager.touchActivity(key); + try { const file = await ctx.api.getFile(anim ? anim.file_id : doc.file_id); if (!file.file_path) throw new Error("Telegram did not return a file path"); @@ -871,8 +888,9 @@ export function createTelegramBot( } else { ext = extensionForDocument(doc.file_name, doc.mime_type); } - tempPath = tempFilePath(anim ? "animation" : "doc", ext); + tempPath = allocateMediaPath(key, anim ? "animation" : "doc", ext); await downloadFile(url, tempPath); + enforceMediaCap(config.sessionDefaults.maxMediaBytes); const prefix = buildSourcePrefix(binding, ctx.from, ctx.message.date); const replyCtx = buildReplyContext(ctx.message.reply_to_message, ctx.message.quote); @@ -894,16 +912,25 @@ export function createTelegramBot( : `${context}${meta}\n${tempPath}`; } - const pathToClean = tempPath; + // File persists for the session lifetime so follow-up turns can reference it. + // `cleanup` releases in-flight tracking when the message is delivered; the + // active session then owns the file. `dropCleanup` reclaims the file if + // the message never reaches an agent (cap exceeded, /reconnect, /clean). + const trackedPath = tempPath; tempPath = null; - messageQueue.enqueue(key, binding.agentId, messageText, createTelegramAdapter(ctx, binding, undefined, config.sessionDefaults), () => { - cleanupTempFile(pathToClean); - }); + messageQueue.enqueue( + key, + binding.agentId, + messageText, + createTelegramAdapter(ctx, binding, undefined, config.sessionDefaults), + () => { releaseMediaPath(trackedPath); }, + () => { discardMediaPath(trackedPath); }, + ); } catch (err) { log.error("telegram-bot", `${anim ? 
"Animation" : "Document"} handling error for chat ${chatId}:`, err); await ctx.reply(`Failed to process ${anim ? "animation" : "document"}. Please try again.`).catch(() => {}); if (tempPath) { - cleanupTempFile(tempPath); + discardMediaPath(tempPath); } } }); @@ -939,13 +966,17 @@ export function createTelegramBot( const key = sessionKey(chatId, topicId); let tempPath: string | null = null; + // Keep any active session alive across the download+debounce window. + sessionManager.touchActivity(key); + try { const file = await ctx.api.getFile(media.file_id); if (!file.file_path) throw new Error("Telegram did not return a file path"); const url = `https://api.telegram.org/file/bot${token}/${file.file_path}`; const ext = extensionForMedia(media, mediaType); - tempPath = tempFilePath(mediaType, ext); + tempPath = allocateMediaPath(key, mediaType, ext); await downloadFile(url, tempPath); + enforceMediaCap(config.sessionDefaults.maxMediaBytes); const prefix = buildSourcePrefix(binding, ctx.from, ctx.message.date); const replyCtx = buildReplyContext(ctx.message.reply_to_message, ctx.message.quote); @@ -957,16 +988,25 @@ export function createTelegramBot( ? `${context}${caption.trimEnd()}\n\n${meta}\n${tempPath}` : `${context}${meta}\n${tempPath}`; - const pathToClean = tempPath; + // File persists for the session lifetime so follow-up turns can reference it. + // `cleanup` releases in-flight tracking when the message is delivered; the + // active session then owns the file. `dropCleanup` reclaims the file if + // the message never reaches an agent (cap exceeded, /reconnect, /clean). 
+ const trackedPath = tempPath; tempPath = null; - messageQueue.enqueue(key, binding.agentId, messageText, createTelegramAdapter(ctx, binding, undefined, config.sessionDefaults), () => { - cleanupTempFile(pathToClean); - }); + messageQueue.enqueue( + key, + binding.agentId, + messageText, + createTelegramAdapter(ctx, binding, undefined, config.sessionDefaults), + () => { releaseMediaPath(trackedPath); }, + () => { discardMediaPath(trackedPath); }, + ); } catch (err) { log.error("telegram-bot", `${typeLabel} handling error for chat ${chatId}:`, err); await ctx.reply(`Failed to process ${typeLabel.toLowerCase()}. Please try again.`).catch(() => {}); if (tempPath) { - cleanupTempFile(tempPath); + discardMediaPath(tempPath); } } }); diff --git a/bot/src/types.ts b/bot/src/types.ts index 5002d26..882374c 100644 --- a/bot/src/types.ts +++ b/bot/src/types.ts @@ -80,6 +80,7 @@ export interface SessionDefaults { maxConcurrentSessions: number; maxMessageAgeMs: number; requireMention: boolean; + maxMediaBytes: number; } export interface BotConfig { diff --git a/config.yaml b/config.yaml index 86cdc3c..9f30cf5 100644 --- a/config.yaml +++ b/config.yaml @@ -60,6 +60,11 @@ sessionDefaults: maxConcurrentSessions: 12 # default maxMessageAgeMs: 600000 # 10 min — discard stale messages on restart (default) # requireMention: true # global default for @mention gating in groups (default: true; per-binding overrides) + # maxMediaBytes: 209715200 # example: 200 MB global cap for downloaded photos/docs retained across turns + # (example default: 209715200 bytes = 10× the 20 MB Telegram per-file limit). + # Files persist under /tmp/bot-media/<chatId>/ until session close; + # when this cap is exceeded, oldest files across all sessions are evicted first. + # Bump this if agents in long-lived sessions frequently reference old media. 
metricsPort: 9091 diff --git a/docs/plans/completed/2026-04-18-issue-99-media-persistence.md b/docs/plans/completed/2026-04-18-issue-99-media-persistence.md new file mode 100644 index 0000000..0226116 --- /dev/null +++ b/docs/plans/completed/2026-04-18-issue-99-media-persistence.md @@ -0,0 +1,148 @@ +# Persist downloaded media for session lifetime — Round 1 + +## Goal + +Make photos and documents survive across turns within the same agent session so follow-up questions can reference earlier files without re-upload. Clean up on session close. Bound disk usage with a global cap (oldest evicted first). Voice unchanged. Resolves fitz123/claude-code-bot#99. + +## Validation Commands + +```bash +cd bot && npm test +cd bot && npx tsc --noEmit +``` + +## Reference: current media-cleanup wiring + +Photo handler — cleanup callback fires on queue drain after the consuming turn finishes: + +```ts +// bot/src/telegram-bot.ts:798-816 +tempPath = tempFilePath("photo", ".jpg"); +await downloadFile(url, tempPath); + +// ... build message text containing tempPath ... + +// Cleanup callback runs after the queue finishes processing this message +const pathToClean = tempPath; +tempPath = null; +messageQueue.enqueue(key, binding.agentId, messageText, createTelegramAdapter(...), () => { + cleanupTempFile(pathToClean); +}); +``` + +Document/animation handler — same pattern: + +```ts +// bot/src/telegram-bot.ts:874-901 +tempPath = tempFilePath(anim ? "animation" : "doc", ext); +await downloadFile(url, tempPath); +// ... build messageText including tempPath ... 
+const pathToClean = tempPath; +tempPath = null; +messageQueue.enqueue(key, binding.agentId, messageText, createTelegramAdapter(...), () => { + cleanupTempFile(pathToClean); +}); +``` + +Voice handler — cleanup runs in `finally` after transcription; only the transcript text enters context, the audio file path is never handed to the agent: + +```ts +// bot/src/telegram-bot.ts:760-767 +} catch (err) { + log.error("telegram-bot", `Voice transcription error for chat ${chatId}:`, err); + await ctx.reply("Failed to transcribe voice message. Please try again or send text.").catch(() => {}); +} finally { + if (tempPath) { + await cleanupTempFile(tempPath); + } +} +``` + +`bot/src/voice.ts:17-19` defines `tempFilePath` as a one-shot path under `tmpdir()`: + +```ts +export function tempFilePath(prefix: string, extension: string): string { + return `${tmpdir()}/bot-${prefix}-${randomUUID()}${extension}`; +} +``` + +`bot/src/message-queue.ts` runs registered cleanup callbacks on every queue drain (`pendingCleanups.splice(0)` and `collectCleanups.splice(0)` at lines 188, 245, 263, 287). The cleanup contract there is "fire when this message is consumed" — there is no notion of session lifetime. + +## Reference: existing session-scoped directory pattern + +`session-manager.ts` already maintains a per-session outbox directory whose lifetime matches the session, exactly the pattern a media retention dir would mirror: + +```ts +// bot/src/session-manager.ts:15 +const OUTBOX_BASE = "/tmp/bot-outbox"; + +// bot/src/session-manager.ts:23 +export function outboxDir(chatId: string): string { /* ... */ } + +// bot/src/session-manager.ts:48-51 — fields on ActiveSession +/** Per-session outbox directory for file delivery. */ +outboxPath: string; +/** Per-session inject directory for mid-turn message delivery. 
*/ +injectDir: string; + +// bot/src/session-manager.ts:206-214 — created on session spawn +const outboxPath = outboxDir(chatId); +rmSync(outboxPath, { recursive: true, force: true }); +mkdirSync(outboxPath, { recursive: true }); +// ... +const injectPath = injectDirForChat(chatId); +cleanupInjectDir(injectPath); + +// bot/src/session-manager.ts:450-457 — cleaned on session close +rmSync(session.outboxPath, { recursive: true, force: true }); +// ... +cleanupInjectDir(session.injectDir); +``` + +## Tasks + +### Task 1: Persist downloaded media for the lifetime of a session (#99, P2) + +**Problem.** Photo and document/animation handlers register a per-message cleanup callback that unlinks the downloaded file as soon as the consuming turn finishes (`bot/src/telegram-bot.ts:811-816` for photo, `:897-901` for document). The file path is still in the agent's conversation history, but the file is gone. When the user asks a follow-up about the same file in a later turn, the agent gets a missing-file error and has to ask the user to re-send. Reproducible by sending a multi-page PDF, asking for a summary, then in the next turn asking "now count occurrences of X" — the path is in conversation history but the file has been unlinked. Same symptom for photos when a follow-up wants a different region or aspect of the same image. + +**What we want.** Within a single agent session, downloaded media stays readable for follow-up questions referencing the path. When the session ends (idle close, restart), media for that session is reclaimed. A global safety cap prevents unbounded disk growth if many large files arrive in a long-lived session. Voice files keep their current immediate-cleanup behavior — the audio file is never referenced after transcription, only the transcript text enters context. 
+ +- [x] Photos referenced by a previous turn in the same session can still be read by the agent in a later turn (no missing-file error on follow-up) +- [x] Documents and animations behave the same: re-readable across turns within the session +- [x] Voice messages continue to be cleaned up immediately after transcription (no behavior change for voice) +- [x] When the session closes (idle timeout, explicit close, restart, crash), all media files belonging to that session are removed from disk +- [x] A configurable global cap on total media bytes across all sessions is enforced; when exceeded, oldest files are evicted first +- [x] The default cap is at least 200 MB (10× the 20 MB Telegram per-file limit) and is documented in the public config example with a comment explaining how to tune it +- [x] Files written for one chat/session are never readable by another session (no cross-session leakage of paths) +- [x] Photo and document download error paths still clean up partial files (today's `if (tempPath) cleanupTempFile(tempPath)` behavior is preserved) +- [x] Add tests covering: file persists across turns within a session, file is removed on session close, eviction kicks in when the global cap is exceeded, voice files are still removed immediately +- [x] Verify existing tests pass (949/950 pass; the 1 failure is a pre-existing WHISPER_MODEL env-var mismatch unrelated to this change) + +### Task 2: Address PR #102 Copilot review findings — Round 2 (P1) + +GitHub Copilot raised 5 concrete findings on PR #102, each citing a real file:line and representing a real risk (drop-cleanup leaks under error/clear paths, in-flight eviction, symlink attack vector, misleading docstring). Address them. + +#### Finding 1 — `bot/src/message-queue.ts:223` +> `pendingDropCleanups` are spliced/discarded at the start of `flush()`, before `processFn` completes. 
If `processFn` throws (send failure) or the queue is cleared during processing (/reconnect, /clean), the drop cleanups will never run, so persistent media will leak on disk (and the in-flight tracking may be released). Keep drop cleanups until delivery succeeds; on send error or clear-while-busy paths, run them as part of the cleanup. + +#### Finding 2 — `bot/src/message-queue.ts:303` +> In `drainCollectBuffer()`, `collectDropCleanups` are spliced/discarded before calling `processFn`. If `processFn` fails or the queue is cleared mid-drain, the drop cleanups won't run, so persistent-media files can become orphaned. Consider holding drop cleanups until after a successful drain, and running them on error/clear paths. + +#### Finding 3 — `bot/src/media-store.ts:192` +> `enforceMediaCap()` can evict files that are still tracked as in-flight (including the file that was just downloaded and is about to be enqueued). This can lead to the agent receiving a path that no longer exists. When collecting/evicting candidates, skip paths in `inflightMediaPaths` (or accept a protected-path set) so cap enforcement never deletes files that haven't been delivered/owned yet. + +#### Finding 4 — `bot/src/media-store.ts:13` +> The `inflightMediaPaths` docstring says it includes files "delivered and owned by an active session", but `releaseMediaPath()` removes paths from the set on successful delivery. This makes the comment misleading about what the set actually represents (it's more like "downloaded/queued but not yet released"). Update the comment to match the actual lifecycle so future changes don't rely on the wrong invariant. + +#### Finding 5 — `bot/src/media-store.ts:56` +> `cleanupSessionMediaDir()` (and other cleanup paths) remove directories under `MEDIA_BASE` without verifying that `MEDIA_BASE` itself is not a symlink. 
Because `SessionManager.closeSession()` calls this even for sessions that never downloaded media, a pre-squatted symlink at `/tmp/bot-media` could redirect deletions outside the intended tree. Consider reusing the `ensureSecureDir`/`lstatSync` symlink check before recursive removal. + +#### Outcomes +- [x] Finding 1: `pendingDropCleanups` are no longer lost when `processFn` throws or when the queue is cleared during processing. Drop cleanups run on success AND on every error/clear path that abandons the in-flight message +- [x] Finding 2: `collectDropCleanups` in `drainCollectBuffer()` get the same treatment — never lost on processFn failure or mid-drain clear +- [x] Finding 3: `enforceMediaCap()` never evicts a path that is currently in `inflightMediaPaths`. Add a regression test that downloads a file, forces cap pressure, and verifies the in-flight file is preserved while older non-in-flight files get evicted +- [x] Finding 4: `inflightMediaPaths` docstring accurately describes the set's actual lifecycle ("downloaded/queued but not yet released to a session" or equivalent — match the real semantics) +- [x] Finding 5: `cleanupSessionMediaDir()` (and any other cleanup path that removes under `MEDIA_BASE`) refuses to act when `MEDIA_BASE` itself is a symlink. Reuse the existing `ensureSecureDir` / `lstatSync` pattern. Add a regression test that pre-squats a symlink at `MEDIA_BASE` and verifies cleanup refuses to follow it +- [x] Existing media-store and message-queue tests still pass +- [x] `cd bot && npx tsc --noEmit` clean +- [x] `cd bot && npm test` — only the pre-existing WHISPER_MODEL voice test failure is acceptable; everything else passes (978/979)