From ede376902bc0bfd13a3f8c4d624304a1081e0600 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 9 Apr 2026 02:23:28 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20add=20passive=20learning=20daemon=20?= =?UTF-8?q?=E2=80=94=20queue,=20auto-compile,=20multi-folder=20watch,=20se?= =?UTF-8?q?rvice=20install?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Transforms `kib watch` from a simple foreground inbox watcher into a full passive knowledge ingestion daemon: - Ingest queue (.kb/queue/) with FIFO ordering, retry (3x), and failed/ dir - Daemon mode (--daemon, --stop, --status) with PID file management - Multi-folder watching with glob patterns (config: watch.folders) - Auto-compile scheduler (threshold + idle debounce) - Rotating watch log (.kb/logs/watch.log) - Service installers for launchd (macOS) and systemd (Linux) - /status HTTP endpoint for monitoring queue depth - 47 new tests, all 449 pass https://claude.ai/code/session_015uE1EA9jnhosFvLaxgSD9c --- packages/cli/src/commands/watch.ts | 340 +++++++++++++++--- packages/cli/src/index.ts | 11 +- packages/core/src/constants.ts | 3 + .../core/src/daemon/folder-watcher.test.ts | 175 +++++++++ packages/core/src/daemon/folder-watcher.ts | 119 ++++++ packages/core/src/daemon/index.ts | 30 ++ packages/core/src/daemon/log.test.ts | 76 ++++ packages/core/src/daemon/log.ts | 42 +++ packages/core/src/daemon/pid.test.ts | 90 +++++ packages/core/src/daemon/pid.ts | 95 +++++ packages/core/src/daemon/queue.test.ts | 229 ++++++++++++ packages/core/src/daemon/queue.ts | 169 +++++++++ packages/core/src/daemon/scheduler.test.ts | 134 +++++++ packages/core/src/daemon/scheduler.ts | 97 +++++ packages/core/src/daemon/service.ts | 156 ++++++++ packages/core/src/index.ts | 1 + packages/core/src/schemas.ts | 12 + 17 files changed, 1734 insertions(+), 45 deletions(-) create mode 100644 packages/core/src/daemon/folder-watcher.test.ts create mode 100644 packages/core/src/daemon/folder-watcher.ts create mode 100644 packages/core/src/daemon/index.ts create mode 100644 packages/core/src/daemon/log.test.ts create mode 100644 packages/core/src/daemon/log.ts create mode 100644 packages/core/src/daemon/pid.test.ts create mode 100644 packages/core/src/daemon/pid.ts create mode 100644 packages/core/src/daemon/queue.test.ts create mode 100644 packages/core/src/daemon/queue.ts create mode 100644 packages/core/src/daemon/scheduler.test.ts create mode 100644 packages/core/src/daemon/scheduler.ts create mode 100644 packages/core/src/daemon/service.ts diff --git a/packages/cli/src/commands/watch.ts b/packages/cli/src/commands/watch.ts index d1b119e..3e90355 100644 --- a/packages/cli/src/commands/watch.ts +++ b/packages/cli/src/commands/watch.ts @@ -1,10 +1,19 @@ import { watch as fsWatch } from "node:fs"; import { readdir, stat } from "node:fs/promises"; import { join, resolve } from "node:path"; +import type { VaultConfig } from "@kibhq/core"; import { loadConfig, resolveVaultRoot, VaultNotFoundError } from "@kibhq/core"; import * as log from "../ui/logger.js"; -export async function watch() { +interface WatchOptions { + daemon?: boolean; + stop?: boolean; + status?: boolean; + install?: boolean; + uninstall?: boolean; +} + +export async function watch(opts: WatchOptions = {}) { let root: string; try { root = resolveVaultRoot(); @@ -16,16 +25,221 @@ export async function watch() { throw err; } + // Lazy-import daemon modules to keep CLI cold start fast + const { + getDaemonStatus, + writePid, + removePid, + stopDaemon, + installService, + uninstallService, + isServiceInstalled, + } = await import("@kibhq/core"); + + // ── Subcommands ────────────────────────────────────────────── + + if (opts.stop) { + const stopped = await stopDaemon(root); + if (stopped) { + log.success("Daemon stopped."); + } else { + log.dim("No daemon running."); + } + return; + } + + if (opts.status) { + const info = await getDaemonStatus(root); + if (info) { + log.info(`Daemon running (PID ${info.pid}, started ${info.startedAt})`); + } else { + log.dim("No daemon running."); + } + const svc = await isServiceInstalled(); + if (svc.installed) { + log.dim(`Service installed at ${svc.path}`); + } + return; + } + + if (opts.install) { + const result = await installService(root); + log.success(`Service installed: ${result.path}`); + log.dim(result.instructions); + return; + } + + if (opts.uninstall) { + const result = await uninstallService(); + if (result.removed) { + log.success(`Service removed: ${result.path}`); + } else { + log.dim("No service installed."); + } + return; + } + + // ── Check for existing daemon ──────────────────────────────── + + const existing = await getDaemonStatus(root); + if (existing) { + log.error(`Daemon already running (PID ${existing.pid}). Run kib watch --stop first.`); + process.exit(1); + } + + // ── Daemon fork ────────────────────────────────────────────── + + if (opts.daemon) { + const { spawn } = await import("node:child_process"); + const child = spawn( + process.execPath, + [...process.argv.slice(1).filter((a) => a !== "--daemon")], + { + cwd: root, + detached: true, + stdio: "ignore", + env: { ...process.env, KIB_DAEMON: "1" }, + }, + ); + child.unref(); + log.success(`Daemon started (PID ${child.pid}).`); + log.dim("Run kib watch --status to check, kib watch --stop to stop."); + return; + } + + // ── Foreground watch ───────────────────────────────────────── + const config = await loadConfig(root); + await writePid(root); + + const cleanup = await startWatch(root, config); + + const shutdown = async () => { + cleanup(); + await removePid(root); + if (!process.env.KIB_DAEMON) { + log.blank(); + log.dim("Watch stopped."); + } + process.exit(0); + }; + + process.on("SIGINT", shutdown); + process.on("SIGTERM", shutdown); +} + +/** + * Core watch loop. Sets up: + * 1. Inbox file watcher + * 2. HTTP server for browser extension + * 3. Multi-folder watchers (from config) + * 4. Ingest queue consumer + * 5. Auto-compile scheduler + * + * Returns a cleanup function. + */ +async function startWatch(root: string, config: VaultConfig): Promise<() => void> { + const { + ingestSource, + enqueue, + dequeue, + listPending, + markFailed, + queueDepth, + ensureQueueDirs, + appendWatchLog, + CompileScheduler, + startFolderWatchers, + compileVault, + isLocked, + } = await import("@kibhq/core"); + const inboxPath = resolve(root, config.watch.inbox_path); - const { ingestSource } = await import("@kibhq/core"); + const isDaemon = !!process.env.KIB_DAEMON; + const logMaxBytes = config.watch.log_max_mb * 1024 * 1024; + + await ensureQueueDirs(root); + + // ── Logging helper ─────────────────────────────────────────── + + const emit = (level: "info" | "warn" | "error", msg: string) => { + appendWatchLog(root, level, msg, logMaxBytes); + if (!isDaemon) { + const ts = new Date().toLocaleTimeString("en-US", { hour12: false }); + if (level === "error") log.error(`${ts} ${msg}`); + else if (level === "warn") log.warn(`${ts} ${msg}`); + else log.info(`${ts} ${msg}`); + } + }; + + // ── Auto-compile scheduler ─────────────────────────────────── - log.header(`watching ${config.watch.inbox_path}/`); - log.dim(`Drop files into ${inboxPath} to auto-ingest.`); - log.dim("Press Ctrl+C to stop."); - log.blank(); + const scheduler = new CompileScheduler({ + threshold: config.watch.auto_compile_threshold, + delayMs: config.watch.auto_compile_delay_ms, + onCompile: async () => { + const lockStatus = await isLocked(root); + if (lockStatus.locked) { + emit("warn", "Skipping auto-compile: vault is locked."); + return; + } + emit("info", "Auto-compiling..."); + try { + const result = await compileVault(root); + emit( + "info", + `Compiled ${result.sourcesCompiled} sources → ${result.articlesCreated} created, ${result.articlesUpdated} updated.`, + ); + } catch (err) { + emit("error", `Auto-compile failed: ${(err as Error).message}`); + } + }, + onLog: (msg) => emit("info", msg), + }); + + // ── Queue consumer ─────────────────────────────────────────── + + let consuming = false; + + async function consumeQueue() { + if (consuming) return; + consuming = true; + try { + const items = await listPending(root, 20); + for (const item of items) { + try { + const result = await ingestSource(root, item.uri, item.options); + await dequeue(root, item.id); + if (result.skipped) { + emit("info", `Skipped: ${result.skipReason}`); + } else { + emit("info", `Ingested: ${result.title} → ${result.path}`); + if (config.watch.auto_compile) { + scheduler.recordIngest(); + } + } + } catch (err) { + const retry = await markFailed(root, item.id, (err as Error).message); + if (retry) { + emit("warn", `Retryable error for ${item.uri}: ${(err as Error).message}`); + } else { + emit("error", `Failed permanently: ${item.uri} — ${(err as Error).message}`); + } + } + } + } finally { + consuming = false; + } + } + + // Poll queue periodically (catches items from folder watchers, retries, etc.) + const queuePollInterval = setInterval(async () => { + const depth = await queueDepth(root); + if (depth > 0) await consumeQueue(); + }, config.watch.poll_interval_ms); + + // ── Inbox watcher ──────────────────────────────────────────── - // Track already-seen files to avoid double-processing const processed = new Set(); // Seed with existing files @@ -36,57 +250,87 @@ export async function watch() { // inbox might not exist yet } - // Start the HTTP server for browser extension - const server = startHttpServer(root, ingestSource); - - // Watch for new files - const watcher = fsWatch(inboxPath, { recursive: false }, async (_event, filename) => { + const inboxWatcher = fsWatch(inboxPath, { recursive: false }, async (_event, filename) => { if (!filename || processed.has(filename)) return; - if (filename.startsWith(".")) return; // skip dotfiles + if (filename.startsWith(".")) return; const filePath = join(inboxPath, filename); - // Wait briefly for file to finish writing await new Promise((r) => setTimeout(r, 500)); - try { await stat(filePath); } catch { - return; // file was deleted before we could process it + return; } processed.add(filename); - const timestamp = new Date().toLocaleTimeString("en-US", { hour12: false }); + await enqueue(root, filePath, "inbox"); + await consumeQueue(); + }); - try { - log.info(`${timestamp} Ingesting ${filename}...`); - const result = await ingestSource(root, filePath); + // ── HTTP server ────────────────────────────────────────────── - if (result.skipped) { - log.dim(`${timestamp} Skipped: ${result.skipReason}`); - } else { - log.success(`${timestamp} ${result.title} → ${result.path}`); + const server = startHttpServer(root, enqueue, consumeQueue); - if (config.watch.auto_compile) { - log.dim(`${timestamp} Auto-compile: run kib compile to process`); - } + // ── Folder watchers ────────────────────────────────────────── + + let folderCleanup: { stop: () => void } | null = null; + if (config.watch.folders.length > 0) { + folderCleanup = startFolderWatchers({ + folders: config.watch.folders, + onFile: async (filePath) => { + emit("info", `Folder watcher detected: ${filePath}`); + await enqueue(root, filePath, "folder"); + await consumeQueue(); + }, + }); + emit("info", `Watching ${config.watch.folders.length} additional folder(s).`); + } + + // ── Process any items already in the queue ─────────────────── + + const initialDepth = await queueDepth(root); + if (initialDepth > 0) { + emit("info", `Processing ${initialDepth} queued item(s) from previous session.`); + await consumeQueue(); + } + + // ── UI output (foreground only) ────────────────────────────── + + if (!isDaemon) { + log.header(`watching ${config.watch.inbox_path}/`); + log.dim(`Drop files into ${inboxPath} to auto-ingest.`); + if (config.watch.folders.length > 0) { + for (const f of config.watch.folders) { + log.dim(` + ${f.path} (${f.glob}${f.recursive ? ", recursive" : ""})`); } - } catch (err) { - log.error(`${timestamp} Failed to ingest ${filename}: ${(err as Error).message}`); } - }); + log.dim( + `Auto-compile: after ${config.watch.auto_compile_threshold} sources or ${Math.round(config.watch.auto_compile_delay_ms / 60000)} min idle.`, + ); + log.dim("Press Ctrl+C to stop."); + log.blank(); + } + + emit("info", "Daemon started."); - // Handle graceful shutdown - process.on("SIGINT", () => { - watcher.close(); + // ── Cleanup function ───────────────────────────────────────── + + return () => { + inboxWatcher.close(); server?.stop(); - log.blank(); - log.dim("Watch stopped."); - process.exit(0); - }); + folderCleanup?.stop(); + scheduler.stop(); + clearInterval(queuePollInterval); + emit("info", "Daemon stopped."); + }; } -function startHttpServer(root: string, ingestSource: typeof import("@kibhq/core").ingestSource) { +function startHttpServer( + root: string, + enqueue: typeof import("@kibhq/core").enqueue, + consumeQueue: () => Promise, +) { try { const server = Bun.serve({ port: 4747, @@ -97,7 +341,6 @@ function startHttpServer(root: string, ingestSource: typeof import("@kibhq/core" try { const body = (await req.json()) as { content: string; url?: string; title?: string }; - // Write content to a temp file in inbox const slug = (body.title ?? "untitled") .toLowerCase() .replace(/[^a-z0-9]+/g, "-") @@ -109,7 +352,8 @@ function startHttpServer(root: string, ingestSource: typeof import("@kibhq/core" : body.content; await Bun.write(tmpPath, fullContent); - await ingestSource(root, tmpPath, { title: body.title }); + await enqueue(root, tmpPath, "http", { title: body.title }); + await consumeQueue(); return new Response(JSON.stringify({ ok: true }), { headers: { "Content-Type": "application/json" }, @@ -128,14 +372,26 @@ function startHttpServer(root: string, ingestSource: typeof import("@kibhq/core" }); } + if (req.method === "GET" && url.pathname === "/status") { + const { queueDepth: getDepth } = await import("@kibhq/core"); + const depth = await getDepth(root); + return new Response(JSON.stringify({ running: true, queueDepth: depth }), { + headers: { "Content-Type": "application/json" }, + }); + } + return new Response("Not found", { status: 404 }); }, }); - log.dim(`HTTP server listening on http://localhost:4747`); + if (!process.env.KIB_DAEMON) { + log.dim("HTTP server listening on http://localhost:4747"); + } return server; } catch { - log.dim("HTTP server not started (port 4747 may be in use)"); + if (!process.env.KIB_DAEMON) { + log.dim("HTTP server not started (port 4747 may be in use)"); + } return null; } } diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 2b044c3..d1fa396 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -132,10 +132,15 @@ program program .command("watch") - .description("Watch inbox/ and auto-ingest") - .action(async () => { + .description("Watch inbox/ and auto-ingest (passive learning daemon)") + .option("--daemon", "run in background as a daemon") + .option("--stop", "stop the running daemon") + .option("--status", "check if the daemon is running") + .option("--install", "install as a system service (launchd/systemd)") + .option("--uninstall", "remove the system service") + .action(async (opts) => { const { watch } = await import("./commands/watch.js"); - await watch(); + await watch(opts); }); program diff --git a/packages/core/src/constants.ts b/packages/core/src/constants.ts index 4c7300c..2d0fbd4 100644 --- a/packages/core/src/constants.ts +++ b/packages/core/src/constants.ts @@ -32,6 +32,9 @@ export const DEFAULTS = { cacheMaxSizeMb: 500, watchPollIntervalMs: 2000, maxFileSizeMb: 50, + autoCompileThreshold: 5, // compile after N new sources + autoCompileDelayMs: 30 * 60 * 1000, // 30 min idle → auto-compile + watchLogMaxMb: 10, compileArticleMinWords: 200, compileArticleMaxWords: 1000, contextWindow: 200_000, // tokens — conservative default (Sonnet 4.6 supports 1M) diff --git a/packages/core/src/daemon/folder-watcher.test.ts b/packages/core/src/daemon/folder-watcher.test.ts new file mode 100644 index 0000000..fa90a75 --- /dev/null +++ b/packages/core/src/daemon/folder-watcher.test.ts @@ -0,0 +1,175 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { matchGlob, scanFolder, startFolderWatchers } from "./folder-watcher.js"; + +let tempDirs: string[] = []; + +afterEach(async () => { + for (const dir of tempDirs) { + await rm(dir, { recursive: true, force: true }); + } + tempDirs = []; +}); + +async function makeTempDir(name: string) { + const dir = await mkdtemp(join(tmpdir(), `kib-fw-${name}-`)); + tempDirs.push(dir); + return dir; +} + +describe("matchGlob", () => { + test("matches *.pdf", () => { + expect(matchGlob("paper.pdf", "*.pdf")).toBe(true); + expect(matchGlob("paper.PDF", "*.pdf")).toBe(true); // case insensitive + expect(matchGlob("paper.txt", "*.pdf")).toBe(false); + }); + + test("matches *.{ext1,ext2}", () => { + expect(matchGlob("file.md", "*.{md,txt}")).toBe(true); + expect(matchGlob("file.txt", "*.{md,txt}")).toBe(true); + expect(matchGlob("file.pdf", "*.{md,txt}")).toBe(false); + }); + + test("matches * (wildcard all)", () => { + expect(matchGlob("anything.xyz", "*")).toBe(true); + expect(matchGlob("file", "*")).toBe(true); + }); + + test("matches exact filename", () => { + expect(matchGlob("notes.md", "notes.md")).toBe(true); + expect(matchGlob("other.md", "notes.md")).toBe(false); + }); +}); + +describe("scanFolder", () => { + test("finds matching files in a directory", async () => { + const dir = await makeTempDir("scan"); + await writeFile(join(dir, "a.pdf"), ""); + await writeFile(join(dir, "b.pdf"), ""); + await writeFile(join(dir, "c.txt"), ""); + await writeFile(join(dir, ".hidden.pdf"), ""); + + const matches = await scanFolder({ path: dir, glob: "*.pdf", recursive: false }); + expect(matches.length).toBe(2); + expect(matches.every((m) => m.endsWith(".pdf"))).toBe(true); + }); + + test("scans recursively when enabled", async () => { + const dir = await makeTempDir("scan-rec"); + const sub = join(dir, "sub"); + await mkdir(sub); + await writeFile(join(dir, "a.md"), ""); + await writeFile(join(sub, "b.md"), ""); + + const matches = await scanFolder({ path: dir, glob: "*.md", recursive: true }); + expect(matches.length).toBe(2); + }); + + test("returns empty for non-existent directory", async () => { + const matches = await scanFolder({ path: "/nonexistent/path", glob: "*", recursive: false }); + expect(matches).toEqual([]); + }); +}); + +describe("startFolderWatchers", () => { + test("detects new files matching glob", async () => { + const dir = await makeTempDir("watch"); + const detected: string[] = []; + + const watcher = startFolderWatchers({ + folders: [{ path: dir, glob: "*.md", recursive: false }], + onFile: (path) => detected.push(path), + debounceMs: 50, + }); + + // Write a matching file + await writeFile(join(dir, "test.md"), "content"); + await new Promise((r) => setTimeout(r, 200)); + + // Write a non-matching file + await writeFile(join(dir, "test.pdf"), "content"); + await new Promise((r) => setTimeout(r, 200)); + + watcher.stop(); + expect(detected.length).toBe(1); + expect(detected[0]).toContain("test.md"); + }); + + test("ignores dotfiles", async () => { + const dir = await makeTempDir("dotfiles"); + const detected: string[] = []; + + const watcher = startFolderWatchers({ + folders: [{ path: dir, glob: "*", recursive: false }], + onFile: (path) => detected.push(path), + debounceMs: 50, + }); + + await writeFile(join(dir, ".hidden"), "content"); + await new Promise((r) => setTimeout(r, 200)); + + watcher.stop(); + expect(detected.length).toBe(0); + }); + + test("does not duplicate events for same file", async () => { + const dir = await makeTempDir("dedup"); + const detected: string[] = []; + + const watcher = startFolderWatchers({ + folders: [{ path: dir, glob: "*", recursive: false }], + onFile: (path) => detected.push(path), + debounceMs: 50, + }); + + await writeFile(join(dir, "test.txt"), "v1"); + await new Promise((r) => setTimeout(r, 200)); + // Re-write same file + await writeFile(join(dir, "test.txt"), "v2"); + await new Promise((r) => setTimeout(r, 200)); + + watcher.stop(); + expect(detected.length).toBe(1); + }); + + test("watches multiple folders", async () => { + const dir1 = await makeTempDir("multi1"); + const dir2 = await makeTempDir("multi2"); + const detected: string[] = []; + + const watcher = startFolderWatchers({ + folders: [ + { path: dir1, glob: "*.md", recursive: false }, + { path: dir2, glob: "*.pdf", recursive: false }, + ], + onFile: (path) => detected.push(path), + debounceMs: 50, + }); + + await writeFile(join(dir1, "notes.md"), "content"); + await writeFile(join(dir2, "paper.pdf"), "content"); + await new Promise((r) => setTimeout(r, 300)); + + watcher.stop(); + expect(detected.length).toBe(2); + }); + + test("stop() cleans up all watchers", async () => { + const dir = await makeTempDir("stop"); + const detected: string[] = []; + + const watcher = startFolderWatchers({ + folders: [{ path: dir, glob: "*", recursive: false }], + onFile: (path) => detected.push(path), + debounceMs: 50, + }); + + watcher.stop(); + + await writeFile(join(dir, "after-stop.txt"), "content"); + await new Promise((r) => setTimeout(r, 200)); + expect(detected.length).toBe(0); + }); +}); diff --git a/packages/core/src/daemon/folder-watcher.ts b/packages/core/src/daemon/folder-watcher.ts new file mode 100644 index 0000000..1533f47 --- /dev/null +++ b/packages/core/src/daemon/folder-watcher.ts @@ -0,0 +1,119 @@ +import { type FSWatcher, watch as fsWatch } from "node:fs"; +import { readdir, stat } from "node:fs/promises"; +import { extname, join, resolve } from "node:path"; + +export interface WatchFolder { + path: string; + glob: string; + recursive: boolean; +} + +export interface FolderWatcherOptions { + folders: WatchFolder[]; + /** Called when a matching file is detected */ + onFile: (filePath: string) => void; + /** Debounce in ms before processing a new file (default: 500) */ + debounceMs?: number; +} + +/** + * Match a filename against a simple glob pattern. + * Supports: *.ext, *.{ext1,ext2}, * (match all) + */ +export function matchGlob(filename: string, pattern: string): boolean { + if (pattern === "*") return true; + + // Handle *.{ext1,ext2} pattern + const braceMatch = pattern.match(/^\*\.\{(.+)\}$/); + if (braceMatch) { + const extensions = braceMatch[1].split(",").map((e) => `.${e.trim()}`); + return extensions.includes(extname(filename).toLowerCase()); + } + + // Handle *.ext pattern + const extMatch = pattern.match(/^\*(\..+)$/); + if (extMatch) { + return extname(filename).toLowerCase() === extMatch[1].toLowerCase(); + } + + return filename === pattern; +} + +/** + * Watch multiple folders for new files matching glob patterns. + * Returns a cleanup function to stop all watchers. + */ +export function startFolderWatchers(options: FolderWatcherOptions): { stop: () => void } { + const { folders, onFile, debounceMs = 500 } = options; + const watchers: FSWatcher[] = []; + const seen = new Set(); + const pending = new Map>(); + + for (const folder of folders) { + const absPath = resolve(folder.path.replace(/^~/, process.env.HOME ?? "")); + + try { + const watcher = fsWatch(absPath, { recursive: folder.recursive }, (_event, filename) => { + if (!filename) return; + if (filename.startsWith(".")) return; + + // Extract just the basename for glob matching + const base = filename.includes("/") ? filename.split("/").pop()! : filename; + if (!matchGlob(base, folder.glob)) return; + + const fullPath = join(absPath, filename); + if (seen.has(fullPath)) return; + + // Debounce: wait for file to finish writing + if (pending.has(fullPath)) { + clearTimeout(pending.get(fullPath)!); + } + pending.set( + fullPath, + setTimeout(async () => { + pending.delete(fullPath); + try { + await stat(fullPath); // verify file still exists + seen.add(fullPath); + onFile(fullPath); + } catch { + // File deleted before we could process + } + }, debounceMs), + ); + }); + watchers.push(watcher); + } catch { + // Folder doesn't exist or isn't watchable — skip + } + } + + return { + stop: () => { + for (const w of watchers) w.close(); + for (const t of pending.values()) clearTimeout(t); + pending.clear(); + }, + }; +} + +/** Scan a folder for existing files that match the glob. Used for initial seeding. */ +export async function scanFolder(folder: WatchFolder): Promise { + const absPath = resolve(folder.path.replace(/^~/, process.env.HOME ?? "")); + const matches: string[] = []; + + try { + const files = await readdir(absPath, { recursive: folder.recursive }); + for (const file of files) { + const name = typeof file === "string" ? file : file.toString(); + const base = name.includes("/") ? name.split("/").pop()! : name; + if (!base.startsWith(".") && matchGlob(base, folder.glob)) { + matches.push(join(absPath, name)); + } + } + } catch { + // Folder doesn't exist + } + + return matches; +} diff --git a/packages/core/src/daemon/index.ts b/packages/core/src/daemon/index.ts new file mode 100644 index 0000000..f33d16e --- /dev/null +++ b/packages/core/src/daemon/index.ts @@ -0,0 +1,30 @@ +export { + type FolderWatcherOptions, + matchGlob, + scanFolder, + startFolderWatchers, + type WatchFolder, +} from "./folder-watcher.js"; +export { appendWatchLog } from "./log.js"; +export { getDaemonStatus, type PidInfo, readPid, removePid, stopDaemon, writePid } from "./pid.js"; +export { + clearFailed, + dequeue, + enqueue, + ensureQueueDirs, + listFailed, + listPending, + markFailed, + type QueueItem, + queueDepth, + readItem, +} from "./queue.js"; +export { CompileScheduler, type SchedulerOptions } from "./scheduler.js"; +export { + detectPlatform, + type InstallResult, + installService, + isServiceInstalled, + type ServicePlatform, + uninstallService, +} from "./service.js"; diff --git a/packages/core/src/daemon/log.test.ts b/packages/core/src/daemon/log.test.ts new file mode 100644 index 0000000..51ac4cb --- /dev/null +++ b/packages/core/src/daemon/log.test.ts @@ -0,0 +1,76 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { existsSync } from "node:fs"; +import { mkdtemp, readFile, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { initVault } from "../vault.js"; +import { appendWatchLog } from "./log.js"; + +let tempDir: string; + +afterEach(async () => { + if (tempDir) { + await rm(tempDir, { recursive: true, force: true }); + } +}); + +async function makeTempVault() { + tempDir = await mkdtemp(join(tmpdir(), "kib-log-test-")); + await initVault(tempDir, { name: "log-test" }); + return tempDir; +} + +describe("appendWatchLog", () => { + test("creates log file and appends timestamped line", async () => { + const root = await makeTempVault(); + await appendWatchLog(root, "info", "test message"); + + const logPath = join(root, ".kb", "logs", "watch.log"); + expect(existsSync(logPath)).toBe(true); + + const content = await readFile(logPath, "utf-8"); + expect(content).toContain("[INFO]"); + expect(content).toContain("test message"); + expect(content).toMatch(/^\d{4}-\d{2}-\d{2}T/); // ISO timestamp + }); + + test("appends multiple entries", async () => { + const root = await makeTempVault(); + await appendWatchLog(root, "info", "first"); + await appendWatchLog(root, "warn", "second"); + await appendWatchLog(root, "error", "third"); + + const logPath = join(root, ".kb", "logs", "watch.log"); + const content = await readFile(logPath, "utf-8"); + const lines = content.trim().split("\n"); + expect(lines.length).toBe(3); + expect(lines[0]).toContain("[INFO] first"); + expect(lines[1]).toContain("[WARN] second"); + expect(lines[2]).toContain("[ERROR] third"); + }); + + test("rotates log when exceeding max size", async () => { + const root = await makeTempVault(); + const smallMax = 200; // bytes + + // Write enough to exceed limit + for (let i = 0; i < 10; i++) { + await appendWatchLog(root, "info", `message number ${i} with padding`, smallMax); + } + + const rotatedPath = join(root, ".kb", "logs", "watch.log.1"); + expect(existsSync(rotatedPath)).toBe(true); + }); + + test("performance: 1000 log writes in under 2 seconds", async () => { + const root = await makeTempVault(); + const start = performance.now(); + + for (let i = 0; i < 1000; i++) { + await appendWatchLog(root, "info", `perf test line ${i}`); + } + + const elapsed = performance.now() - start; + expect(elapsed).toBeLessThan(2000); + }); +}); diff --git a/packages/core/src/daemon/log.ts b/packages/core/src/daemon/log.ts new file mode 100644 index 0000000..d79bf1d --- /dev/null +++ b/packages/core/src/daemon/log.ts @@ -0,0 +1,42 @@ +import { appendFile, mkdir, rename, stat } from "node:fs/promises"; +import { join } from "node:path"; +import { LOGS_DIR, VAULT_DIR } from "../constants.js"; + +const WATCH_LOG = "watch.log"; +const DEFAULT_MAX_BYTES = 10 * 1024 * 1024; // 10 MB + +function logPath(root: string): string { + return join(root, VAULT_DIR, LOGS_DIR, WATCH_LOG); +} + +function rotatedPath(root: string): string { + return join(root, VAULT_DIR, LOGS_DIR, "watch.log.1"); +} + +/** + * Append a timestamped line to the watch log. + * Automatically rotates when the log exceeds maxBytes. + */ +export async function appendWatchLog( + root: string, + level: "info" | "warn" | "error", + message: string, + maxBytes = DEFAULT_MAX_BYTES, +): Promise { + const path = logPath(root); + await mkdir(join(root, VAULT_DIR, LOGS_DIR), { recursive: true }); + + const timestamp = new Date().toISOString(); + const line = `${timestamp} [${level.toUpperCase()}] ${message}\n`; + await appendFile(path, line, "utf-8"); + + // Rotate if over size limit (check periodically, not every write) + try { + const info = await stat(path); + if (info.size > maxBytes) { + await rename(path, rotatedPath(root)); + } + } catch { + // stat or rename failure is non-fatal + } +} diff --git a/packages/core/src/daemon/pid.test.ts b/packages/core/src/daemon/pid.test.ts new file mode 100644 index 0000000..62732eb --- /dev/null +++ b/packages/core/src/daemon/pid.test.ts @@ -0,0 +1,90 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { existsSync } from "node:fs"; +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { initVault } from "../vault.js"; +import { getDaemonStatus, readPid, removePid, writePid } from "./pid.js"; + +let tempDir: string; + +afterEach(async () => { + if (tempDir) { + await rm(tempDir, { recursive: true, force: true }); + } +}); + +async function makeTempVault() { + tempDir = await mkdtemp(join(tmpdir(), "kib-pid-test-")); + await initVault(tempDir, { name: "pid-test" }); + return tempDir; +} + +describe("writePid / readPid", () => { + test("writes and reads PID info", async () => { + const root = await makeTempVault(); + await writePid(root); + + const info = await readPid(root); + expect(info).not.toBeNull(); + expect(info!.pid).toBe(process.pid); + expect(info!.root).toBe(root); + expect(typeof info!.startedAt).toBe("string"); + }); + + test("readPid returns null when no PID file exists", async () => { + const root = await makeTempVault(); + expect(await readPid(root)).toBeNull(); + }); +}); + +describe("removePid", () => { + test("removes the PID file", async () => { + const root = await makeTempVault(); + await writePid(root); + const path = join(root, ".kb", "watch.pid"); + expect(existsSync(path)).toBe(true); + + await removePid(root); + expect(existsSync(path)).toBe(false); + }); + + test("does not throw when no PID file exists", async () => { + const root = await makeTempVault(); + await removePid(root); // should not throw + }); +}); + +describe("getDaemonStatus", () => { + test("returns PID info for live process (current process)", async () => { + const root = await makeTempVault(); + await writePid(root); + + const status = await getDaemonStatus(root); + expect(status).not.toBeNull(); + expect(status!.pid).toBe(process.pid); + }); + + test("returns null and cleans up stale PID file", async () => { + const root = await makeTempVault(); + const pidPath = join(root, ".kb", "watch.pid"); + await writeFile( + pidPath, + JSON.stringify({ + pid: 999999999, + startedAt: new Date().toISOString(), + root, + }), + ); + + const status = await getDaemonStatus(root); + expect(status).toBeNull(); + // Stale PID file should be cleaned up + expect(existsSync(pidPath)).toBe(false); + }); + + test("returns null when no PID file", async () => { + const root = await makeTempVault(); + expect(await getDaemonStatus(root)).toBeNull(); + }); +}); diff --git a/packages/core/src/daemon/pid.ts b/packages/core/src/daemon/pid.ts new file mode 100644 index 0000000..e219fe8 --- /dev/null +++ b/packages/core/src/daemon/pid.ts @@ -0,0 +1,95 @@ +import { mkdir, readFile, unlink, writeFile } from "node:fs/promises"; +import { dirname, join } from "node:path"; +import { VAULT_DIR } from "../constants.js"; + +const PID_FILE = "watch.pid"; + +export interface PidInfo { + pid: number; + startedAt: string; + root: string; +} + +function pidPath(root: string): string { + return join(root, VAULT_DIR, PID_FILE); +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +/** Write PID file for the running daemon. */ +export async function writePid(root: string): Promise { + const info: PidInfo = { + pid: process.pid, + startedAt: new Date().toISOString(), + root, + }; + const path = pidPath(root); + await mkdir(dirname(path), { recursive: true }); + await writeFile(path, JSON.stringify(info, null, 2), "utf-8"); +} + +/** Read PID info. Returns null if no PID file exists. */ +export async function readPid(root: string): Promise { + try { + const raw = await readFile(pidPath(root), "utf-8"); + return JSON.parse(raw) as PidInfo; + } catch { + return null; + } +} + +/** Remove the PID file. */ +export async function removePid(root: string): Promise { + try { + await unlink(pidPath(root)); + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") throw err; + } +} + +/** + * Check if the daemon is running. + * Returns the PID info if alive, null otherwise. + * Cleans up stale PID files automatically. + */ +export async function getDaemonStatus(root: string): Promise { + const info = await readPid(root); + if (!info) return null; + + if (isProcessAlive(info.pid)) { + return info; + } + + // Stale PID file — clean up + await removePid(root); + return null; +} + +/** + * Send SIGTERM to a running daemon. + * Returns true if signal was sent, false if no daemon was running. + */ +export async function stopDaemon(root: string): Promise { + const info = await getDaemonStatus(root); + if (!info) return false; + + try { + process.kill(info.pid, "SIGTERM"); + // Wait briefly for process to exit, then clean up PID file + await new Promise((r) => setTimeout(r, 500)); + if (!isProcessAlive(info.pid)) { + await removePid(root); + } + return true; + } catch { + await removePid(root); + return false; + } +} diff --git a/packages/core/src/daemon/queue.test.ts b/packages/core/src/daemon/queue.test.ts new file mode 100644 index 0000000..8b0252d --- /dev/null +++ b/packages/core/src/daemon/queue.test.ts @@ -0,0 +1,229 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { initVault } from "../vault.js"; +import { + clearFailed, + dequeue, + enqueue, + listFailed, + listPending, + markFailed, + queueDepth, + readItem, +} from "./queue.js"; + +let tempDir: string; + +afterEach(async () => { + if (tempDir) { + await rm(tempDir, { recursive: true, force: true }); + } +}); + +async function makeTempVault() { + tempDir = await mkdtemp(join(tmpdir(), "kib-queue-test-")); + await initVault(tempDir, { name: "queue-test" }); + return tempDir; +} + +describe("enqueue", () => { + test("creates a queue item file and returns an ID", async () => { + const root = await makeTempVault(); + const id = await enqueue(root, "/path/to/file.md", "inbox"); + expect(id).toBeTruthy(); + expect(typeof id).toBe("string"); + + const item = await readItem(root, id); + expect(item).not.toBeNull(); + expect(item!.uri).toBe("/path/to/file.md"); + expect(item!.source).toBe("inbox"); + expect(item!.retries).toBe(0); + }); + + test("stores options when provided", async () => { + const root = await makeTempVault(); + const id = await enqueue(root, "https://example.com", "http", { + title: "Test Article", + tags: ["test", "example"], + }); + + const item = await readItem(root, id); + expect(item!.options?.title).toBe("Test Article"); + expect(item!.options?.tags).toEqual(["test", "example"]); + }); + + test("generates unique IDs", async () => { + const root = await makeTempVault(); + const ids = new Set(); + for (let i = 0; i < 100; i++) { + ids.add(await enqueue(root, `/file-${i}.md`, "inbox")); + } + expect(ids.size).toBe(100); + }); +}); + +describe("listPending", () => { + test("returns empty array when queue is empty", async () => { + const root = await makeTempVault(); + const items = await listPending(root); + expect(items).toEqual([]); + }); + + test("returns items in FIFO order", async () => { + const root = await makeTempVault(); + const id1 = await enqueue(root, "/first.md", "inbox"); + // Small delay to ensure different timestamp in ID + await new Promise((r) => setTimeout(r, 5)); + const id2 = await enqueue(root, "/second.md", "inbox"); + await new Promise((r) => setTimeout(r, 5)); + const id3 = await enqueue(root, "/third.md", "folder"); + + const items = await listPending(root); + expect(items.length).toBe(3); + expect(items[0].id).toBe(id1); + expect(items[1].id).toBe(id2); + expect(items[2].id).toBe(id3); + }); + + test("respects limit parameter", async () => { + const root = await makeTempVault(); + for (let i = 0; i < 10; i++) { + await enqueue(root, `/file-${i}.md`, "inbox"); + } + const items = await listPending(root, 3); + expect(items.length).toBe(3); + }); +}); + +describe("dequeue", () => { + test("removes an item from the queue", async () => { + const root = await makeTempVault(); + const id = await enqueue(root, "/file.md", "inbox"); + expect(await readItem(root, id)).not.toBeNull(); + + await dequeue(root, id); + expect(await readItem(root, id)).toBeNull(); + }); + + test("does not throw for non-existent items", async () => { + const root = await makeTempVault(); + await dequeue(root, "non-existent-id"); // should not throw + }); +}); + +describe("markFailed", () => { + test("increments retry count and preserves item for retry", async () => { + const root = await makeTempVault(); + const id = await enqueue(root, "/file.md", "inbox"); + + const shouldRetry = await markFailed(root, id, "connection timeout"); + expect(shouldRetry).toBe(true); + + const item = await readItem(root, id); + expect(item!.retries).toBe(1); + expect(item!.lastError).toBe("connection timeout"); + }); + + test("moves to failed/ after max retries exhausted", async () => { + const root = await makeTempVault(); + const id = await enqueue(root, "/file.md", "inbox"); + + // Exhaust retries (MAX_RETRIES = 3) + await markFailed(root, id, "error 1"); // retry 1 + await markFailed(root, id, "error 2"); // retry 2 + const shouldRetry = await markFailed(root, id, "error 3"); // retry 3 → failed + expect(shouldRetry).toBe(false); + + // Should be gone from pending + expect(await readItem(root, id)).toBeNull(); + expect(await queueDepth(root)).toBe(0); + + // Should be in failed/ + const failed = await listFailed(root); + expect(failed.length).toBe(1); + expect(failed[0].id).toBe(id); + expect(failed[0].retries).toBe(3); + expect(failed[0].lastError).toBe("error 3"); + }); + + test("returns false for non-existent items", async () => { + const root = await makeTempVault(); + const result = await markFailed(root, "ghost", "error"); + expect(result).toBe(false); + }); +}); + +describe("queueDepth", () => { + test("returns 0 for empty queue", async () => { + const root = await makeTempVault(); + expect(await queueDepth(root)).toBe(0); + }); + + test("tracks enqueue and dequeue correctly", async () => { + const root = await makeTempVault(); + const id1 = await enqueue(root, "/a.md", "inbox"); + await enqueue(root, "/b.md", "inbox"); + expect(await queueDepth(root)).toBe(2); + + await dequeue(root, id1); + expect(await queueDepth(root)).toBe(1); + }); +}); + +describe("clearFailed", () => { + test("removes all failed items and returns count", async () => { + const root = await makeTempVault(); + + // Create two items and fail them completely + for (const uri of ["/a.md", "/b.md"]) { + const id = await enqueue(root, uri, "inbox"); + await markFailed(root, id, "err"); + await markFailed(root, id, "err"); + await markFailed(root, id, "err"); // moves to failed/ + } + + expect((await listFailed(root)).length).toBe(2); + const cleared = await clearFailed(root); + expect(cleared).toBe(2); + expect((await listFailed(root)).length).toBe(0); + }); + + test("returns 0 when no failed items", async () => { + const root = await makeTempVault(); + expect(await clearFailed(root)).toBe(0); + }); +}); + +describe("performance", () => { + test("enqueue + dequeue 500 items in under 2 seconds", async () => { + const root = await makeTempVault(); + const start = performance.now(); + + const ids: string[] = []; + for (let i = 0; i < 500; i++) { + ids.push(await enqueue(root, `/file-${i}.md`, "inbox")); + } + for (const id of ids) { + await dequeue(root, id); + } + + const elapsed = performance.now() - start; + expect(elapsed).toBeLessThan(2000); + }); + + test("listPending with 200 items returns in under 500ms", async () => { + const root = await makeTempVault(); + for (let i = 0; i < 200; i++) { + await enqueue(root, `/file-${i}.md`, "inbox"); + } + + const start = performance.now(); + const items = await listPending(root, 200); + const elapsed = performance.now() - start; + + expect(items.length).toBe(200); + expect(elapsed).toBeLessThan(500); + }); +}); diff --git a/packages/core/src/daemon/queue.ts b/packages/core/src/daemon/queue.ts new file mode 100644 index 0000000..f0fd42a --- /dev/null +++ b/packages/core/src/daemon/queue.ts @@ -0,0 +1,169 @@ +import { mkdir, readdir, readFile, unlink, writeFile } from "node:fs/promises"; +import { join } from "node:path"; +import { VAULT_DIR } from "../constants.js"; + +const QUEUE_DIR = "queue"; +const FAILED_DIR = "failed"; +const MAX_RETRIES = 3; + +export interface QueueItem { + id: string; + uri: string; + source: "inbox" | "http" | "folder" | "clipboard"; + timestamp: string; + options?: { + title?: string; + category?: string; + tags?: string[]; + }; + retries: number; + lastError?: string; +} + +function queueDir(root: string): string { + return join(root, VAULT_DIR, QUEUE_DIR); +} + +function failedDir(root: string): string { + return join(root, VAULT_DIR, QUEUE_DIR, FAILED_DIR); +} + +function itemPath(root: string, id: string): string { + return join(queueDir(root), `${id}.json`); +} + +/** Generate a sortable, unique queue item ID (timestamp + random suffix). */ +function generateId(): string { + const ts = Date.now().toString(36); + const rand = Math.random().toString(36).slice(2, 8); + return `${ts}-${rand}`; +} + +/** Ensure queue directories exist. */ +export async function ensureQueueDirs(root: string): Promise { + await mkdir(queueDir(root), { recursive: true }); + await mkdir(failedDir(root), { recursive: true }); +} + +/** Enqueue a URI for ingestion. Returns the item ID. */ +export async function enqueue( + root: string, + uri: string, + source: QueueItem["source"], + options?: QueueItem["options"], +): Promise { + const id = generateId(); + const item: QueueItem = { + id, + uri, + source, + timestamp: new Date().toISOString(), + options, + retries: 0, + }; + await ensureQueueDirs(root); + await writeFile(itemPath(root, id), JSON.stringify(item, null, 2), "utf-8"); + return id; +} + +/** Read a single queue item by ID. Returns null if not found. */ +export async function readItem(root: string, id: string): Promise { + try { + const raw = await readFile(itemPath(root, id), "utf-8"); + return JSON.parse(raw) as QueueItem; + } catch { + return null; + } +} + +/** + * List pending items in FIFO order (sorted by ID which embeds timestamp). + * Returns at most `limit` items. + */ +export async function listPending(root: string, limit = 50): Promise { + try { + const files = await readdir(queueDir(root)); + const jsonFiles = files.filter((f) => f.endsWith(".json")).sort(); // lexicographic = chronological since IDs start with timestamp + const items: QueueItem[] = []; + for (const file of jsonFiles.slice(0, limit)) { + const raw = await readFile(join(queueDir(root), file), "utf-8"); + items.push(JSON.parse(raw) as QueueItem); + } + return items; + } catch { + return []; + } +} + +/** Remove a successfully processed item from the queue. */ +export async function dequeue(root: string, id: string): Promise { + try { + await unlink(itemPath(root, id)); + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") throw err; + } +} + +/** + * Mark an item as failed. Increments retry count. + * If retries exhausted, moves to failed/ directory. + * Returns true if the item should be retried, false if moved to failed. + */ +export async function markFailed(root: string, id: string, error: string): Promise { + const item = await readItem(root, id); + if (!item) return false; + + item.retries += 1; + item.lastError = error; + + if (item.retries >= MAX_RETRIES) { + // Move to failed/ + await ensureQueueDirs(root); + await writeFile(join(failedDir(root), `${id}.json`), JSON.stringify(item, null, 2), "utf-8"); + await dequeue(root, id); + return false; + } + + // Update in place for retry + await writeFile(itemPath(root, id), JSON.stringify(item, null, 2), "utf-8"); + return true; +} + +/** List items that have permanently failed. */ +export async function listFailed(root: string): Promise { + try { + const files = await readdir(failedDir(root)); + const items: QueueItem[] = []; + for (const file of files.filter((f) => f.endsWith(".json"))) { + const raw = await readFile(join(failedDir(root), file), "utf-8"); + items.push(JSON.parse(raw) as QueueItem); + } + return items; + } catch { + return []; + } +} + +/** Clear all failed items. Returns the count removed. */ +export async function clearFailed(root: string): Promise { + try { + const files = await readdir(failedDir(root)); + const jsonFiles = files.filter((f) => f.endsWith(".json")); + for (const file of jsonFiles) { + await unlink(join(failedDir(root), file)); + } + return jsonFiles.length; + } catch { + return 0; + } +} + +/** Get queue depth (number of pending items). Fast — just counts files. */ +export async function queueDepth(root: string): Promise { + try { + const files = await readdir(queueDir(root)); + return files.filter((f) => f.endsWith(".json")).length; + } catch { + return 0; + } +} diff --git a/packages/core/src/daemon/scheduler.test.ts b/packages/core/src/daemon/scheduler.test.ts new file mode 100644 index 0000000..f05688c --- /dev/null +++ b/packages/core/src/daemon/scheduler.test.ts @@ -0,0 +1,134 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { CompileScheduler } from "./scheduler.js"; + +describe("CompileScheduler", () => { + let scheduler: CompileScheduler; + + afterEach(() => { + scheduler?.stop(); + }); + + test("triggers compile when threshold is reached", async () => { + let compiled = false; + scheduler = new CompileScheduler({ + threshold: 3, + delayMs: 60_000, // high so idle timer doesn't fire + onCompile: async () => { + compiled = true; + }, + }); + + scheduler.recordIngest(); + scheduler.recordIngest(); + expect(compiled).toBe(false); + expect(scheduler.pendingCount()).toBe(2); + + scheduler.recordIngest(); // threshold hit + // onCompile is async, give it a tick + await new Promise((r) => setTimeout(r, 10)); + expect(compiled).toBe(true); + }); + + test("triggers compile after idle timeout", async () => { + let compiled = false; + scheduler = new CompileScheduler({ + threshold: 100, // high so threshold doesn't trigger + delayMs: 50, // short for testing + onCompile: async () => { + compiled = true; + }, + }); + + scheduler.recordIngest(); + expect(compiled).toBe(false); + + // Wait for idle timer + await new Promise((r) => setTimeout(r, 100)); + expect(compiled).toBe(true); + }); + + test("resets idle timer on each ingest", async () => { + let compileCount = 0; + scheduler = new CompileScheduler({ + threshold: 100, + delayMs: 80, + onCompile: async () => { + compileCount++; + }, + }); + + scheduler.recordIngest(); + await new Promise((r) => setTimeout(r, 40)); + scheduler.recordIngest(); // resets timer + await new Promise((r) => setTimeout(r, 40)); + // 80ms total but timer was reset at 40ms, so only 40ms since last ingest + expect(compileCount).toBe(0); + + await new Promise((r) => setTimeout(r, 60)); // now 100ms since last ingest > 80ms delay + expect(compileCount).toBe(1); + }); + + test("resets ingest count after compile completes", async () => { + scheduler = new CompileScheduler({ + threshold: 2, + delayMs: 0, + onCompile: async () => {}, + }); + + scheduler.recordIngest(); + scheduler.recordIngest(); // triggers + await new Promise((r) => setTimeout(r, 10)); + expect(scheduler.pendingCount()).toBe(0); + }); + + test("does not trigger concurrent compiles", async () => { + let concurrent = 0; + let maxConcurrent = 0; + scheduler = new CompileScheduler({ + threshold: 1, + delayMs: 0, + onCompile: async () => { + concurrent++; + maxConcurrent = Math.max(maxConcurrent, concurrent); + await new Promise((r) => setTimeout(r, 50)); + concurrent--; + }, + }); + + scheduler.recordIngest(); // triggers compile + scheduler.recordIngest(); // should be ignored (already compiling) + await new Promise((r) => setTimeout(r, 100)); + expect(maxConcurrent).toBe(1); + }); + + test("stop() cancels pending timer", async () => { + let compiled = false; + scheduler = new CompileScheduler({ + threshold: 100, + delayMs: 30, + onCompile: async () => { + compiled = true; + }, + }); + + scheduler.recordIngest(); + scheduler.stop(); + await new Promise((r) => setTimeout(r, 60)); + expect(compiled).toBe(false); + }); + + test("does not trigger when delayMs is 0 and threshold not met", async () => { + let compiled = false; + scheduler = new CompileScheduler({ + threshold: 5, + delayMs: 0, + onCompile: async () => { + compiled = true; + }, + }); + + scheduler.recordIngest(); + await new Promise((r) => setTimeout(r, 20)); + expect(compiled).toBe(false); + }); +}); diff --git a/packages/core/src/daemon/scheduler.ts b/packages/core/src/daemon/scheduler.ts new file mode 100644 index 0000000..8b9bbb1 --- /dev/null +++ b/packages/core/src/daemon/scheduler.ts @@ -0,0 +1,97 @@ +/** + * Auto-compile scheduler. + * Triggers compilation after N new ingests OR after T ms of idle. + * Debounced: each new ingest resets the idle timer. + */ + +export interface SchedulerOptions { + /** Compile after this many new sources (default: 5) */ + threshold: number; + /** Compile after this many ms of inactivity (default: 30 min) */ + delayMs: number; + /** Callback to run compilation */ + onCompile: () => Promise; + /** Optional callback for logging */ + onLog?: (message: string) => void; +} + +export class CompileScheduler { + private ingestCount = 0; + private timer: ReturnType | null = null; + private compiling = false; + private opts: SchedulerOptions; + + constructor(opts: SchedulerOptions) { + this.opts = opts; + } + + /** Notify the scheduler that a source was ingested. */ + recordIngest(): void { + this.ingestCount++; + this.opts.onLog?.( + `Scheduler: ${this.ingestCount}/${this.opts.threshold} sources toward auto-compile`, + ); + + if (this.ingestCount >= this.opts.threshold) { + this.triggerCompile("threshold reached"); + return; + } + + // Reset idle timer + this.resetTimer(); + } + + /** Get how many ingests since the last compile. */ + pendingCount(): number { + return this.ingestCount; + } + + /** Whether a compile is currently running. */ + isCompiling(): boolean { + return this.compiling; + } + + /** Cancel any pending timer and reset state. */ + stop(): void { + this.clearTimer(); + this.ingestCount = 0; + this.compiling = false; + } + + private resetTimer(): void { + this.clearTimer(); + if (this.opts.delayMs > 0 && this.ingestCount > 0) { + this.timer = setTimeout(() => { + this.triggerCompile("idle timeout"); + }, this.opts.delayMs); + } + } + + private clearTimer(): void { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + } + + private triggerCompile(reason: string): void { + if (this.compiling) return; + + this.clearTimer(); + this.compiling = true; + this.opts.onLog?.(`Auto-compile triggered: ${reason} (${this.ingestCount} new sources)`); + + this.opts + .onCompile() + .then(() => { + this.opts.onLog?.("Auto-compile completed."); + }) + .catch((err) => { + this.opts.onLog?.(`Auto-compile failed: ${(err as Error).message}`); + }) + .finally(() => { + this.ingestCount = 0; + this.compiling = false; + }); + } +} diff --git a/packages/core/src/daemon/service.ts b/packages/core/src/daemon/service.ts new file mode 100644 index 0000000..49f6f22 --- /dev/null +++ b/packages/core/src/daemon/service.ts @@ -0,0 +1,156 @@ +import { existsSync } from "node:fs"; +import { mkdir, unlink, writeFile } from "node:fs/promises"; +import { homedir, platform } from "node:os"; +import { dirname, join } from "node:path"; + +const LAUNCHD_LABEL = "com.kibhq.watch"; +const SYSTEMD_UNIT = "kib-watch.service"; + +export type ServicePlatform = "macos" | "linux" | "unsupported"; + +export function detectPlatform(): ServicePlatform { + const p = platform(); + if (p === "darwin") return "macos"; + if (p === "linux") return "linux"; + return "unsupported"; +} + +function launchdPlistPath(): string { + return join(homedir(), "Library", "LaunchAgents", `${LAUNCHD_LABEL}.plist`); +} + +function systemdUnitPath(): string { + return join(homedir(), ".config", "systemd", "user", SYSTEMD_UNIT); +} + +function generateLaunchdPlist(vaultRoot: string, kibBinary: string): string { + return ` + + + + Label + ${LAUNCHD_LABEL} + ProgramArguments + + ${kibBinary} + watch + + WorkingDirectory + ${vaultRoot} + RunAtLoad + + KeepAlive + + StandardOutPath + ${vaultRoot}/.kb/logs/watch-stdout.log + StandardErrorPath + ${vaultRoot}/.kb/logs/watch-stderr.log + +`; +} + +function generateSystemdUnit(vaultRoot: string, kibBinary: string): string { + return `[Unit] +Description=kib watch daemon — passive knowledge ingestion +After=network.target + +[Service] +Type=simple +WorkingDirectory=${vaultRoot} +ExecStart=${kibBinary} watch +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=default.target`; +} + +/** Resolve the kib binary path (installed via npm or standalone). */ +function resolveKibBinary(): string { + // Check common locations + const candidates = [ + join(homedir(), ".bun", "bin", "kib"), + "/usr/local/bin/kib", + join(homedir(), ".local", "bin", "kib"), + ]; + for (const c of candidates) { + if (existsSync(c)) return c; + } + // Fallback: assume it's on PATH + return "kib"; +} + +export interface InstallResult { + platform: ServicePlatform; + path: string; + instructions: string; +} + +/** + * Install a system service to auto-start `kib watch` on login. + * Returns the path written and any manual instructions. + */ +export async function installService(vaultRoot: string): Promise { + const plat = detectPlatform(); + const binary = resolveKibBinary(); + + if (plat === "macos") { + const path = launchdPlistPath(); + const content = generateLaunchdPlist(vaultRoot, binary); + await mkdir(dirname(path), { recursive: true }); + await writeFile(path, content, "utf-8"); + return { + platform: plat, + path, + instructions: `Run: launchctl load ${path}`, + }; + } + + if (plat === "linux") { + const path = systemdUnitPath(); + const content = generateSystemdUnit(vaultRoot, binary); + await mkdir(dirname(path), { recursive: true }); + await writeFile(path, content, "utf-8"); + return { + platform: plat, + path, + instructions: `Run: systemctl --user enable --now ${SYSTEMD_UNIT}`, + }; + } + + throw new Error("Service installation is only supported on macOS and Linux."); +} + +/** Check if the service is installed. */ +export async function isServiceInstalled(): Promise<{ installed: boolean; path: string }> { + const plat = detectPlatform(); + if (plat === "macos") { + const path = launchdPlistPath(); + return { installed: existsSync(path), path }; + } + if (plat === "linux") { + const path = systemdUnitPath(); + return { installed: existsSync(path), path }; + } + return { installed: false, path: "" }; +} + +/** Uninstall the system service. */ +export async function uninstallService(): Promise<{ removed: boolean; path: string }> { + const plat = detectPlatform(); + if (plat === "macos") { + const path = launchdPlistPath(); + if (existsSync(path)) { + await unlink(path); + return { removed: true, path }; + } + } + if (plat === "linux") { + const path = systemdUnitPath(); + if (existsSync(path)) { + await unlink(path); + return { removed: true, path }; + } + } + return { removed: false, path: "" }; +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 8d55863..810fcbd 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -8,6 +8,7 @@ export { extractWikilinks, parseCompileOutput, parseFrontmatter } from "./compil export { enrichCrossReferences } from "./compile/enrichment.js"; export { computeStats, generateIndexMd } from "./compile/index-manager.js"; export * from "./constants.js"; +export * from "./daemon/index.js"; export * from "./errors.js"; export * from "./hash.js"; export { ingestSource } from "./ingest/ingest.js"; diff --git a/packages/core/src/schemas.ts b/packages/core/src/schemas.ts index 4ea4e95..f681133 100644 --- a/packages/core/src/schemas.ts +++ b/packages/core/src/schemas.ts @@ -95,6 +95,18 @@ export const VaultConfigSchema = z.object({ inbox_path: z.string().default("inbox"), auto_compile: z.boolean().default(true), poll_interval_ms: z.number().int().positive().default(DEFAULTS.watchPollIntervalMs), + auto_compile_threshold: z.number().int().positive().default(DEFAULTS.autoCompileThreshold), + auto_compile_delay_ms: z.number().int().nonnegative().default(DEFAULTS.autoCompileDelayMs), + log_max_mb: z.number().positive().default(DEFAULTS.watchLogMaxMb), + folders: z + .array( + z.object({ + path: z.string(), + glob: z.string().default("*"), + recursive: z.boolean().default(false), + }), + ) + .default([]), }), search: z.object({ engine: z.enum(["builtin", "vector", "hybrid"]).default("builtin"),