diff --git a/README.md b/README.md index d8a1ec9..35862a6 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,8 @@ docker exec -it hermes-memory hermes | `recall.strategy` | `"hybrid"` | Recall strategy: `keyword` / `embedding` / `hybrid` (RRF fusion, recommended) | | `recall.maxResults` | `5` | Number of items returned per recall | | `pipeline.everyNConversations` | `5` | Trigger an L1 memory extraction every N turns | +| `pipeline.enableL2` | `true` | Enable L2 scene extraction after L1 | +| `pipeline.enableL3` | `true` | Enable L3 persona generation after L2 | | `extraction.maxMemoriesPerSession` | `20` | Max memories extracted per L1 pass | | `persona.triggerEveryN` | `50` | Generate the user persona every N new memories | | `offload.enabled` | `false` | Whether to enable short-term compression | diff --git a/README_CN.md b/README_CN.md index 8698126..34c9b18 100644 --- a/README_CN.md +++ b/README_CN.md @@ -263,6 +263,8 @@ docker exec -it hermes-memory hermes | `recall.strategy` | `"hybrid"` | 召回策略:`keyword` / `embedding` / `hybrid`(RRF 融合,推荐) | | `recall.maxResults` | `5` | 每次召回条数 | | `pipeline.everyNConversations` | `5` | 每 N 轮对话触发一次 L1 记忆提取 | +| `pipeline.enableL2` | `true` | L1 后是否启用 L2 场景提取 | +| `pipeline.enableL3` | `true` | L2 后是否启用 L3 画像生成 | | `extraction.maxMemoriesPerSession` | `20` | 单次 L1 最多提取多少条 | | `persona.triggerEveryN` | `50` | 每 N 条新记忆触发用户画像生成 | | `offload.enabled` | `false` | 是否启用短期记忆压缩 | diff --git a/src/config.ts b/src/config.ts index 30a68e3..7a39e4b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -62,6 +62,10 @@ export interface PipelineTriggerConfig { everyNConversations: number; /** Enable warm-up: start threshold at 1, double after each L1 (1→2→4→...→everyN) (default: true) */ enableWarmup: boolean; + /** Enable L2 scene extraction scheduling after L1 completes (default: true) */ + enableL2: boolean; + /** Enable L3 persona generation after L2 completes (default: true) */ + enableL3: boolean; /** L1 idle timeout: trigger L1 after this many seconds of inactivity (default: 600) */ l1IdleTimeoutSeconds: number; /** L2 delay after L1: wait this many seconds after L1 completes before triggering L2 (default: 90) */ @@ -477,6 +481,8 @@ export function parseConfig(raw: Record | undefined): MemoryTda pipeline: { everyNConversations: num(pipelineGroup, "everyNConversations") ?? 5, enableWarmup: bool(pipelineGroup, "enableWarmup") ?? true, + enableL2: bool(pipelineGroup, "enableL2") ?? true, + enableL3: bool(pipelineGroup, "enableL3") ?? true, l1IdleTimeoutSeconds: num(pipelineGroup, "l1IdleTimeoutSeconds") ?? 600, l2DelayAfterL1Seconds: num(pipelineGroup, "l2DelayAfterL1Seconds") ?? 90, l2MinIntervalSeconds: num(pipelineGroup, "l2MinIntervalSeconds") ?? 900, diff --git a/src/core/pipeline-controls.test.ts b/src/core/pipeline-controls.test.ts new file mode 100644 index 0000000..24f5422 --- /dev/null +++ b/src/core/pipeline-controls.test.ts @@ -0,0 +1,82 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { parseConfig } from "../config.js"; +import { TdaiCore } from "./tdai-core.js"; + +const tempDirs: string[] = []; + +function makeTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), "memory-tdai-pipeline-controls-")); + tempDirs.push(dir); + return dir; +} + +function createHostAdapter(dataDir: string, runnerCalls: Array<{ enableTools: boolean }>) { + return { + hostType: "standalone" as const, + getLLMRunnerFactory: () => ({ + createRunner: (options: { enableTools?: boolean } = {}) => { + runnerCalls.push({ enableTools: options.enableTools ?? false }); + return { run: async () => "" }; + }, + }), + getLogger: () => ({ + debug: () => {}, + error: () => {}, + info: () => {}, + warn: () => {}, + }), + getRuntimeContext: () => ({ + dataDir, + platform: "vitest", + sessionId: "session-1", + sessionKey: "chat-1:session-1", + userId: "user-1", + workspaceDir: "/tmp/workspace", + }), + }; +} + +afterEach(() => { + for (const dir of tempDirs.splice(0)) { + rmSync(dir, { recursive: true, force: true }); + } +}); + +describe("pipeline controls", () => { + it("defaults L2 and L3 pipeline stages to enabled", () => { + const config = parseConfig({}); + + expect(config.pipeline.enableL2).toBe(true); + expect(config.pipeline.enableL3).toBe(true); + }); + + it("can enable L1 extraction without creating tool-enabled L2/L3 runners", async () => { + const runnerCalls: Array<{ enableTools: boolean }> = []; + const config = parseConfig({ + extraction: { enabled: true }, + pipeline: { + enableL2: false, + enableL3: false, + enableWarmup: false, + }, + }); + const core = new TdaiCore({ + config, + hostAdapter: createHostAdapter(makeTempDir(), runnerCalls), + }); + + await core.initialize(); + await core.destroy(); + + expect(runnerCalls).toEqual([{ enableTools: false }]); + expect(core.getScheduler()?.getQueueSizes()).toMatchObject({ + l2: 0, + l2Pending: false, + l3: 0, + l3Pending: false, + }); + }); +}); diff --git a/src/core/seed/seed-runtime.ts b/src/core/seed/seed-runtime.ts index 46e6647..721a533 100644 --- a/src/core/seed/seed-runtime.ts +++ b/src/core/seed/seed-runtime.ts @@ -91,7 +91,9 @@ async function createSeedPipeline(opts: SeedRuntimeOptions): Promise<{ pipeline: logger, }); l1LlmRunner = runnerFactory.createRunner({ enableTools: false }); - l2l3LlmRunner = runnerFactory.createRunner({ enableTools: true }); + if (cfg.pipeline.enableL2 || cfg.pipeline.enableL3) { + l2l3LlmRunner = runnerFactory.createRunner({ enableTools: true }); + } logger.info(`${TAG} Seed using standalone LLM: model=${cfg.llm.model}`); } @@ -105,24 +107,28 @@ async function createSeedPipeline(opts: SeedRuntimeOptions): Promise<{ pipeline: }); // Wire L2 runner via shared factory (same logic as index.ts live runtime) - pipeline.scheduler.setL2Runner(createL2Runner({ - pluginDataDir: outputDir, - cfg, - openclawConfig, - vectorStore: pipeline.vectorStore, - logger, - llmRunner: l2l3LlmRunner, - })); + if (cfg.pipeline.enableL2) { + pipeline.scheduler.setL2Runner(createL2Runner({ + pluginDataDir: outputDir, + cfg, + openclawConfig, + vectorStore: pipeline.vectorStore, + logger, + llmRunner: l2l3LlmRunner, + })); + } // Wire L3 runner via shared factory (same logic as index.ts live runtime) - pipeline.scheduler.setL3Runner(createL3Runner({ - pluginDataDir: outputDir, - cfg, - openclawConfig, - vectorStore: pipeline.vectorStore, - logger, - llmRunner: l2l3LlmRunner, - })); + if (cfg.pipeline.enableL3) { + pipeline.scheduler.setL3Runner(createL3Runner({ + pluginDataDir: outputDir, + cfg, + openclawConfig, + vectorStore: pipeline.vectorStore, + logger, + llmRunner: l2l3LlmRunner, + })); + } return { pipeline, cfg }; } diff --git a/src/core/tdai-core.ts b/src/core/tdai-core.ts index 977d4a2..57cbf59 100644 --- a/src/core/tdai-core.ts +++ b/src/core/tdai-core.ts @@ -449,7 +449,8 @@ export class TdaiCore { const l1LlmRunner = useStandaloneRunner ? runnerFactory.createRunner({ enableTools: false }) : undefined; - const l2l3LlmRunner = useStandaloneRunner + const needsToolRunner = this.cfg.pipeline.enableL2 || this.cfg.pipeline.enableL3; + const l2l3LlmRunner = useStandaloneRunner && needsToolRunner ? runnerFactory.createRunner({ enableTools: true }) : undefined; @@ -469,32 +470,36 @@ export class TdaiCore { this.scheduler.setPersister(createPersister(this.dataDir, this.logger)); // L2 runner - this.scheduler.setL2Runner(async (sessionKey: string, cursor?: string) => { - const l2Runner = createL2Runner({ - pluginDataDir: this.dataDir, - cfg: this.cfg, - openclawConfig, - vectorStore: this.vectorStore, - logger: this.logger, - instanceId: this.instanceId, - llmRunner: l2l3LlmRunner, + if (this.cfg.pipeline.enableL2) { + this.scheduler.setL2Runner(async (sessionKey: string, cursor?: string) => { + const l2Runner = createL2Runner({ + pluginDataDir: this.dataDir, + cfg: this.cfg, + openclawConfig, + vectorStore: this.vectorStore, + logger: this.logger, + instanceId: this.instanceId, + llmRunner: l2l3LlmRunner, + }); + return l2Runner(sessionKey, cursor); }); - return l2Runner(sessionKey, cursor); - }); + } // L3 runner - this.scheduler.setL3Runner(async () => { - const l3Runner = createL3Runner({ - pluginDataDir: this.dataDir, - cfg: this.cfg, - openclawConfig, - vectorStore: this.vectorStore, - logger: this.logger, - instanceId: this.instanceId, - llmRunner: l2l3LlmRunner, + if (this.cfg.pipeline.enableL3) { + this.scheduler.setL3Runner(async () => { + const l3Runner = createL3Runner({ + pluginDataDir: this.dataDir, + cfg: this.cfg, + openclawConfig, + vectorStore: this.vectorStore, + logger: this.logger, + instanceId: this.instanceId, + llmRunner: l2l3LlmRunner, + }); + await l3Runner(); }); - await l3Runner(); - }); + } this.logger.debug?.(`${TAG} Pipeline runners wired`); } diff --git a/src/utils/pipeline-factory.ts b/src/utils/pipeline-factory.ts index 4595753..b47c68d 100644 --- a/src/utils/pipeline-factory.ts +++ b/src/utils/pipeline-factory.ts @@ -664,6 +664,8 @@ export function createPipelineManager( { everyNConversations: cfg.pipeline.everyNConversations, enableWarmup: cfg.pipeline.enableWarmup, + enableL2: cfg.pipeline.enableL2, + enableL3: cfg.pipeline.enableL3, l1: { idleTimeoutSeconds: cfg.pipeline.l1IdleTimeoutSeconds }, l2: { delayAfterL1Seconds: cfg.pipeline.l2DelayAfterL1Seconds, diff --git a/src/utils/pipeline-manager.test.ts b/src/utils/pipeline-manager.test.ts new file mode 100644 index 0000000..e2af580 --- /dev/null +++ b/src/utils/pipeline-manager.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from "vitest"; +import { MemoryPipelineManager, type PipelineConfig } from "./pipeline-manager.js"; + +function baseConfig(overrides: Partial = {}): PipelineConfig { + return { + enableL2: true, + enableL3: true, + enableWarmup: false, + everyNConversations: 1, + l1: { idleTimeoutSeconds: 60 }, + l2: { + delayAfterL1Seconds: 0, + maxIntervalSeconds: 60, + minIntervalSeconds: 0, + sessionActiveWindowHours: 24, + }, + ...overrides, + }; +} + +async function waitFor(predicate: () => boolean): Promise { + const deadline = Date.now() + 1_000; + while (Date.now() < deadline) { + if (predicate()) return; + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error("condition not met"); +} + +describe("MemoryPipelineManager stage controls", () => { + it("does not schedule L2 or L3 when both stages are disabled", async () => { + const manager = new MemoryPipelineManager(baseConfig({ + enableL2: false, + enableL3: false, + })); + let l1Calls = 0; + let l2Calls = 0; + let l3Calls = 0; + + manager.setL1Runner(async () => { + l1Calls += 1; + }); + manager.setL2Runner(async () => { + l2Calls += 1; + }); + manager.setL3Runner(async () => { + l3Calls += 1; + }); + manager.start({}); + + await manager.notifyConversation("session-1", [ + { role: "user", content: "remember this", timestamp: new Date().toISOString() }, + ]); + await waitFor(() => l1Calls === 1); + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(l2Calls).toBe(0); + expect(l3Calls).toBe(0); + expect(manager.getQueueSizes()).toMatchObject({ + l2: 0, + l2Pending: false, + l3: 0, + l3Pending: false, + }); + + await manager.destroy(); + }); +}); diff --git a/src/utils/pipeline-manager.ts b/src/utils/pipeline-manager.ts index b56234c..56229aa 100644 --- a/src/utils/pipeline-manager.ts +++ b/src/utils/pipeline-manager.ts @@ -120,6 +120,12 @@ export interface PipelineConfig { */ enableWarmup: boolean; + /** Enable downstream L2 scene extraction after L1 succeeds. */ + enableL2: boolean; + + /** Enable downstream L3 persona generation after L2 succeeds. */ + enableL3: boolean; + l1: { /** Idle timeout before triggering L1 (seconds, default: 60) */ idleTimeoutSeconds: number; @@ -198,6 +204,8 @@ export class MemoryPipelineManager { private readonly l1IdleTimeoutMs: number; private readonly everyNConversations: number; private readonly enableWarmup: boolean; + private readonly enableL2: boolean; + private readonly enableL3: boolean; private readonly l2DelayAfterL1Ms: number; private readonly l2MinIntervalMs: number; private readonly l2MaxIntervalMs: number; @@ -255,6 +263,8 @@ export class MemoryPipelineManager { this.l1IdleTimeoutMs = config.l1.idleTimeoutSeconds * 1000; this.everyNConversations = config.everyNConversations; this.enableWarmup = config.enableWarmup; + this.enableL2 = config.enableL2; + this.enableL3 = config.enableL3; this.l2DelayAfterL1Ms = config.l2.delayAfterL1Seconds * 1000; this.l2MinIntervalMs = config.l2.minIntervalSeconds * 1000; this.l2MaxIntervalMs = config.l2.maxIntervalSeconds * 1000; @@ -265,6 +275,8 @@ export class MemoryPipelineManager { this.logger?.debug?.( `${TAG} Initialized: everyNConversations=${config.everyNConversations}, ` + `warmup=${config.enableWarmup ? "enabled" : "disabled"}, ` + + `l2=${config.enableL2 ? "enabled" : "disabled"}, ` + + `l3=${config.enableL3 ? "enabled" : "disabled"}, ` + `l1IdleTimeout=${config.l1.idleTimeoutSeconds}s, ` + `l2DelayAfterL1=${config.l2.delayAfterL1Seconds}s, ` + `l2MinInterval=${config.l2.minIntervalSeconds}s, ` + @@ -686,11 +698,11 @@ export class MemoryPipelineManager { if (!this.l1Runner) { this.logger?.warn(`${TAG} [${sessionKey}] No L1 runner set, skipping`); - state.l2_pending_l1_count = state.conversation_count; + state.l2_pending_l1_count = this.enableL2 ? state.conversation_count : 0; state.conversation_count = 0; this.advanceWarmupThreshold(state); await this.persistStates(); - this.advanceL2Timer(sessionKey); + if (this.enableL2) this.advanceL2Timer(sessionKey); return; } @@ -738,13 +750,13 @@ export class MemoryPipelineManager { // Success: reset retry count and advance state const timers = this.getOrCreateTimers(sessionKey); timers.l1RetryCount = 0; - state.l2_pending_l1_count = state.conversation_count; + state.l2_pending_l1_count = this.enableL2 ? state.conversation_count : 0; state.conversation_count = 0; this.advanceWarmupThreshold(state); await this.persistStates(); // Advance the L2 timer (downward-only) to fire after delay, respecting minInterval - this.advanceL2Timer(sessionKey); + if (this.enableL2) this.advanceL2Timer(sessionKey); } // ============================ @@ -762,6 +774,7 @@ export class MemoryPipelineManager { */ private advanceL2Timer(sessionKey: string): void { if (this.destroyed) return; + if (!this.enableL2) return; const timers = this.getOrCreateTimers(sessionKey); const now = Date.now(); @@ -796,6 +809,7 @@ export class MemoryPipelineManager { */ private armL2MaxInterval(sessionKey: string): void { if (this.destroyed) return; + if (!this.enableL2) return; const timers = this.getOrCreateTimers(sessionKey); const fireAt = Date.now() + this.l2MaxIntervalMs; @@ -819,6 +833,8 @@ export class MemoryPipelineManager { * - "max-interval": periodic timer — apply cold check normally. */ private onL2TimerFired(sessionKey: string, source: "delay-after-l1" | "max-interval"): void { + if (!this.enableL2) return; + const state = this.sessionStates.get(sessionKey); if (!state) return; @@ -844,6 +860,8 @@ export class MemoryPipelineManager { // ============================ private enqueueL2(sessionKey: string, trigger: string): void { + if (!this.enableL2) return; + const timers = this.getOrCreateTimers(sessionKey); // Cancel any pending L2 timer (we're about to run L2) @@ -919,7 +937,7 @@ export class MemoryPipelineManager { this.armL2MaxInterval(sessionKey); // Trigger L3 - this.triggerL3(); + if (this.enableL3) this.triggerL3(); } // ============================ @@ -928,6 +946,7 @@ export class MemoryPipelineManager { private triggerL3(): void { if (this.destroyed) return; + if (!this.enableL3) return; if (this.l3Running) { // L3 is in progress — mark pending so it runs again after current finishes @@ -1096,6 +1115,8 @@ export class MemoryPipelineManager { * because the pipeline may be starting during management commands. */ private recoverPendingSessions(): void { + if (!this.enableL2) return; + for (const [sessionKey, state] of this.sessionStates) { if (state.conversation_count === 0 && state.l2_pending_l1_count === 0) continue;