From 832ecf93e49de685e36886eacf2c76febb88a260 Mon Sep 17 00:00:00 2001 From: Dimon Date: Sun, 17 May 2026 02:42:53 +0800 Subject: [PATCH] feat(core): add optional pipeline stage controls Allow hosts to disable L2 scene extraction and L3 persona generation while keeping L1 extraction enabled. Defaults preserve the existing full pipeline, and targeted tests cover config parsing plus scheduler behavior. Signed-off-by: Dimon --- README.md | 2 + README_CN.md | 2 + src/config.ts | 6 +++ src/core/pipeline-controls.test.ts | 82 ++++++++++++++++++++++++++++++ src/core/seed/seed-runtime.ts | 40 ++++++++------- src/core/tdai-core.ts | 51 ++++++++++--------- src/utils/pipeline-factory.ts | 2 + src/utils/pipeline-manager.test.ts | 68 +++++++++++++++++++++++++ src/utils/pipeline-manager.ts | 31 +++++++++-- 9 files changed, 239 insertions(+), 45 deletions(-) create mode 100644 src/core/pipeline-controls.test.ts create mode 100644 src/utils/pipeline-manager.test.ts diff --git a/README.md b/README.md index d8a1ec9..35862a6 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,8 @@ docker exec -it hermes-memory hermes | `recall.strategy` | `"hybrid"` | Recall strategy: `keyword` / `embedding` / `hybrid` (RRF fusion, recommended) | | `recall.maxResults` | `5` | Number of items returned per recall | | `pipeline.everyNConversations` | `5` | Trigger an L1 memory extraction every N turns | +| `pipeline.enableL2` | `true` | Enable L2 scene extraction after L1 | +| `pipeline.enableL3` | `true` | Enable L3 persona generation after L2 | | `extraction.maxMemoriesPerSession` | `20` | Max memories extracted per L1 pass | | `persona.triggerEveryN` | `50` | Generate the user persona every N new memories | | `offload.enabled` | `false` | Whether to enable short-term compression | diff --git a/README_CN.md b/README_CN.md index 8698126..34c9b18 100644 --- a/README_CN.md +++ b/README_CN.md @@ -263,6 +263,8 @@ docker exec -it hermes-memory hermes | `recall.strategy` | `"hybrid"` | 召回策略:`keyword` / `embedding` / `hybrid`(RRF 融合,推荐) | | `recall.maxResults` | `5` | 每次召回条数 | | `pipeline.everyNConversations` | `5` | 每 N 轮对话触发一次 L1 记忆提取 | +| `pipeline.enableL2` | `true` | L1 后是否启用 L2 场景提取 | +| `pipeline.enableL3` | `true` | L2 后是否启用 L3 画像生成 | | `extraction.maxMemoriesPerSession` | `20` | 单次 L1 最多提取多少条 | | `persona.triggerEveryN` | `50` | 每 N 条新记忆触发用户画像生成 | | `offload.enabled` | `false` | 是否启用短期记忆压缩 | diff --git a/src/config.ts b/src/config.ts index 30a68e3..7a39e4b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -62,6 +62,10 @@ export interface PipelineTriggerConfig { everyNConversations: number; /** Enable warm-up: start threshold at 1, double after each L1 (1→2→4→...→everyN) (default: true) */ enableWarmup: boolean; + /** Enable L2 scene extraction scheduling after L1 completes (default: true) */ + enableL2: boolean; + /** Enable L3 persona generation after L2 completes (default: true) */ + enableL3: boolean; /** L1 idle timeout: trigger L1 after this many seconds of inactivity (default: 600) */ l1IdleTimeoutSeconds: number; /** L2 delay after L1: wait this many seconds after L1 completes before triggering L2 (default: 90) */ @@ -477,6 +481,8 @@ export function parseConfig(raw: Record | undefined): MemoryTda pipeline: { everyNConversations: num(pipelineGroup, "everyNConversations") ?? 5, enableWarmup: bool(pipelineGroup, "enableWarmup") ?? true, + enableL2: bool(pipelineGroup, "enableL2") ?? true, + enableL3: bool(pipelineGroup, "enableL3") ?? true, l1IdleTimeoutSeconds: num(pipelineGroup, "l1IdleTimeoutSeconds") ?? 600, l2DelayAfterL1Seconds: num(pipelineGroup, "l2DelayAfterL1Seconds") ?? 90, l2MinIntervalSeconds: num(pipelineGroup, "l2MinIntervalSeconds") ?? 900, diff --git a/src/core/pipeline-controls.test.ts b/src/core/pipeline-controls.test.ts new file mode 100644 index 0000000..24f5422 --- /dev/null +++ b/src/core/pipeline-controls.test.ts @@ -0,0 +1,82 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { parseConfig } from "../config.js"; +import { TdaiCore } from "./tdai-core.js"; + +const tempDirs: string[] = []; + +function makeTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), "memory-tdai-pipeline-controls-")); + tempDirs.push(dir); + return dir; +} + +function createHostAdapter(dataDir: string, runnerCalls: Array<{ enableTools: boolean }>) { + return { + hostType: "standalone" as const, + getLLMRunnerFactory: () => ({ + createRunner: (options: { enableTools?: boolean } = {}) => { + runnerCalls.push({ enableTools: options.enableTools ?? false }); + return { run: async () => "" }; + }, + }), + getLogger: () => ({ + debug: () => {}, + error: () => {}, + info: () => {}, + warn: () => {}, + }), + getRuntimeContext: () => ({ + dataDir, + platform: "vitest", + sessionId: "session-1", + sessionKey: "chat-1:session-1", + userId: "user-1", + workspaceDir: "/tmp/workspace", + }), + }; +} + +afterEach(() => { + for (const dir of tempDirs.splice(0)) { + rmSync(dir, { recursive: true, force: true }); + } +}); + +describe("pipeline controls", () => { + it("defaults L2 and L3 pipeline stages to enabled", () => { + const config = parseConfig({}); + + expect(config.pipeline.enableL2).toBe(true); + expect(config.pipeline.enableL3).toBe(true); + }); + + it("can enable L1 extraction without creating tool-enabled L2/L3 runners", async () => { + const runnerCalls: Array<{ enableTools: boolean }> = []; + const config = parseConfig({ + extraction: { enabled: true }, + pipeline: { + enableL2: false, + enableL3: false, + enableWarmup: false, + }, + }); + const core = new TdaiCore({ + config, + hostAdapter: createHostAdapter(makeTempDir(), runnerCalls), + }); + + await core.initialize(); + await core.destroy(); + + expect(runnerCalls).toEqual([{ enableTools: false }]); + expect(core.getScheduler()?.getQueueSizes()).toMatchObject({ + l2: 0, + l2Pending: false, + l3: 0, + l3Pending: false, + }); + }); +}); diff --git a/src/core/seed/seed-runtime.ts b/src/core/seed/seed-runtime.ts index 46e6647..721a533 100644 --- a/src/core/seed/seed-runtime.ts +++ b/src/core/seed/seed-runtime.ts @@ -91,7 +91,9 @@ async function createSeedPipeline(opts: SeedRuntimeOptions): Promise<{ pipeline: logger, }); l1LlmRunner = runnerFactory.createRunner({ enableTools: false }); - l2l3LlmRunner = runnerFactory.createRunner({ enableTools: true }); + if (cfg.pipeline.enableL2 || cfg.pipeline.enableL3) { + l2l3LlmRunner = runnerFactory.createRunner({ enableTools: true }); + } logger.info(`${TAG} Seed using standalone LLM: model=${cfg.llm.model}`); } @@ -105,24 +107,28 @@ async function createSeedPipeline(opts: SeedRuntimeOptions): Promise<{ pipeline: }); // Wire L2 runner via shared factory (same logic as index.ts live runtime) - pipeline.scheduler.setL2Runner(createL2Runner({ - pluginDataDir: outputDir, - cfg, - openclawConfig, - vectorStore: pipeline.vectorStore, - logger, - llmRunner: l2l3LlmRunner, - })); + if (cfg.pipeline.enableL2) { + pipeline.scheduler.setL2Runner(createL2Runner({ + pluginDataDir: outputDir, + cfg, + openclawConfig, + vectorStore: pipeline.vectorStore, + logger, + llmRunner: l2l3LlmRunner, + })); + } // Wire L3 runner via shared factory (same logic as index.ts live runtime) - pipeline.scheduler.setL3Runner(createL3Runner({ - pluginDataDir: outputDir, - cfg, - openclawConfig, - vectorStore: pipeline.vectorStore, - logger, - llmRunner: l2l3LlmRunner, - })); + if (cfg.pipeline.enableL3) { + pipeline.scheduler.setL3Runner(createL3Runner({ + pluginDataDir: outputDir, + cfg, + openclawConfig, + vectorStore: pipeline.vectorStore, + logger, + llmRunner: l2l3LlmRunner, + })); + } return { pipeline, cfg }; } diff --git a/src/core/tdai-core.ts b/src/core/tdai-core.ts index 977d4a2..57cbf59 100644 --- a/src/core/tdai-core.ts +++ b/src/core/tdai-core.ts @@ -449,7 +449,8 @@ export class TdaiCore { const l1LlmRunner = useStandaloneRunner ? runnerFactory.createRunner({ enableTools: false }) : undefined; - const l2l3LlmRunner = useStandaloneRunner + const needsToolRunner = this.cfg.pipeline.enableL2 || this.cfg.pipeline.enableL3; + const l2l3LlmRunner = useStandaloneRunner && needsToolRunner ? runnerFactory.createRunner({ enableTools: true }) : undefined; @@ -469,32 +470,36 @@ export class TdaiCore { this.scheduler.setPersister(createPersister(this.dataDir, this.logger)); // L2 runner - this.scheduler.setL2Runner(async (sessionKey: string, cursor?: string) => { - const l2Runner = createL2Runner({ - pluginDataDir: this.dataDir, - cfg: this.cfg, - openclawConfig, - vectorStore: this.vectorStore, - logger: this.logger, - instanceId: this.instanceId, - llmRunner: l2l3LlmRunner, + if (this.cfg.pipeline.enableL2) { + this.scheduler.setL2Runner(async (sessionKey: string, cursor?: string) => { + const l2Runner = createL2Runner({ + pluginDataDir: this.dataDir, + cfg: this.cfg, + openclawConfig, + vectorStore: this.vectorStore, + logger: this.logger, + instanceId: this.instanceId, + llmRunner: l2l3LlmRunner, + }); + return l2Runner(sessionKey, cursor); }); - return l2Runner(sessionKey, cursor); - }); + } // L3 runner - this.scheduler.setL3Runner(async () => { - const l3Runner = createL3Runner({ - pluginDataDir: this.dataDir, - cfg: this.cfg, - openclawConfig, - vectorStore: this.vectorStore, - logger: this.logger, - instanceId: this.instanceId, - llmRunner: l2l3LlmRunner, + if (this.cfg.pipeline.enableL3) { + this.scheduler.setL3Runner(async () => { + const l3Runner = createL3Runner({ + pluginDataDir: this.dataDir, + cfg: this.cfg, + openclawConfig, + vectorStore: this.vectorStore, + logger: this.logger, + instanceId: this.instanceId, + llmRunner: l2l3LlmRunner, + }); + await l3Runner(); }); - await l3Runner(); - }); + } this.logger.debug?.(`${TAG} Pipeline runners wired`); } diff --git a/src/utils/pipeline-factory.ts b/src/utils/pipeline-factory.ts index 4595753..b47c68d 100644 --- a/src/utils/pipeline-factory.ts +++ b/src/utils/pipeline-factory.ts @@ -664,6 +664,8 @@ export function createPipelineManager( { everyNConversations: cfg.pipeline.everyNConversations, enableWarmup: cfg.pipeline.enableWarmup, + enableL2: cfg.pipeline.enableL2, + enableL3: cfg.pipeline.enableL3, l1: { idleTimeoutSeconds: cfg.pipeline.l1IdleTimeoutSeconds }, l2: { delayAfterL1Seconds: cfg.pipeline.l2DelayAfterL1Seconds, diff --git a/src/utils/pipeline-manager.test.ts b/src/utils/pipeline-manager.test.ts new file mode 100644 index 0000000..e2af580 --- /dev/null +++ b/src/utils/pipeline-manager.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from "vitest"; +import { MemoryPipelineManager, type PipelineConfig } from "./pipeline-manager.js"; + +function baseConfig(overrides: Partial = {}): PipelineConfig { + return { + enableL2: true, + enableL3: true, + enableWarmup: false, + everyNConversations: 1, + l1: { idleTimeoutSeconds: 60 }, + l2: { + delayAfterL1Seconds: 0, + maxIntervalSeconds: 60, + minIntervalSeconds: 0, + sessionActiveWindowHours: 24, + }, + ...overrides, + }; +} + +async function waitFor(predicate: () => boolean): Promise { + const deadline = Date.now() + 1_000; + while (Date.now() < deadline) { + if (predicate()) return; + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error("condition not met"); +} + +describe("MemoryPipelineManager stage controls", () => { + it("does not schedule L2 or L3 when both stages are disabled", async () => { + const manager = new MemoryPipelineManager(baseConfig({ + enableL2: false, + enableL3: false, + })); + let l1Calls = 0; + let l2Calls = 0; + let l3Calls = 0; + + manager.setL1Runner(async () => { + l1Calls += 1; + }); + manager.setL2Runner(async () => { + l2Calls += 1; + }); + manager.setL3Runner(async () => { + l3Calls += 1; + }); + manager.start({}); + + await manager.notifyConversation("session-1", [ + { role: "user", content: "remember this", timestamp: new Date().toISOString() }, + ]); + await waitFor(() => l1Calls === 1); + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(l2Calls).toBe(0); + expect(l3Calls).toBe(0); + expect(manager.getQueueSizes()).toMatchObject({ + l2: 0, + l2Pending: false, + l3: 0, + l3Pending: false, + }); + + await manager.destroy(); + }); +}); diff --git a/src/utils/pipeline-manager.ts b/src/utils/pipeline-manager.ts index b56234c..56229aa 100644 --- a/src/utils/pipeline-manager.ts +++ b/src/utils/pipeline-manager.ts @@ -120,6 +120,12 @@ export interface PipelineConfig { */ enableWarmup: boolean; + /** Enable downstream L2 scene extraction after L1 succeeds. */ + enableL2: boolean; + + /** Enable downstream L3 persona generation after L2 succeeds. */ + enableL3: boolean; + l1: { /** Idle timeout before triggering L1 (seconds, default: 60) */ idleTimeoutSeconds: number; @@ -198,6 +204,8 @@ export class MemoryPipelineManager { private readonly l1IdleTimeoutMs: number; private readonly everyNConversations: number; private readonly enableWarmup: boolean; + private readonly enableL2: boolean; + private readonly enableL3: boolean; private readonly l2DelayAfterL1Ms: number; private readonly l2MinIntervalMs: number; private readonly l2MaxIntervalMs: number; @@ -255,6 +263,8 @@ export class MemoryPipelineManager { this.l1IdleTimeoutMs = config.l1.idleTimeoutSeconds * 1000; this.everyNConversations = config.everyNConversations; this.enableWarmup = config.enableWarmup; + this.enableL2 = config.enableL2; + this.enableL3 = config.enableL3; this.l2DelayAfterL1Ms = config.l2.delayAfterL1Seconds * 1000; this.l2MinIntervalMs = config.l2.minIntervalSeconds * 1000; this.l2MaxIntervalMs = config.l2.maxIntervalSeconds * 1000; @@ -265,6 +275,8 @@ export class MemoryPipelineManager { this.logger?.debug?.( `${TAG} Initialized: everyNConversations=${config.everyNConversations}, ` + `warmup=${config.enableWarmup ? "enabled" : "disabled"}, ` + + `l2=${config.enableL2 ? "enabled" : "disabled"}, ` + + `l3=${config.enableL3 ? "enabled" : "disabled"}, ` + `l1IdleTimeout=${config.l1.idleTimeoutSeconds}s, ` + `l2DelayAfterL1=${config.l2.delayAfterL1Seconds}s, ` + `l2MinInterval=${config.l2.minIntervalSeconds}s, ` + @@ -686,11 +698,11 @@ export class MemoryPipelineManager { if (!this.l1Runner) { this.logger?.warn(`${TAG} [${sessionKey}] No L1 runner set, skipping`); - state.l2_pending_l1_count = state.conversation_count; + state.l2_pending_l1_count = this.enableL2 ? state.conversation_count : 0; state.conversation_count = 0; this.advanceWarmupThreshold(state); await this.persistStates(); - this.advanceL2Timer(sessionKey); + if (this.enableL2) this.advanceL2Timer(sessionKey); return; } @@ -738,13 +750,13 @@ export class MemoryPipelineManager { // Success: reset retry count and advance state const timers = this.getOrCreateTimers(sessionKey); timers.l1RetryCount = 0; - state.l2_pending_l1_count = state.conversation_count; + state.l2_pending_l1_count = this.enableL2 ? state.conversation_count : 0; state.conversation_count = 0; this.advanceWarmupThreshold(state); await this.persistStates(); // Advance the L2 timer (downward-only) to fire after delay, respecting minInterval - this.advanceL2Timer(sessionKey); + if (this.enableL2) this.advanceL2Timer(sessionKey); } // ============================ @@ -762,6 +774,7 @@ export class MemoryPipelineManager { */ private advanceL2Timer(sessionKey: string): void { if (this.destroyed) return; + if (!this.enableL2) return; const timers = this.getOrCreateTimers(sessionKey); const now = Date.now(); @@ -796,6 +809,7 @@ export class MemoryPipelineManager { */ private armL2MaxInterval(sessionKey: string): void { if (this.destroyed) return; + if (!this.enableL2) return; const timers = this.getOrCreateTimers(sessionKey); const fireAt = Date.now() + this.l2MaxIntervalMs; @@ -819,6 +833,8 @@ export class MemoryPipelineManager { * - "max-interval": periodic timer — apply cold check normally. */ private onL2TimerFired(sessionKey: string, source: "delay-after-l1" | "max-interval"): void { + if (!this.enableL2) return; + const state = this.sessionStates.get(sessionKey); if (!state) return; @@ -844,6 +860,8 @@ export class MemoryPipelineManager { // ============================ private enqueueL2(sessionKey: string, trigger: string): void { + if (!this.enableL2) return; + const timers = this.getOrCreateTimers(sessionKey); // Cancel any pending L2 timer (we're about to run L2) @@ -919,7 +937,7 @@ export class MemoryPipelineManager { this.armL2MaxInterval(sessionKey); // Trigger L3 - this.triggerL3(); + if (this.enableL3) this.triggerL3(); } // ============================ @@ -928,6 +946,7 @@ export class MemoryPipelineManager { private triggerL3(): void { if (this.destroyed) return; + if (!this.enableL3) return; if (this.l3Running) { // L3 is in progress — mark pending so it runs again after current finishes @@ -1096,6 +1115,8 @@ export class MemoryPipelineManager { * because the pipeline may be starting during management commands. */ private recoverPendingSessions(): void { + if (!this.enableL2) return; + for (const [sessionKey, state] of this.sessionStates) { if (state.conversation_count === 0 && state.l2_pending_l1_count === 0) continue;