Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ docker exec -it hermes-memory hermes
| `recall.strategy` | `"hybrid"` | Recall strategy: `keyword` / `embedding` / `hybrid` (RRF fusion, recommended) |
| `recall.maxResults` | `5` | Number of items returned per recall |
| `pipeline.everyNConversations` | `5` | Trigger an L1 memory extraction every N turns |
| `pipeline.enableL2` | `true` | Enable L2 scene extraction after L1 |
| `pipeline.enableL3` | `true` | Enable L3 persona generation after L2 |
| `extraction.maxMemoriesPerSession` | `20` | Max memories extracted per L1 pass |
| `persona.triggerEveryN` | `50` | Generate the user persona every N new memories |
| `offload.enabled` | `false` | Whether to enable short-term compression |
Expand Down
2 changes: 2 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ docker exec -it hermes-memory hermes
| `recall.strategy` | `"hybrid"` | 召回策略:`keyword` / `embedding` / `hybrid`(RRF 融合,推荐) |
| `recall.maxResults` | `5` | 每次召回条数 |
| `pipeline.everyNConversations` | `5` | 每 N 轮对话触发一次 L1 记忆提取 |
| `pipeline.enableL2` | `true` | L1 后是否启用 L2 场景提取 |
| `pipeline.enableL3` | `true` | L2 后是否启用 L3 画像生成 |
| `extraction.maxMemoriesPerSession` | `20` | 单次 L1 最多提取多少条 |
| `persona.triggerEveryN` | `50` | 每 N 条新记忆触发用户画像生成 |
| `offload.enabled` | `false` | 是否启用短期记忆压缩 |
Expand Down
6 changes: 6 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ export interface PipelineTriggerConfig {
everyNConversations: number;
/** Enable warm-up: start threshold at 1, double after each L1 (1→2→4→...→everyN) (default: true) */
enableWarmup: boolean;
/** Enable L2 scene extraction scheduling after L1 completes (default: true) */
enableL2: boolean;
/** Enable L3 persona generation after L2 completes (default: true) */
enableL3: boolean;
/** L1 idle timeout: trigger L1 after this many seconds of inactivity (default: 600) */
l1IdleTimeoutSeconds: number;
/** L2 delay after L1: wait this many seconds after L1 completes before triggering L2 (default: 90) */
Expand Down Expand Up @@ -477,6 +481,8 @@ export function parseConfig(raw: Record<string, unknown> | undefined): MemoryTda
pipeline: {
everyNConversations: num(pipelineGroup, "everyNConversations") ?? 5,
enableWarmup: bool(pipelineGroup, "enableWarmup") ?? true,
enableL2: bool(pipelineGroup, "enableL2") ?? true,
enableL3: bool(pipelineGroup, "enableL3") ?? true,
l1IdleTimeoutSeconds: num(pipelineGroup, "l1IdleTimeoutSeconds") ?? 600,
l2DelayAfterL1Seconds: num(pipelineGroup, "l2DelayAfterL1Seconds") ?? 90,
l2MinIntervalSeconds: num(pipelineGroup, "l2MinIntervalSeconds") ?? 900,
Expand Down
82 changes: 82 additions & 0 deletions src/core/pipeline-controls.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { parseConfig } from "../config.js";
import { TdaiCore } from "./tdai-core.js";

/** Paths of temp dirs created by this file; drained and deleted in afterEach. */
const tempDirs: string[] = [];

/**
 * Create a uniquely-named temp directory for one test and register it
 * for automatic cleanup.
 */
function makeTempDir(): string {
  const prefix = join(tmpdir(), "memory-tdai-pipeline-controls-");
  const created = mkdtempSync(prefix);
  tempDirs.push(created);
  return created;
}

/**
 * Build a minimal standalone host adapter for driving TdaiCore in tests.
 * Every runner requested through the LLM runner factory is recorded in
 * `runnerCalls`, so a test can assert exactly which stages asked for a
 * tool-enabled runner. Runners themselves are stubs that return "".
 */
function createHostAdapter(dataDir: string, runnerCalls: Array<{ enableTools: boolean }>) {
  // Record the (defaulted) enableTools flag, then hand back a no-op runner.
  const createRunner = ({ enableTools = false }: { enableTools?: boolean } = {}) => {
    runnerCalls.push({ enableTools });
    return { run: async () => "" };
  };

  return {
    hostType: "standalone" as const,
    getLLMRunnerFactory: () => ({ createRunner }),
    getLogger: () => ({
      debug: () => {},
      error: () => {},
      info: () => {},
      warn: () => {},
    }),
    getRuntimeContext: () => ({
      dataDir,
      platform: "vitest",
      sessionId: "session-1",
      sessionKey: "chat-1:session-1",
      userId: "user-1",
      workspaceDir: "/tmp/workspace",
    }),
  };
}

// Delete every temp dir created via makeTempDir during the test.
// splice(0) both empties the shared tempDirs list and yields the paths,
// so a failure in one test cannot leak dirs into the next.
afterEach(() => {
  for (const dir of tempDirs.splice(0)) {
    rmSync(dir, { recursive: true, force: true });
  }
});

describe("pipeline controls", () => {
  it("defaults L2 and L3 pipeline stages to enabled", () => {
    // An empty raw config must fall back to the documented defaults.
    const parsed = parseConfig({});

    expect(parsed.pipeline.enableL2).toBe(true);
    expect(parsed.pipeline.enableL3).toBe(true);
  });

  it("can enable L1 extraction without creating tool-enabled L2/L3 runners", async () => {
    const observedRunners: Array<{ enableTools: boolean }> = [];
    const cfg = parseConfig({
      extraction: { enabled: true },
      pipeline: {
        enableL2: false,
        enableL3: false,
        enableWarmup: false,
      },
    });
    const core = new TdaiCore({
      config: cfg,
      hostAdapter: createHostAdapter(makeTempDir(), observedRunners),
    });

    await core.initialize();
    await core.destroy();

    // Only the tools-off L1 runner should ever have been requested.
    expect(observedRunners).toEqual([{ enableTools: false }]);
    // With both stages disabled, nothing may be queued or pending for L2/L3.
    expect(core.getScheduler()?.getQueueSizes()).toMatchObject({
      l2: 0,
      l2Pending: false,
      l3: 0,
      l3Pending: false,
    });
  });
});
40 changes: 23 additions & 17 deletions src/core/seed/seed-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ async function createSeedPipeline(opts: SeedRuntimeOptions): Promise<{ pipeline:
logger,
});
l1LlmRunner = runnerFactory.createRunner({ enableTools: false });
l2l3LlmRunner = runnerFactory.createRunner({ enableTools: true });
if (cfg.pipeline.enableL2 || cfg.pipeline.enableL3) {
l2l3LlmRunner = runnerFactory.createRunner({ enableTools: true });
}
logger.info(`${TAG} Seed using standalone LLM: model=${cfg.llm.model}`);
}

Expand All @@ -105,24 +107,28 @@ async function createSeedPipeline(opts: SeedRuntimeOptions): Promise<{ pipeline:
});

// Wire L2 runner via shared factory (same logic as index.ts live runtime)
pipeline.scheduler.setL2Runner(createL2Runner({
pluginDataDir: outputDir,
cfg,
openclawConfig,
vectorStore: pipeline.vectorStore,
logger,
llmRunner: l2l3LlmRunner,
}));
if (cfg.pipeline.enableL2) {
pipeline.scheduler.setL2Runner(createL2Runner({
pluginDataDir: outputDir,
cfg,
openclawConfig,
vectorStore: pipeline.vectorStore,
logger,
llmRunner: l2l3LlmRunner,
}));
}

// Wire L3 runner via shared factory (same logic as index.ts live runtime)
pipeline.scheduler.setL3Runner(createL3Runner({
pluginDataDir: outputDir,
cfg,
openclawConfig,
vectorStore: pipeline.vectorStore,
logger,
llmRunner: l2l3LlmRunner,
}));
if (cfg.pipeline.enableL3) {
pipeline.scheduler.setL3Runner(createL3Runner({
pluginDataDir: outputDir,
cfg,
openclawConfig,
vectorStore: pipeline.vectorStore,
logger,
llmRunner: l2l3LlmRunner,
}));
}

return { pipeline, cfg };
}
Expand Down
51 changes: 28 additions & 23 deletions src/core/tdai-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ export class TdaiCore {
const l1LlmRunner = useStandaloneRunner
? runnerFactory.createRunner({ enableTools: false })
: undefined;
const l2l3LlmRunner = useStandaloneRunner
const needsToolRunner = this.cfg.pipeline.enableL2 || this.cfg.pipeline.enableL3;
const l2l3LlmRunner = useStandaloneRunner && needsToolRunner
? runnerFactory.createRunner({ enableTools: true })
: undefined;

Expand All @@ -469,32 +470,36 @@ export class TdaiCore {
this.scheduler.setPersister(createPersister(this.dataDir, this.logger));

// L2 runner
this.scheduler.setL2Runner(async (sessionKey: string, cursor?: string) => {
const l2Runner = createL2Runner({
pluginDataDir: this.dataDir,
cfg: this.cfg,
openclawConfig,
vectorStore: this.vectorStore,
logger: this.logger,
instanceId: this.instanceId,
llmRunner: l2l3LlmRunner,
if (this.cfg.pipeline.enableL2) {
this.scheduler.setL2Runner(async (sessionKey: string, cursor?: string) => {
const l2Runner = createL2Runner({
pluginDataDir: this.dataDir,
cfg: this.cfg,
openclawConfig,
vectorStore: this.vectorStore,
logger: this.logger,
instanceId: this.instanceId,
llmRunner: l2l3LlmRunner,
});
return l2Runner(sessionKey, cursor);
});
return l2Runner(sessionKey, cursor);
});
}

// L3 runner
this.scheduler.setL3Runner(async () => {
const l3Runner = createL3Runner({
pluginDataDir: this.dataDir,
cfg: this.cfg,
openclawConfig,
vectorStore: this.vectorStore,
logger: this.logger,
instanceId: this.instanceId,
llmRunner: l2l3LlmRunner,
if (this.cfg.pipeline.enableL3) {
this.scheduler.setL3Runner(async () => {
const l3Runner = createL3Runner({
pluginDataDir: this.dataDir,
cfg: this.cfg,
openclawConfig,
vectorStore: this.vectorStore,
logger: this.logger,
instanceId: this.instanceId,
llmRunner: l2l3LlmRunner,
});
await l3Runner();
});
await l3Runner();
});
}

this.logger.debug?.(`${TAG} Pipeline runners wired`);
}
Expand Down
2 changes: 2 additions & 0 deletions src/utils/pipeline-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ export function createPipelineManager(
{
everyNConversations: cfg.pipeline.everyNConversations,
enableWarmup: cfg.pipeline.enableWarmup,
enableL2: cfg.pipeline.enableL2,
enableL3: cfg.pipeline.enableL3,
l1: { idleTimeoutSeconds: cfg.pipeline.l1IdleTimeoutSeconds },
l2: {
delayAfterL1Seconds: cfg.pipeline.l2DelayAfterL1Seconds,
Expand Down
68 changes: 68 additions & 0 deletions src/utils/pipeline-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { describe, expect, it } from "vitest";
import { MemoryPipelineManager, type PipelineConfig } from "./pipeline-manager.js";

/**
 * Build a PipelineConfig with fast-test defaults, shallow-merging any
 * overrides on top. Note the merge is shallow: overriding `l2` replaces
 * the entire nested object, not individual fields.
 */
function baseConfig(overrides: Partial<PipelineConfig> = {}): PipelineConfig {
  const defaults = {
    enableL2: true,
    enableL3: true,
    enableWarmup: false,
    everyNConversations: 1,
    l1: { idleTimeoutSeconds: 60 },
    l2: {
      delayAfterL1Seconds: 0,
      maxIntervalSeconds: 60,
      minIntervalSeconds: 0,
      sessionActiveWindowHours: 24,
    },
  };
  return { ...defaults, ...overrides };
}

/**
 * Poll `predicate` every 10 ms until it returns true.
 *
 * @param predicate - Side-effect-free condition evaluated on each poll.
 * @param timeoutMs - Give up after this many milliseconds (default 1000).
 * @throws Error when the predicate is still false once the timeout elapses.
 */
async function waitFor(predicate: () => boolean, timeoutMs = 1_000): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (predicate()) return;
    await new Promise((resolve) => setTimeout(resolve, 10));
  }
  // Final check so a predicate that became true during the last 10 ms sleep
  // (or exactly at the deadline) does not spuriously fail the test.
  if (predicate()) return;
  throw new Error("condition not met");
}

describe("MemoryPipelineManager stage controls", () => {
  it("does not schedule L2 or L3 when both stages are disabled", async () => {
    const manager = new MemoryPipelineManager(baseConfig({
      enableL2: false,
      enableL3: false,
    }));
    // Per-stage invocation counters; only L1 should ever fire.
    const calls = { l1: 0, l2: 0, l3: 0 };

    manager.setL1Runner(async () => {
      calls.l1 += 1;
    });
    manager.setL2Runner(async () => {
      calls.l2 += 1;
    });
    manager.setL3Runner(async () => {
      calls.l3 += 1;
    });
    manager.start({});

    await manager.notifyConversation("session-1", [
      { role: "user", content: "remember this", timestamp: new Date().toISOString() },
    ]);
    // Wait until L1 has run, then leave a short window in which the
    // disabled L2/L3 stages would have fired if scheduling were broken.
    await waitFor(() => calls.l1 === 1);
    await new Promise((resolve) => setTimeout(resolve, 50));

    expect(calls.l2).toBe(0);
    expect(calls.l3).toBe(0);
    expect(manager.getQueueSizes()).toMatchObject({
      l2: 0,
      l2Pending: false,
      l3: 0,
      l3Pending: false,
    });

    await manager.destroy();
  });
});
Loading