From a4a6ad12c42029fffe4a11f03099cbdc495a8198 Mon Sep 17 00:00:00 2001 From: Adam Jacob Date: Thu, 9 Apr 2026 08:00:47 -0700 Subject: [PATCH] fix: make CatalogStore required on FileSystemUnifiedDataRepository MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The catalog SQLite index was an optional constructor parameter, which meant repository instances created without it silently skipped write-through catalog updates. In particular, executeModelMethod in WorkflowExecutionService created a data repo without the catalog, so data written by workflow steps was never indexed — causing data.latest() to return null and produce "No such key: attributes" errors in subsequent steps. Making CatalogStore required turns this class of bug into a compile error. Every FileSystemUnifiedDataRepository now maintains write-through catalog consistency. A createCatalogStore() factory helper centralizes construction. Also makes queryData required on ModelMethodRunDeps and catalogStore required on WorkflowRunDeps, removes redundant optional chaining across CLI/serve code, and updates design/data-query.md to reflect the new semantics. Fixes swamp-club#39 Co-authored-by: Claude Opus 4.6 (1M context) --- design/data-query.md | 7 +- integration/cel_data_access_test.ts | 78 ++++++++-- integration/cross_model_data_access_test.ts | 37 ++++- integration/data_expression_test.ts | 54 +++++-- integration/data_output_specs_test.ts | 20 ++- integration/data_ownership_test.ts | 79 ++++++++-- integration/data_tagging_test.ts | 78 ++++++++-- integration/data_versioning_test.ts | 78 ++++++++-- integration/unified_data_test.ts | 85 ++++++++-- integration/vary_test.ts | 30 +++- integration/workflow_new_architecture_test.ts | 7 +- src/cli/commands/model_evaluate.ts | 2 +- src/cli/commands/model_method_run.ts | 27 ++-- src/cli/commands/serve.ts | 2 +- src/cli/commands/workflow_evaluate.ts | 2 +- src/cli/commands/workflow_run.ts | 2 +- src/cli/repo_context.ts | 2 +- src/domain/data/data_query_service_test.ts | 8 +- src/domain/expressions/model_resolver_test.ts | 123 +++++++++++---- src/domain/workflows/execution_service.ts | 16 +- .../workflows/execution_service_test.ts | 115 ++++++++++++++ .../persistence/repository_factory.ts | 36 ++++- .../persistence/unified_data_repository.ts | 6 +- .../unified_data_repository_test.ts | 145 +++++++++++++++--- src/libswamp/data/gc.ts | 2 + src/libswamp/data/get.ts | 2 + src/libswamp/data/list.ts | 2 + src/libswamp/data/rename.ts | 2 + src/libswamp/data/versions.ts | 2 + src/libswamp/models/delete.ts | 2 + src/libswamp/models/evaluate.ts | 2 + src/libswamp/models/output_data.ts | 2 + src/libswamp/models/output_logs.ts | 2 + src/libswamp/models/run.ts | 4 +- src/libswamp/models/run_test.ts | 1 + src/libswamp/models/validate.ts | 2 + src/libswamp/reports/search.ts | 2 + src/libswamp/workflows/evaluate.ts | 13 +- src/libswamp/workflows/run.ts | 4 +- src/libswamp/workflows/run_test.ts | 19 ++- src/serve/connection.ts | 2 +- src/serve/deps.ts | 27 ++-- 42 files changed, 912 insertions(+), 219 deletions(-) diff --git a/design/data-query.md b/design/data-query.md index e2784110..58763821 100644 --- a/design/data-query.md +++ b/design/data-query.md @@ -222,9 +222,10 @@ Every mutation in `UnifiedDataRepository` updates the catalog inline: | `removeLatestMarker()` | Remove row | | `collectGarbage()` | Update version or remove row | -The `CatalogStore` is an optional constructor parameter on -`UnifiedDataRepository`. When absent (tests, lightweight contexts), the -repository behaves as before. +The `CatalogStore` is a required constructor parameter on +`UnifiedDataRepository`. Every repository instance maintains write-through +catalog consistency. Use `createCatalogStore()` from `repository_factory.ts` +to construct one from a repo directory. ### Population Strategy diff --git a/integration/cel_data_access_test.ts b/integration/cel_data_access_test.ts index 4ac8a8da..9eeadbc0 100644 --- a/integration/cel_data_access_test.ts +++ b/integration/cel_data_access_test.ts @@ -198,7 +198,11 @@ Deno.test("CEL Data Access: reference hyphenated model name", async () => { Deno.test("CEL Data Access: access latest resource via model.X.resource.specName", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:create"); @@ -256,7 +260,11 @@ Deno.test("CEL Data Access: access latest resource via model.X.resource.specName Deno.test("CEL Data Access: access specific version via data.version()", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:version-access"); @@ -327,7 +335,11 @@ Deno.test("CEL Data Access: access specific version via data.version()", async ( Deno.test("CEL Data Access: access data via data.latest()", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:latest"); @@ -385,7 +397,11 @@ Deno.test("CEL Data Access: access data via data.latest()", async () => { Deno.test("CEL Data Access: list all versions via data.listVersions()", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:list"); @@ -435,7 +451,11 @@ Deno.test("CEL Data Access: list all versions via data.listVersions()", async () Deno.test("CEL Data Access: reference resource from dependent model", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:cross-ref"); @@ -498,7 +518,11 @@ Deno.test("CEL Data Access: reference resource from dependent model", async () = Deno.test("CEL Data Access: chain data references across multiple models", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:chain"); @@ -752,7 +776,11 @@ Deno.test("CEL Data Access: handle missing model gracefully", async () => { Deno.test("CEL Data Access: handle missing data gracefully", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -785,7 +813,11 @@ Deno.test("CEL Data Access: handle missing data gracefully", async () => { Deno.test("CEL Data Access: multiple resource items from same model", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:multi-data"); @@ -845,7 +877,11 @@ Deno.test( async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:spec-name-test"); @@ -904,7 +940,11 @@ Deno.test( async () => { await withTempDir(async (repoDir) => { await initializeTestRepo(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:cli-spec-name-test"); @@ -965,7 +1005,11 @@ Deno.test( Deno.test("CEL Data Access: cross-type resource with specName tag via ExpressionEvaluationService (#370)", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const owner = createOwner("user/s3-inventory:create"); @@ -1032,7 +1076,11 @@ Deno.test("CEL Data Access: cross-type resource with specName tag via Expression Deno.test("CEL Data Access: resource resolves after model delete and recreate with new UUID (#370)", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const inventoryType = ModelType.create("@user/s3-inventory"); const reportType = ModelType.create("@user/s3-report"); @@ -1109,7 +1157,11 @@ Deno.test("CEL Data Access: resource resolves after model delete and recreate wi Deno.test("CEL Data Access: data.latest() sees data written after buildContext()", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:fresh-data"); diff --git a/integration/cross_model_data_access_test.ts b/integration/cross_model_data_access_test.ts index f8e853d1..59eb1511 100644 --- a/integration/cross_model_data_access_test.ts +++ b/integration/cross_model_data_access_test.ts @@ -37,6 +37,7 @@ import { DataAccessService } from "../src/domain/data/data_access_service.ts"; import { ModelType } from "../src/domain/models/model_type.ts"; import { Definition } from "../src/domain/definitions/definition.ts"; import { FileSystemUnifiedDataRepository } from "../src/infrastructure/persistence/unified_data_repository.ts"; +import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts"; import { YamlDefinitionRepository } from "../src/infrastructure/persistence/yaml_definition_repository.ts"; import { computeDefinitionHash } from "../src/domain/models/model_output.ts"; @@ -92,7 +93,11 @@ Deno.test("cross-model data access: read another model's data by name", async () await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const sourceType = ModelType.create("test/source"); // Create source model definition @@ -128,7 +133,11 @@ Deno.test("cross-model data access: filter by specName", async () => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const sourceType = ModelType.create("test/source"); const sourceDef = Definition.create({ @@ -179,7 +188,11 @@ Deno.test("cross-model data access: non-existent model returns empty array", asy await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const service = new DataAccessService(defRepo, dataRepo); const records = await service.readModelData("nonexistent-model"); @@ -193,7 +206,11 @@ Deno.test("cross-model data access: model with no data returns empty array", asy await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const sourceType = ModelType.create("test/empty"); // Create model but don't write any data @@ -215,7 +232,11 @@ Deno.test("cross-model data access: workflowRunId scoping returns only matching await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const sourceType = ModelType.create("test/source"); const sourceDef = Definition.create({ @@ -290,7 +311,11 @@ Deno.test("cross-model data access: orphan recovery reads content from old UUID await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const sourceType = ModelType.create("test/orphan"); // Step 1: Create original model and write data diff --git a/integration/data_expression_test.ts b/integration/data_expression_test.ts index de5cd669..3a6e6a81 100644 --- a/integration/data_expression_test.ts +++ b/integration/data_expression_test.ts @@ -68,7 +68,11 @@ function createOwner(ref: string): OwnerDefinition { Deno.test("Integration: model.X.resource.specName accesses latest version of resource", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -148,7 +152,11 @@ Deno.test("Integration: model.X.resource.specName accesses latest version of res Deno.test("Integration: data.version() retrieves specific version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -213,7 +221,11 @@ Deno.test("Integration: data.version() retrieves specific version", async () => Deno.test("Integration: data.version() returns null for missing version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -261,7 +273,11 @@ Deno.test("Integration: data.version() returns null for missing version", async Deno.test("Integration: data.latest() retrieves latest version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -324,7 +340,11 @@ Deno.test("Integration: data.latest() retrieves latest version", async () => { Deno.test("Integration: data.listVersions() returns sorted version numbers", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -370,7 +390,11 @@ Deno.test("Integration: data.listVersions() returns sorted version numbers", asy Deno.test("Integration: data.listVersions() returns empty array for missing data", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -400,7 +424,11 @@ Deno.test("Integration: data.listVersions() returns empty array for missing data Deno.test("Integration: data.findByTag() returns matching records", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -509,7 +537,11 @@ Deno.test("Integration: data.findByTag() returns matching records", async () => Deno.test("Integration: model can have multiple named data items with mixed types", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -604,7 +636,11 @@ Deno.test("Integration: model can have multiple named data items with mixed type Deno.test("Integration: handles hyphenated model names in data expressions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); diff --git a/integration/data_output_specs_test.ts b/integration/data_output_specs_test.ts index 1690ffbb..86fdb47d 100644 --- a/integration/data_output_specs_test.ts +++ b/integration/data_output_specs_test.ts @@ -23,6 +23,7 @@ * Tests verify that models properly declare and use resource/file output specifications. */ import { assertEquals, assertStringIncludes } from "@std/assert"; +import { join } from "@std/path"; import { getLogger } from "@logtape/logtape"; import { ModelType } from "../src/domain/models/model_type.ts"; import { @@ -33,6 +34,7 @@ import { Definition } from "../src/domain/definitions/definition.ts"; import { DefaultMethodExecutionService } from "../src/domain/models/method_execution_service.ts"; import { YamlDefinitionRepository } from "../src/infrastructure/persistence/yaml_definition_repository.ts"; import { FileSystemUnifiedDataRepository } from "../src/infrastructure/persistence/unified_data_repository.ts"; +import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts"; import { createFileWriterFactory, createResourceWriter, @@ -78,7 +80,11 @@ Deno.test("Data output specs - shell model declares log file spec", () => { Deno.test("Data output specs - shell model execution produces valid resource handle", async () => { await withTempDir(async (repoDir) => { const definitionRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const executionService = new DefaultMethodExecutionService(); const modelType = ModelType.create("command/shell"); @@ -162,7 +168,11 @@ Deno.test("Data output specs - shell model execution produces valid resource han Deno.test("Data output specs - undeclared resource spec fails at writeResource", async () => { await withTempDir(async (repoDir) => { const definitionRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const executionService = new DefaultMethodExecutionService(); const shellModelType = ModelType.create("command/shell"); @@ -234,7 +244,11 @@ Deno.test("Data output specs - undeclared resource spec fails at writeResource", Deno.test("Data output specs - undeclared file spec fails at createFileWriter", async () => { await withTempDir(async (repoDir) => { const definitionRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const executionService = new DefaultMethodExecutionService(); const shellModelType = ModelType.create("command/shell"); diff --git a/integration/data_ownership_test.ts b/integration/data_ownership_test.ts index 7d59c6d8..cc850ca4 100644 --- a/integration/data_ownership_test.ts +++ b/integration/data_ownership_test.ts @@ -36,6 +36,7 @@ import { FileSystemUnifiedDataRepository, OwnershipValidationError, } from "../src/infrastructure/persistence/unified_data_repository.ts"; +import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts"; async function withTempDir(fn: (dir: string) => Promise): Promise { const dir = await Deno.makeTempDir({ prefix: "swamp-data-ownership-" }); @@ -71,7 +72,11 @@ function createOwner( Deno.test("Data Ownership: create data with model-method owner", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("model-method", "test/model:create"); @@ -106,7 +111,11 @@ Deno.test("Data Ownership: create data with model-method owner", async () => { Deno.test("Data Ownership: create data with workflow-step owner", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const workflowId = crypto.randomUUID(); @@ -147,7 +156,11 @@ Deno.test("Data Ownership: create data with workflow-step owner", async () => { Deno.test("Data Ownership: create data with manual owner", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("manual", "user:admin@example.com"); @@ -184,7 +197,11 @@ Deno.test("Data Ownership: create data with manual owner", async () => { Deno.test("Data Ownership: same owner can write new versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("model-method", "test/model:update"); @@ -230,7 +247,11 @@ Deno.test("Data Ownership: same owner can write new versions", async () => { Deno.test("Data Ownership: different owner is rejected", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -292,7 +313,11 @@ Deno.test("Data Ownership: different owner is rejected", async () => { Deno.test("Data Ownership: different owner type is rejected", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -343,7 +368,11 @@ Deno.test("Data Ownership: different owner type is rejected", async () => { Deno.test("Data Ownership: modified ownerRef is rejected", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -400,7 +429,11 @@ Deno.test("Data Ownership: modified ownerRef is rejected", async () => { Deno.test("Data Ownership: multiple data items can have different owners", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -502,7 +535,11 @@ Deno.test("Data Ownership: multiple data items can have different owners", async Deno.test("Data Ownership: new data with non-existing name succeeds", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -541,7 +578,11 @@ Deno.test("Data Ownership: new data with non-existing name succeeds", async () = Deno.test("Data Ownership: same ownerType+ownerRef matches regardless of other fields", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -591,7 +632,11 @@ Deno.test("Data Ownership: same ownerType+ownerRef matches regardless of other f Deno.test("Data Ownership: rejected write doesn't corrupt existing data", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); @@ -657,7 +702,11 @@ Deno.test("Data Ownership: rejected write doesn't corrupt existing data", async Deno.test("Data Ownership: owner persists across multiple versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("model-method", "persistent-owner"); @@ -693,7 +742,11 @@ Deno.test("Data Ownership: owner persists across multiple versions", async () => Deno.test("Data Ownership: different models can own data with same name", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); // Two different model instances diff --git a/integration/data_tagging_test.ts b/integration/data_tagging_test.ts index c7096b9c..c7e18b50 100644 --- a/integration/data_tagging_test.ts +++ b/integration/data_tagging_test.ts @@ -67,7 +67,11 @@ function createOwner(ref: string): OwnerDefinition { Deno.test("Data Tagging: create data with required type tag", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:tag-test"); @@ -97,7 +101,11 @@ Deno.test("Data Tagging: create data with required type tag", async () => { Deno.test("Data Tagging: create data with multiple tags", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:multi-tag"); @@ -137,7 +145,11 @@ Deno.test("Data Tagging: create data with multiple tags", async () => { Deno.test("Data Tagging: tags persist across versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:tag-persist"); @@ -183,7 +195,11 @@ Deno.test("Data Tagging: tags persist across versions", async () => { Deno.test("Data Tagging: findByTag returns matching records", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:find-tag"); @@ -264,7 +280,11 @@ Deno.test("Data Tagging: findByTag returns matching records", async () => { Deno.test("Data Tagging: findByTag with custom tags", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:custom-tags"); @@ -339,7 +359,11 @@ Deno.test("Data Tagging: findByTag with custom tags", async () => { Deno.test("Data Tagging: findByTag returns only latest version with matching tag", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:latest-only"); @@ -404,7 +428,11 @@ Deno.test("Data Tagging: findByTag returns only latest version with matching tag Deno.test("Data Tagging: different type categories", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:categories"); @@ -473,7 +501,11 @@ Deno.test("Data Tagging: different type categories", async () => { Deno.test("Data Tagging: workflow and step tags", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:workflow-tags"); @@ -563,7 +595,11 @@ Deno.test("Data Tagging: workflow and step tags", async () => { Deno.test("Data Tagging: access tags via model.X.resource.specName.tags", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:tag-access"); @@ -623,7 +659,11 @@ Deno.test("Data Tagging: access tags via model.X.resource.specName.tags", async Deno.test("Data Tagging: multiple resource items with different tags", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:multi-data"); @@ -739,7 +779,11 @@ Deno.test("Data Tagging: multiple resource items with different tags", async () Deno.test("Data Tagging: empty findByTag results", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const modelResolver = new ModelResolver(definitionRepo, { @@ -759,7 +803,11 @@ Deno.test("Data Tagging: empty findByTag results", async () => { Deno.test("Data Tagging: special characters in tag values", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:special-tags"); @@ -807,7 +855,11 @@ Deno.test("Data Tagging: special characters in tag values", async () => { Deno.test("Data Tagging: tag-based organization in findAllForModel", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:find-all"); diff --git a/integration/data_versioning_test.ts b/integration/data_versioning_test.ts index f65ca115..00f10e24 100644 --- a/integration/data_versioning_test.ts +++ b/integration/data_versioning_test.ts @@ -67,7 +67,11 @@ function createOwner(ref: string): OwnerDefinition { Deno.test("Data Versioning: write multiple versions of same data", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:versioning"); @@ -117,7 +121,11 @@ Deno.test("Data Versioning: write multiple versions of same data", async () => { Deno.test("Data Versioning: retrieve specific versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:retrieve"); @@ -165,7 +173,11 @@ Deno.test("Data Versioning: retrieve specific versions", async () => { Deno.test("Data Versioning: latest symlink points to newest version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:symlink"); @@ -220,7 +232,11 @@ Deno.test("Data Versioning: latest symlink points to newest version", async () = Deno.test("Data Versioning: garbage collection by version count", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:gc-count"); @@ -271,7 +287,11 @@ Deno.test("Data Versioning: garbage collection by version count", async () => { Deno.test("Data Versioning: garbage collection preserves minimum versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:gc-preserve"); @@ -309,7 +329,11 @@ Deno.test("Data Versioning: garbage collection preserves minimum versions", asyn Deno.test("Data Versioning: multiple data items with different GC policies", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:multi-gc"); @@ -371,7 +395,11 @@ Deno.test("Data Versioning: multiple data items with different GC policies", asy Deno.test("Data Versioning: access specific version via data.version()", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -468,7 +496,11 @@ Deno.test("Data Versioning: access specific version via data.version()", async ( Deno.test("Data Versioning: listVersions returns all versions in order", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); @@ -524,7 +556,11 @@ Deno.test("Data Versioning: listVersions returns all versions in order", async ( Deno.test("Data Versioning: each version has independent content", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:content-integrity"); @@ -569,7 +605,11 @@ Deno.test("Data Versioning: each version has independent content", async () => { Deno.test("Data Versioning: version metadata is preserved", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:metadata"); @@ -605,7 +645,11 @@ Deno.test("Data Versioning: version metadata is preserved", async () => { Deno.test("Data Versioning: non-existent version returns null", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:nonexistent"); @@ -639,7 +683,11 @@ Deno.test("Data Versioning: non-existent version returns null", async () => { Deno.test("Data Versioning: delete specific version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:delete-version"); @@ -681,7 +729,11 @@ Deno.test("Data Versioning: delete specific version", async () => { Deno.test("Data Versioning: delete all versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:delete-all"); diff --git a/integration/unified_data_test.ts b/integration/unified_data_test.ts index ef85a7ec..47840c8f 100644 --- a/integration/unified_data_test.ts +++ b/integration/unified_data_test.ts @@ -37,6 +37,7 @@ import { FileSystemUnifiedDataRepository, OwnershipValidationError, } from "../src/infrastructure/persistence/unified_data_repository.ts"; +import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts"; async function withTempDir(fn: (dir: string) => Promise): Promise { const dir = await Deno.makeTempDir({ prefix: "swamp-unified-data-" }); @@ -65,7 +66,11 @@ function createOwner(ref: string): OwnerDefinition { Deno.test("Integration: save creates directory structure and symlink", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:create"); @@ -107,7 +112,11 @@ Deno.test("Integration: save creates directory structure and symlink", async () Deno.test("Integration: save auto-increments version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:version"); @@ -157,7 +166,11 @@ Deno.test("Integration: save auto-increments version", async () => { Deno.test("Integration: save validates ownership", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner1 = createOwner("test/model:owner1"); @@ -208,7 +221,11 @@ Deno.test("Integration: save validates ownership", async () => { Deno.test("Integration: findByName returns latest version by default", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:find"); @@ -234,7 +251,11 @@ Deno.test("Integration: findByName returns latest version by default", async () Deno.test("Integration: findByName returns specific version when requested", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:find"); @@ -263,7 +284,11 @@ Deno.test("Integration: findByName returns specific version when requested", asy Deno.test("Integration: listVersions returns sorted version list", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:list"); @@ -289,7 +314,11 @@ Deno.test("Integration: listVersions returns sorted version list", async () => { Deno.test("Integration: findAllForModel returns all data", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:all"); @@ -331,7 +360,11 @@ Deno.test("Integration: findAllForModel returns all data", async () => { Deno.test("Integration: getContent returns data content", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:content"); @@ -361,7 +394,11 @@ Deno.test("Integration: getContent returns data content", async () => { Deno.test("Integration: stream returns content chunks", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:stream"); @@ -408,7 +445,11 @@ Deno.test("Integration: stream returns content chunks", async () => { Deno.test("Integration: append works for streaming data", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:append"); @@ -460,7 +501,11 @@ Deno.test("Integration: append works for streaming data", async () => { Deno.test("Integration: delete removes specific version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:delete"); @@ -489,7 +534,11 @@ Deno.test("Integration: delete removes specific version", async () => { Deno.test("Integration: delete removes all versions when no version specified", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:delete-all"); @@ -521,7 +570,11 @@ Deno.test("Integration: delete removes all versions when no version specified", Deno.test("Integration: collectGarbage removes old versions by count", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:gc"); @@ -556,7 +609,11 @@ Deno.test("Integration: collectGarbage removes old versions by count", async () Deno.test("Integration: full lifecycle - create, write versions, read by version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const repo = new FileSystemUnifiedDataRepository(repoDir); + const repo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const type = ModelType.create("test/model"); const modelId = crypto.randomUUID(); const owner = createOwner("test/model:lifecycle"); diff --git a/integration/vary_test.ts b/integration/vary_test.ts index eb34b830..2ff3c130 100644 --- a/integration/vary_test.ts +++ b/integration/vary_test.ts @@ -142,7 +142,11 @@ async function runCliCommand( Deno.test("Vary: data.latest() resolves composite name from persisted data", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:vary"); @@ -210,7 +214,11 @@ Deno.test("Vary: data.latest() resolves composite name from persisted data", asy Deno.test("Vary: CEL data.latest() with vary array resolves composite name", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:cel-vary"); @@ -269,7 +277,11 @@ Deno.test("Vary: CEL data.latest() with vary array resolves composite name", asy Deno.test("Vary: each varied data name has independent versioning", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:versioning"); @@ -355,7 +367,11 @@ Deno.test("Vary: each varied data name has independent versioning", async () => Deno.test("Vary: CEL data.version() with vary resolves specific version", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:cel-version"); @@ -416,7 +432,11 @@ Deno.test("Vary: CEL data.version() with vary resolves specific version", async Deno.test("Vary: CEL data.listVersions() with vary lists correct versions", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const definitionRepo = new YamlDefinitionRepository(repoDir); const type = ModelType.create("test/model"); const owner = createOwner("test/model:cel-list"); diff --git a/integration/workflow_new_architecture_test.ts b/integration/workflow_new_architecture_test.ts index a5575b0b..a1073ff8 100644 --- a/integration/workflow_new_architecture_test.ts +++ b/integration/workflow_new_architecture_test.ts @@ -42,6 +42,7 @@ import { YamlWorkflowRepository } from "../src/infrastructure/persistence/yaml_w import { YamlDefinitionRepository } from "../src/infrastructure/persistence/yaml_definition_repository.ts"; import { YamlWorkflowRunRepository } from "../src/infrastructure/persistence/yaml_workflow_run_repository.ts"; import { FileSystemUnifiedDataRepository } from "../src/infrastructure/persistence/unified_data_repository.ts"; +import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts"; import { SHELL_MODEL_TYPE } from "../src/domain/models/command/shell/shell_model.ts"; async function withTempDir(fn: (dir: string) => Promise): Promise { @@ -792,7 +793,11 @@ Deno.test("Workflow Architecture: data persists after workflow completion", asyn await setupRepoDir(repoDir); const definitionRepo = new YamlDefinitionRepository(repoDir); const workflowRepo = new YamlWorkflowRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + new CatalogStore(join(repoDir, "_catalog.db")), + ); const model = Definition.create({ name: "persist-data-model", diff --git a/src/cli/commands/model_evaluate.ts b/src/cli/commands/model_evaluate.ts index e1f866fb..dc7c1829 100644 --- a/src/cli/commands/model_evaluate.ts +++ b/src/cli/commands/model_evaluate.ts @@ -92,7 +92,7 @@ export const modelEvaluateCommand = new Command() const lockResult = await acquireModelLocks(datastoreConfig, [ { modelType: type.normalized, modelId: definition.id }, ], repoDir); - if (lockResult.synced) repoContext.catalogStore?.invalidate(); + if (lockResult.synced) repoContext.catalogStore.invalidate(); const flushModelLocks = lockResult.flush; const ctx = createLibSwampContext({ logger: cliCtx.logger }); diff --git a/src/cli/commands/model_method_run.ts b/src/cli/commands/model_method_run.ts index f4874782..6bb99d95 100644 --- a/src/cli/commands/model_method_run.ts +++ b/src/cli/commands/model_method_run.ts @@ -167,20 +167,20 @@ export const modelMethodRunCommand = new Command() lookupDefinition: (idOrName) => findDefinitionByIdOrName(repoContext.definitionRepo, idOrName), getModelDef: (type) => resolveModelType(type, getAutoResolver()), - createEvaluationService: () => - new ExpressionEvaluationService( + createEvaluationService: () => { + const dqs = new DataQueryService( + repoContext.catalogStore, + repoContext.unifiedDataRepo, + ); + return new ExpressionEvaluationService( repoContext.definitionRepo, repoDir, { dataRepo: repoContext.unifiedDataRepo, - dataQueryService: repoContext.catalogStore - ? new DataQueryService( - repoContext.catalogStore, - repoContext.unifiedDataRepo, - ) - : undefined, + dataQueryService: dqs, }, - ), + ); + }, loadEvaluatedDefinition: (type, name) => repoContext.evaluatedDefinitionRepo.findByName(type, name), saveEvaluatedDefinition: (type, definition) => @@ -190,15 +190,14 @@ export const modelMethodRunCommand = new Command() dataRepo: repoContext.unifiedDataRepo, definitionRepo: repoContext.definitionRepo, outputRepo: repoContext.outputRepo, - queryData: repoContext.catalogStore - ? ((dqs) => (predicate: string, select?: string) => + queryData: + ((dqs) => (predicate: string, select?: string) => dqs.query(predicate, { select }))( new DataQueryService( repoContext.catalogStore, repoContext.unifiedDataRepo, ), - ) - : undefined, + ), createRunLog: async (modelType, method, definitionId) => { const redactor = new SecretRedactor(); const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); @@ -242,7 +241,7 @@ export const modelMethodRunCommand = new Command() modelId: preResult.definition.id, }, ], repoDir); - if (lockResult.synced) repoContext.catalogStore?.invalidate(); + if (lockResult.synced) repoContext.catalogStore.invalidate(); flushModelLocks = lockResult.flush; } diff --git a/src/cli/commands/serve.ts b/src/cli/commands/serve.ts index 6dc523a9..e633645d 100644 --- a/src/cli/commands/serve.ts +++ b/src/cli/commands/serve.ts @@ -321,5 +321,5 @@ export const serveCommand = new Command() await server.finished; - repoContext.catalogStore?.close(); + repoContext.catalogStore.close(); }); diff --git a/src/cli/commands/workflow_evaluate.ts b/src/cli/commands/workflow_evaluate.ts index 0d031fdb..67293997 100644 --- a/src/cli/commands/workflow_evaluate.ts +++ b/src/cli/commands/workflow_evaluate.ts @@ -166,7 +166,7 @@ export const workflowEvaluateCommand = new Command() unlocked.repoDir, ); if (lockResult.synced) { - unlocked.repoContext.catalogStore?.invalidate(); + unlocked.repoContext.catalogStore.invalidate(); } flushModelLocks = lockResult.flush; } diff --git a/src/cli/commands/workflow_run.ts b/src/cli/commands/workflow_run.ts index af4bb52e..d10b72d2 100644 --- a/src/cli/commands/workflow_run.ts +++ b/src/cli/commands/workflow_run.ts @@ -182,7 +182,7 @@ export const workflowRunCommand = new Command() unlocked.repoDir, ); if (lockResult.synced) { - unlocked.repoContext.catalogStore?.invalidate(); + unlocked.repoContext.catalogStore.invalidate(); } flushModelLocks = lockResult.flush; } diff --git a/src/cli/repo_context.ts b/src/cli/repo_context.ts index bd5547f7..973a4e8a 100644 --- a/src/cli/repo_context.ts +++ b/src/cli/repo_context.ts @@ -388,7 +388,7 @@ export function requireInitializedRepo( // If a remote sync pulled fresh data, invalidate the catalog so the // next query backfills from the freshly-pulled local cache. if (needsCatalogInvalidation) { - repoContext.catalogStore?.invalidate(); + repoContext.catalogStore.invalidate(); } return { diff --git a/src/domain/data/data_query_service_test.ts b/src/domain/data/data_query_service_test.ts index 17fbd27b..c3172f8d 100644 --- a/src/domain/data/data_query_service_test.ts +++ b/src/domain/data/data_query_service_test.ts @@ -66,7 +66,7 @@ function setupTest(): { const dbPath = join(dir, ".swamp", "data", "_catalog.db"); const catalog = new CatalogStore(dbPath); catalog.markPopulated(); // Pre-mark to avoid backfill - const dataRepo = new FileSystemUnifiedDataRepository(dir); + const dataRepo = new FileSystemUnifiedDataRepository(dir, undefined, catalog); const service = new DataQueryService(catalog, dataRepo); return { catalog, service, dir }; } @@ -284,7 +284,7 @@ Deno.test("DataQueryService: attributes filter with content on disk", () => { catalog.upsert(makeRow()); - const dataRepo = new FileSystemUnifiedDataRepository(dir); + const dataRepo = new FileSystemUnifiedDataRepository(dir, undefined, catalog); const service = new DataQueryService(catalog, dataRepo); const results = service.querySync( @@ -350,7 +350,7 @@ Deno.test("DataQueryService: select loads attributes for map literal projection" catalog.upsert(makeRow()); - const dataRepo = new FileSystemUnifiedDataRepository(dir); + const dataRepo = new FileSystemUnifiedDataRepository(dir, undefined, catalog); const service = new DataQueryService(catalog, dataRepo); // Filter doesn't reference attributes, but select does (in a map literal). @@ -408,7 +408,7 @@ Deno.test("DataQueryService: backfill triggers on unpopulated catalog", async () "1", ); - const dataRepo = new FileSystemUnifiedDataRepository(dir); + const dataRepo = new FileSystemUnifiedDataRepository(dir, undefined, catalog); const service = new DataQueryService(catalog, dataRepo); // Should trigger backfill since catalog is not populated diff --git a/src/domain/expressions/model_resolver_test.ts b/src/domain/expressions/model_resolver_test.ts index d7fcf500..29e4e644 100644 --- a/src/domain/expressions/model_resolver_test.ts +++ b/src/domain/expressions/model_resolver_test.ts @@ -57,7 +57,12 @@ Deno.test("data.latest() reads from disk synchronously", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -80,8 +85,6 @@ Deno.test("data.latest() reads from disk synchronously", async () => { data, new TextEncoder().encode(JSON.stringify({ value: 42 })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -108,7 +111,12 @@ Deno.test("data.latest() sees data written after buildContext()", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -134,7 +142,6 @@ Deno.test("data.latest() sees data written after buildContext()", async () => { ); // Build context - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -175,7 +182,12 @@ Deno.test("data.version() reads specific version from disk", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalogStore = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalogStore, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -221,7 +233,12 @@ Deno.test("data.listVersions() returns sorted version numbers", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalogStore = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalogStore, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -265,7 +282,12 @@ Deno.test("data.findByTag() returns matching records from disk", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -305,8 +327,6 @@ Deno.test("data.findByTag() returns matching records from disk", async () => { otherData, new TextEncoder().encode(JSON.stringify({ key: "other" })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -340,7 +360,12 @@ Deno.test("data.findByTag() deduplicates when data exists under orphan coordinat await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); // Step 1: Create model and save data under its UUID @@ -374,7 +399,6 @@ Deno.test("data.findByTag() deduplicates when data exists under orphan coordinat await defRepo.save(type, recreatedModel); // Step 3: Build context — orphan recovery maps old UUID data to new model name - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -399,7 +423,12 @@ Deno.test("data.findByTag() deduplicates when both old and new UUIDs have data f await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); // Step 1: Create model and save data under its UUID @@ -449,7 +478,6 @@ Deno.test("data.findByTag() deduplicates when both old and new UUIDs have data f ); // Step 4: Build context — both UUIDs have data for "tagged-item" - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -479,7 +507,12 @@ Deno.test("data.findBySpec() returns records matching specName tag", async () => await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -518,8 +551,6 @@ Deno.test("data.findBySpec() returns records matching specName tag", async () => subnetB, new TextEncoder().encode(JSON.stringify({ cidr: "10.0.2.0/24" })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -543,7 +574,12 @@ Deno.test("data.findBySpec() deduplicates when both old and new UUIDs have data await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); // Step 1: Create model and save data under its UUID @@ -593,7 +629,6 @@ Deno.test("data.findBySpec() deduplicates when both old and new UUIDs have data ); // Step 4: Build context — both UUIDs have data for "subnet-a" - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -619,7 +654,12 @@ Deno.test("data.findBySpec() returns only latest version when multiple versions await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -655,8 +695,6 @@ Deno.test("data.findBySpec() returns only latest version when multiple versions subnet, new TextEncoder().encode(JSON.stringify({ cidr: "10.0.3.0/24" })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -680,7 +718,12 @@ Deno.test("data.findByTag() returns only latest version when multiple versions e await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -716,8 +759,6 @@ Deno.test("data.findByTag() returns only latest version when multiple versions e item, new TextEncoder().encode(JSON.stringify({ v: 3 })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -745,7 +786,12 @@ Deno.test("data.* returns null/empty for missing model", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalogStore = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalogStore, + ); const resolver = new ModelResolver(defRepo, { repoDir, dataRepo }); const ctx = await resolver.buildContext(); @@ -763,7 +809,12 @@ Deno.test("data.* returns null/empty for missing data name", async () => { await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalogStore = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalogStore, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -790,7 +841,12 @@ Deno.test("findBySpec: returns all data regardless of workflowRunId", async () = await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -840,8 +896,6 @@ Deno.test("findBySpec: returns all data regardless of workflowRunId", async () = episodeB, new TextEncoder().encode(JSON.stringify({ title: "Episode B" })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); @@ -871,7 +925,12 @@ Deno.test("findBySpec: returns all data when workflowRunId is not set", async () await withTempDir(async (repoDir) => { await setupRepoDir(repoDir); const defRepo = new YamlDefinitionRepository(repoDir); - const dataRepo = new FileSystemUnifiedDataRepository(repoDir); + const catalog = new CatalogStore(join(repoDir, "_catalog.db")); + const dataRepo = new FileSystemUnifiedDataRepository( + repoDir, + undefined, + catalog, + ); const type = ModelType.create("test/model"); const model = Definition.create({ @@ -916,8 +975,6 @@ Deno.test("findBySpec: returns all data when workflowRunId is not set", async () dataWithRun, new TextEncoder().encode(JSON.stringify({ value: 2 })), ); - - const catalog = new CatalogStore(join(repoDir, "_catalog.db")); const dqs = new DataQueryService(catalog, dataRepo); await dqs.query('name == ""'); diff --git a/src/domain/workflows/execution_service.ts b/src/domain/workflows/execution_service.ts index 2a746227..ffd73d1d 100644 --- a/src/domain/workflows/execution_service.ts +++ b/src/domain/workflows/execution_service.ts @@ -141,6 +141,8 @@ export interface StepExecutionContext { skipAllChecks?: boolean; /** Resolved base directory for data storage (S3 cache path) */ dataBaseDir?: string; + /** Catalog store for write-through indexing */ + catalogStore: CatalogStore; } /** @@ -205,6 +207,7 @@ export class DefaultStepExecutor implements StepExecutor { const unifiedDataRepo = new FileSystemUnifiedDataRepository( ctx.repoDir, ctx.dataBaseDir, + ctx.catalogStore, ); const outputRepo = new YamlOutputRepository(ctx.repoDir); const executionService = new DefaultMethodExecutionService(); @@ -1097,15 +1100,15 @@ export class WorkflowExecutionService { private readonly modelResolver: ModelResolver; private readonly dataRepo: FileSystemUnifiedDataRepository; private readonly dataBaseDir?: string; - private readonly catalogStore?: CatalogStore; + private readonly catalogStore: CatalogStore; constructor( private readonly workflowRepo: WorkflowRepository, private readonly runRepo: WorkflowRunRepository, private readonly repoDir: string, - executor?: StepExecutor, - dataBaseDir?: string, - catalogStore?: CatalogStore, + executor: StepExecutor | undefined, + dataBaseDir: string | undefined, + catalogStore: CatalogStore, ) { this.executor = executor ?? new DefaultStepExecutor(); this.dataBaseDir = dataBaseDir; @@ -1116,9 +1119,7 @@ export class WorkflowExecutionService { dataBaseDir, catalogStore, ); - const dataQueryService = catalogStore - ? new DataQueryService(catalogStore, this.dataRepo) - : undefined; + const dataQueryService = new DataQueryService(catalogStore, this.dataRepo); this.modelResolver = new ModelResolver(this.definitionRepo, { repoDir, dataRepo: this.dataRepo, @@ -1779,6 +1780,7 @@ export class WorkflowExecutionService { skipCheckLabels: options.skipCheckLabels, skipAllChecks: options.skipAllChecks, dataBaseDir: this.dataBaseDir, + catalogStore: this.catalogStore, }; return this.executor.execute(step, ctx); }); diff --git a/src/domain/workflows/execution_service_test.ts b/src/domain/workflows/execution_service_test.ts index 7c95d684..a0658e99 100644 --- a/src/domain/workflows/execution_service_test.ts +++ b/src/domain/workflows/execution_service_test.ts @@ -18,6 +18,7 @@ // along with Swamp. If not, see . import { assertEquals, assertNotEquals, assertRejects } from "@std/assert"; +import { join } from "@std/path"; import { coerceToSuffix, DefaultStepExecutor, @@ -26,6 +27,7 @@ import { type StepExecutor, WorkflowExecutionService, } from "./execution_service.ts"; +import { CatalogStore } from "../../infrastructure/persistence/catalog_store.ts"; import { Workflow } from "./workflow.ts"; import { Job } from "./job.ts"; import { Step } from "./step.ts"; @@ -202,11 +204,14 @@ Deno.test("executes simple workflow with one job and one step", async () => { const workflow = createSimpleWorkflow(); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -253,11 +258,14 @@ Deno.test("executes workflow with multiple jobs", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -300,11 +308,14 @@ Deno.test("executes workflow with step dependencies", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -327,11 +338,14 @@ Deno.test("marks workflow as failed when step fails", async () => { const workflow = createSimpleWorkflow(); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -379,11 +393,14 @@ Deno.test("skips job when trigger condition not met", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -430,11 +447,14 @@ Deno.test("runs job on failure condition", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -450,10 +470,14 @@ Deno.test("throws error for nonexistent workflow", async () => { const workflowRepo = new InMemoryWorkflowRepository(); const runRepo = new InMemoryWorkflowRunRepository(); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, + undefined, + undefined, + catalogStore, ); try { @@ -474,11 +498,14 @@ Deno.test("saves workflow run to repository", async () => { const workflow = createSimpleWorkflow(); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -498,11 +525,14 @@ Deno.test("run() yields lifecycle events during execution", async () => { const workflow = createSimpleWorkflow(); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const events: string[] = []; @@ -599,11 +629,14 @@ Deno.test("executes independent jobs in parallel", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const startTime = Date.now(); @@ -688,11 +721,14 @@ Deno.test("executes dependent jobs sequentially across levels", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const events: string[] = []; @@ -766,11 +802,14 @@ Deno.test("executes independent steps within a job in parallel", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const startTime = Date.now(); @@ -910,11 +949,14 @@ Deno.test("updates data context between workflow steps", async () => { await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1010,11 +1052,14 @@ Deno.test("updates both resource and file context when step produces both", asyn await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1109,11 +1154,14 @@ Deno.test("executes linear chain where multiple steps reference same model", asy await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1138,11 +1186,14 @@ Deno.test("run() event stream includes all expected event types", async () => { const workflow = createSimpleWorkflow(); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const eventTypes: string[] = []; @@ -1184,10 +1235,14 @@ Deno.test("workflow step fails when nesting depth exceeded", async () => { }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, + undefined, + undefined, + catalogStore, ); const events: { kind: string; error?: string }[] = []; @@ -1231,10 +1286,14 @@ Deno.test("workflow step fails on direct cycle detection", async () => { }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, + undefined, + undefined, + catalogStore, ); const events: { kind: string; error?: string }[] = []; @@ -1264,6 +1323,7 @@ Deno.test("DefaultStepExecutor rejects workflow task type", async () => { task: StepTask.workflow("child-workflow"), }); + const catalogStore = new CatalogStore(join("/tmp", "_catalog.db")); const ctx: StepExecutionContext = { workflowId: createWorkflowId("parent-id"), workflowRunId: "run-123", @@ -1272,6 +1332,7 @@ Deno.test("DefaultStepExecutor rejects workflow task type", async () => { stepName: "nested-step", repoDir: "/tmp", signal: new AbortController().signal, + catalogStore, }; await assertRejects( @@ -1336,11 +1397,14 @@ Deno.test("evaluateWorkflow skips task.inputs with step-output dependencies", as await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); // This should NOT crash during evaluateWorkflow - the resource expression @@ -1386,11 +1450,14 @@ Deno.test("step executor receives correct workflow context", async () => { }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); await service.execute(workflow.name); @@ -1444,11 +1511,14 @@ Deno.test("task.inputs matching definition input keys are forwarded to step exec }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1515,11 +1585,14 @@ Deno.test("workflow expressions are evaluated before step execution (Bug A)", as }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -1586,11 +1659,14 @@ Deno.test("useLastEvaluated context carries task.inputs and expressionContext (B const evalWorkflowRepo = new YamlEvaluatedWorkflowRepository(tempDir); await evalWorkflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -1651,11 +1727,14 @@ Deno.test("step with allowFailure true fails but job still succeeds", async () = }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1696,11 +1775,14 @@ Deno.test("step with allowFailure true fails but workflow still succeeds", async }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1755,11 +1837,14 @@ Deno.test("downstream step with dependsOn succeeded skips when allowFailure step }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1810,11 +1895,14 @@ Deno.test("downstream step with dependsOn completed runs when allowFailure step }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1860,11 +1948,14 @@ Deno.test("mix of allowFailure and regular failing steps causes job failure", as }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name); @@ -1917,11 +2008,14 @@ Deno.test("check skip options and swampSha are threaded to step context", async }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); for await ( @@ -1974,11 +2068,14 @@ Deno.test("expandForEachSteps: multi-expression step name produces unique names }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -2031,11 +2128,14 @@ Deno.test("expandForEachSteps: single-expression step name resolves correctly", }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -2080,11 +2180,14 @@ Deno.test("expandForEachSteps: step name without expressions appends item value" }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -2129,11 +2232,14 @@ Deno.test("expandForEachSteps: object iteration with multi-expression step name" }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -2187,11 +2293,14 @@ Deno.test("expandForEachSteps: appends index when array item expression evaluati }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -2246,11 +2355,14 @@ Deno.test("expandForEachSteps: appends key when object item expression evaluatio }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { @@ -2303,11 +2415,14 @@ Deno.test("expandForEachSteps: does not append index when expression evaluates s }); await workflowRepo.save(workflow); + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); const service = new WorkflowExecutionService( workflowRepo, runRepo, tempDir, executor, + undefined, + catalogStore, ); const run = await service.execute(workflow.name, { diff --git a/src/infrastructure/persistence/repository_factory.ts b/src/infrastructure/persistence/repository_factory.ts index 1ec69b45..4cf4322b 100644 --- a/src/infrastructure/persistence/repository_factory.ts +++ b/src/infrastructure/persistence/repository_factory.ts @@ -68,6 +68,31 @@ import { SWAMP_SUBDIRS, swampPath } from "./paths.ts"; import { CatalogStore } from "./catalog_store.ts"; import { join } from "@std/path"; +// ============================================================================= +// Catalog Store Factory +// ============================================================================= + +/** + * Creates a CatalogStore for the given repository. + * + * Centralizes the three-step pattern: resolve data base dir, build DB path, + * construct CatalogStore. Use this whenever you need a CatalogStore outside + * of {@link createRepositoryContext}. + * + * @param repoDir - The repository directory path + * @param datastoreResolver - Optional datastore path resolver + * @returns A new CatalogStore instance + */ +export function createCatalogStore( + repoDir: string, + datastoreResolver?: DatastorePathResolver, +): CatalogStore { + const dataBaseDir = datastoreResolver?.resolvePath(SWAMP_SUBDIRS.data) ?? + swampPath(repoDir, SWAMP_SUBDIRS.data); + const catalogDbPath = join(dataBaseDir, "_catalog.db"); + return new CatalogStore(catalogDbPath); +} + // ============================================================================= // Standalone Repository Factory Functions // ============================================================================= @@ -118,8 +143,10 @@ export function createEvaluatedDefinitionRepository( */ export function createUnifiedDataRepository( repoDir: string, + catalogStore: CatalogStore, + baseDir?: string, ): FileSystemUnifiedDataRepository { - return new FileSystemUnifiedDataRepository(repoDir); + return new FileSystemUnifiedDataRepository(repoDir, baseDir, catalogStore); } /** @@ -211,7 +238,7 @@ export interface RepositoryContext { workflowRepo: WorkflowRepository; workflowRunRepo: YamlWorkflowRunRepository; vaultConfigRepo: YamlVaultConfigRepository; - catalogStore?: CatalogStore; + catalogStore: CatalogStore; } /** @@ -277,10 +304,7 @@ export function createRepositoryContext( ); // Create catalog store for data query - const dataBaseDir = dsPath(SWAMP_SUBDIRS.data) ?? - swampPath(repoDir, SWAMP_SUBDIRS.data); - const catalogDbPath = join(dataBaseDir, "_catalog.db"); - const catalogStore = new CatalogStore(catalogDbPath); + const catalogStore = createCatalogStore(repoDir, datastoreResolver); // Unified data repository with catalog write-through const unifiedDataRepo = new FileSystemUnifiedDataRepository( diff --git a/src/infrastructure/persistence/unified_data_repository.ts b/src/infrastructure/persistence/unified_data_repository.ts index 91799655..6ee40921 100644 --- a/src/infrastructure/persistence/unified_data_repository.ts +++ b/src/infrastructure/persistence/unified_data_repository.ts @@ -430,14 +430,13 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { constructor( private readonly repoDir: string, - baseDir?: string, - private readonly catalogStore?: CatalogStore, + baseDir: string | undefined, + private readonly catalogStore: CatalogStore, ) { this.baseDir = baseDir ?? swampPath(repoDir, SWAMP_SUBDIRS.data); } private catalogUpsert(type: ModelType, modelId: string, data: Data): void { - if (!this.catalogStore) return; this.catalogStore.upsert({ type_normalized: type.normalized, model_id: modelId, @@ -468,7 +467,6 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { modelId: string, dataName: string, ): void { - if (!this.catalogStore) return; this.catalogStore.remove(type.normalized, modelId, dataName); } diff --git a/src/infrastructure/persistence/unified_data_repository_test.ts b/src/infrastructure/persistence/unified_data_repository_test.ts index 33c0587e..3d24427f 100644 --- a/src/infrastructure/persistence/unified_data_repository_test.ts +++ b/src/infrastructure/persistence/unified_data_repository_test.ts @@ -23,14 +23,21 @@ import { assertRejects, assertStringIncludes, } from "@std/assert"; +import { join } from "@std/path"; import { FileSystemUnifiedDataRepository } from "./unified_data_repository.ts"; +import { CatalogStore } from "./catalog_store.ts"; import { Data } from "../../domain/data/mod.ts"; import { ModelType } from "../../domain/models/model_type.ts"; const testType = ModelType.create("test/model"); Deno.test("getPath rejects dataName with path traversal", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/test-repo"); + const catalogStore = new CatalogStore(join("/tmp/test-repo", "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/test-repo", + undefined, + catalogStore, + ); try { repo.getPath(testType, "valid-model", "../escape", 1); throw new Error("Expected path traversal error"); @@ -43,7 +50,12 @@ Deno.test("getPath rejects dataName with path traversal", () => { }); Deno.test("getPath rejects modelId with path traversal", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/test-repo"); + const catalogStore = new CatalogStore(join("/tmp/test-repo", "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/test-repo", + undefined, + catalogStore, + ); try { repo.getPath(testType, "../escape", "valid-data", 1); throw new Error("Expected path traversal error"); @@ -56,14 +68,24 @@ Deno.test("getPath rejects modelId with path traversal", () => { }); Deno.test("getPath accepts valid modelId and dataName", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/test-repo"); + const catalogStore = new CatalogStore(join("/tmp/test-repo", "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/test-repo", + undefined, + catalogStore, + ); const path = repo.getPath(testType, "my-model-id", "my-data-name", 1); assertStringIncludes(path, "my-model-id"); assertStringIncludes(path, "my-data-name"); }); Deno.test("listVersions rejects dataName with path traversal", async () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/test-repo"); + const catalogStore = new CatalogStore(join("/tmp/test-repo", "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/test-repo", + undefined, + catalogStore, + ); await assertRejects( () => repo.listVersions(testType, "valid-model", "../escape"), Error, @@ -90,7 +112,12 @@ function makeData(name: string): Data { Deno.test("concurrent allocateVersion returns unique versions", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeData("concurrent-alloc"); const concurrency = 10; @@ -118,7 +145,12 @@ Deno.test("concurrent allocateVersion returns unique versions", async () => { Deno.test("concurrent save returns unique versions with distinct content", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeData("concurrent-save"); const concurrency = 10; @@ -158,7 +190,12 @@ Deno.test("concurrent save returns unique versions with distinct content", async Deno.test("save rejects reserved data name 'latest'", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeData("latest"); const content = new TextEncoder().encode("test"); @@ -175,7 +212,12 @@ Deno.test("save rejects reserved data name 'latest'", async () => { Deno.test("save rejects reserved data name case-insensitively", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeData("LATEST"); const content = new TextEncoder().encode("test"); @@ -192,7 +234,12 @@ Deno.test("save rejects reserved data name case-insensitively", async () => { Deno.test("allocateVersion rejects reserved data name 'latest'", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeData("latest"); await assertRejects( @@ -223,7 +270,12 @@ function makeJsonData(name: string): Data { Deno.test("getLatestVersionSync reads latest symlink", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeJsonData("sync-latest"); await repo.save( @@ -251,7 +303,14 @@ Deno.test("getLatestVersionSync reads latest symlink", async () => { }); Deno.test("getLatestVersionSync returns null for missing data", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/nonexistent-repo"); + const catalogStore = new CatalogStore( + join("/tmp/nonexistent-repo", "_catalog.db"), + ); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/nonexistent-repo", + undefined, + catalogStore, + ); const result = repo.getLatestVersionSync( testType, "missing-model", @@ -263,7 +322,12 @@ Deno.test("getLatestVersionSync returns null for missing data", () => { Deno.test("findByNameSync reads metadata", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeJsonData("sync-find"); await repo.save( @@ -283,7 +347,14 @@ Deno.test("findByNameSync reads metadata", async () => { }); Deno.test("findByNameSync returns null for missing data", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/nonexistent-repo"); + const catalogStore = new CatalogStore( + join("/tmp/nonexistent-repo", "_catalog.db"), + ); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/nonexistent-repo", + undefined, + catalogStore, + ); const result = repo.findByNameSync( testType, "missing-model", @@ -295,7 +366,12 @@ Deno.test("findByNameSync returns null for missing data", () => { Deno.test("listVersionsSync returns sorted version numbers", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeJsonData("sync-list"); for (let i = 0; i < 3; i++) { @@ -315,7 +391,14 @@ Deno.test("listVersionsSync returns sorted version numbers", async () => { }); Deno.test("listVersionsSync returns empty for missing data", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/nonexistent-repo"); + const catalogStore = new CatalogStore( + join("/tmp/nonexistent-repo", "_catalog.db"), + ); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/nonexistent-repo", + undefined, + catalogStore, + ); const versions = repo.listVersionsSync( testType, "missing-model", @@ -327,7 +410,12 @@ Deno.test("listVersionsSync returns empty for missing data", () => { Deno.test("getContentSync reads content bytes", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data = makeJsonData("sync-content"); const content = new TextEncoder().encode('{"hello":"world"}'); @@ -342,7 +430,14 @@ Deno.test("getContentSync reads content bytes", async () => { }); Deno.test("getContentSync returns null for missing content", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/nonexistent-repo"); + const catalogStore = new CatalogStore( + join("/tmp/nonexistent-repo", "_catalog.db"), + ); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/nonexistent-repo", + undefined, + catalogStore, + ); const result = repo.getContentSync( testType, "missing-model", @@ -354,7 +449,12 @@ Deno.test("getContentSync returns null for missing content", () => { Deno.test("findAllForModelSync returns all data items", async () => { const tmpDir = await Deno.makeTempDir(); try { - const repo = new FileSystemUnifiedDataRepository(tmpDir); + const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); + const repo = new FileSystemUnifiedDataRepository( + tmpDir, + undefined, + catalogStore, + ); const data1 = makeJsonData("item-a"); const data2 = makeJsonData("item-b"); @@ -382,7 +482,14 @@ Deno.test("findAllForModelSync returns all data items", async () => { }); Deno.test("findAllForModelSync returns empty for missing model", () => { - const repo = new FileSystemUnifiedDataRepository("/tmp/nonexistent-repo"); + const catalogStore = new CatalogStore( + join("/tmp/nonexistent-repo", "_catalog.db"), + ); + const repo = new FileSystemUnifiedDataRepository( + "/tmp/nonexistent-repo", + undefined, + catalogStore, + ); const results = repo.findAllForModelSync(testType, "missing-model"); assertEquals(results, []); }); diff --git a/src/libswamp/data/gc.ts b/src/libswamp/data/gc.ts index b7d4325c..4c91c1dc 100644 --- a/src/libswamp/data/gc.ts +++ b/src/libswamp/data/gc.ts @@ -25,6 +25,7 @@ import { import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { YamlWorkflowRunRepository } from "../../infrastructure/persistence/yaml_workflow_run_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import type { SwampError } from "../errors.ts"; @@ -85,6 +86,7 @@ export function createDataGcDeps( const unifiedDataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const workflowRunRepo = new YamlWorkflowRunRepository( repoDir, diff --git a/src/libswamp/data/get.ts b/src/libswamp/data/get.ts index 5f1c4c1c..cdbded4f 100644 --- a/src/libswamp/data/get.ts +++ b/src/libswamp/data/get.ts @@ -30,6 +30,7 @@ import { SWAMP_SUBDIRS, toRelativePath, } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import type { SwampError } from "../errors.ts"; @@ -177,6 +178,7 @@ export function createDataGetDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const workflowRepo = new YamlWorkflowRepository(repoDir); const runRepo = new YamlWorkflowRunRepository( diff --git a/src/libswamp/data/list.ts b/src/libswamp/data/list.ts index 9bcad3fa..0b06893a 100644 --- a/src/libswamp/data/list.ts +++ b/src/libswamp/data/list.ts @@ -27,6 +27,7 @@ import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_ import { YamlWorkflowRepository } from "../../infrastructure/persistence/yaml_workflow_repository.ts"; import { YamlWorkflowRunRepository } from "../../infrastructure/persistence/yaml_workflow_run_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError, validationFailed } from "../errors.ts"; @@ -163,6 +164,7 @@ export function createDataListDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const workflowRepo = new YamlWorkflowRepository(repoDir); const runRepo = new YamlWorkflowRunRepository( diff --git a/src/libswamp/data/rename.ts b/src/libswamp/data/rename.ts index d02e172f..c27877f6 100644 --- a/src/libswamp/data/rename.ts +++ b/src/libswamp/data/rename.ts @@ -24,6 +24,7 @@ import { import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_definition_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import type { SwampError } from "../errors.ts"; @@ -75,6 +76,7 @@ export function createDataRenameDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const definitionRepo = new YamlDefinitionRepository(repoDir); const service = new DataRenameService(dataRepo, definitionRepo); diff --git a/src/libswamp/data/versions.ts b/src/libswamp/data/versions.ts index c948c03b..21f1644e 100644 --- a/src/libswamp/data/versions.ts +++ b/src/libswamp/data/versions.ts @@ -23,6 +23,7 @@ import type { ModelType } from "../../domain/models/model_type.ts"; import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_definition_repository.ts"; import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError } from "../errors.ts"; @@ -94,6 +95,7 @@ export function createDataVersionsDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); return { lookupDefinition: (idOrName) => diff --git a/src/libswamp/models/delete.ts b/src/libswamp/models/delete.ts index dfb9d7d6..992c00e5 100644 --- a/src/libswamp/models/delete.ts +++ b/src/libswamp/models/delete.ts @@ -27,6 +27,7 @@ import { YamlWorkflowRepository } from "../../infrastructure/persistence/yaml_wo import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { YamlOutputRepository } from "../../infrastructure/persistence/yaml_output_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import { createModelOutputId } from "../../domain/models/model_output.ts"; import type { LibSwampContext } from "../context.ts"; @@ -108,6 +109,7 @@ export function createModelDeleteDeps( const unifiedDataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const outputRepo = new YamlOutputRepository( repoDir, diff --git a/src/libswamp/models/evaluate.ts b/src/libswamp/models/evaluate.ts index 2a3a0a2f..11f7696b 100644 --- a/src/libswamp/models/evaluate.ts +++ b/src/libswamp/models/evaluate.ts @@ -29,6 +29,7 @@ import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_ import { YamlEvaluatedDefinitionRepository } from "../../infrastructure/persistence/yaml_evaluated_definition_repository.ts"; import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError } from "../errors.ts"; @@ -94,6 +95,7 @@ export function createModelEvaluateDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const evaluationService = new ExpressionEvaluationService( definitionRepo, diff --git a/src/libswamp/models/output_data.ts b/src/libswamp/models/output_data.ts index 213d0673..06cf41d2 100644 --- a/src/libswamp/models/output_data.ts +++ b/src/libswamp/models/output_data.ts @@ -31,6 +31,7 @@ import { YamlOutputRepository } from "../../infrastructure/persistence/yaml_outp import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_definition_repository.ts"; import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError, validationFailed } from "../errors.ts"; @@ -114,6 +115,7 @@ export function createModelOutputDataDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); return { isPartialId, diff --git a/src/libswamp/models/output_logs.ts b/src/libswamp/models/output_logs.ts index 4db2f1a9..1c38e078 100644 --- a/src/libswamp/models/output_logs.ts +++ b/src/libswamp/models/output_logs.ts @@ -26,6 +26,7 @@ import { import { YamlOutputRepository } from "../../infrastructure/persistence/yaml_output_repository.ts"; import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError, validationFailed } from "../errors.ts"; @@ -90,6 +91,7 @@ export function createModelOutputLogsDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); return { isPartialId, diff --git a/src/libswamp/models/run.ts b/src/libswamp/models/run.ts index c6d07d44..2bb6dc52 100644 --- a/src/libswamp/models/run.ts +++ b/src/libswamp/models/run.ts @@ -156,8 +156,8 @@ export interface ModelMethodRunDeps { dataRepo: UnifiedDataRepository; definitionRepo: YamlDefinitionRepository; outputRepo: OutputRepository; - /** Pre-built query function for context.queryData(). Absent = feature unavailable. */ - queryData?: ( + /** Pre-built query function for context.queryData(). */ + queryData: ( predicate: string, select?: string, ) => Promise; diff --git a/src/libswamp/models/run_test.ts b/src/libswamp/models/run_test.ts index aa391439..3a9991ca 100644 --- a/src/libswamp/models/run_test.ts +++ b/src/libswamp/models/run_test.ts @@ -165,6 +165,7 @@ function createTestDeps( dataRepo: createFakeDataRepo(), definitionRepo: createFakeDefinitionRepo(), outputRepo: createFakeOutputRepo(), + queryData: () => Promise.resolve([]), createRunLog: () => Promise.resolve({ logFilePath: "/tmp/test.log", diff --git a/src/libswamp/models/validate.ts b/src/libswamp/models/validate.ts index 5b8969fd..caf9e4d2 100644 --- a/src/libswamp/models/validate.ts +++ b/src/libswamp/models/validate.ts @@ -29,6 +29,7 @@ import { import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_definition_repository.ts"; import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { ModelType } from "../../domain/models/model_type.ts"; import type { LibSwampContext } from "../context.ts"; @@ -124,6 +125,7 @@ export function createModelValidateDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const validationService = new DefaultModelValidationService(); diff --git a/src/libswamp/reports/search.ts b/src/libswamp/reports/search.ts index df742e4e..ea3b0124 100644 --- a/src/libswamp/reports/search.ts +++ b/src/libswamp/reports/search.ts @@ -31,6 +31,7 @@ import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistenc import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_definition_repository.ts"; import { YamlWorkflowRepository } from "../../infrastructure/persistence/yaml_workflow_repository.ts"; import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound } from "../errors.ts"; import type { ReportSearchEvent, StoredReportSummary } from "./report_views.ts"; @@ -83,6 +84,7 @@ export async function createReportSearchDeps( const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), + createCatalogStore(repoDir, datastoreResolver), ); const workflowRepo = new YamlWorkflowRepository(repoDir); return { diff --git a/src/libswamp/workflows/evaluate.ts b/src/libswamp/workflows/evaluate.ts index a42b6658..f5cff95f 100644 --- a/src/libswamp/workflows/evaluate.ts +++ b/src/libswamp/workflows/evaluate.ts @@ -40,13 +40,9 @@ import type { WorkflowRepository } from "../../domain/workflows/repositories.ts" import { YamlEvaluatedWorkflowRepository } from "../../infrastructure/persistence/yaml_evaluated_workflow_repository.ts"; import { YamlDefinitionRepository } from "../../infrastructure/persistence/yaml_definition_repository.ts"; import { FileSystemUnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts"; -import { - SWAMP_SUBDIRS, - swampPath, -} from "../../infrastructure/persistence/paths.ts"; -import { CatalogStore } from "../../infrastructure/persistence/catalog_store.ts"; +import { SWAMP_SUBDIRS } from "../../infrastructure/persistence/paths.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; import { DataQueryService } from "../../domain/data/data_query_service.ts"; -import { join } from "@std/path"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError } from "../errors.ts"; @@ -111,10 +107,7 @@ export function createWorkflowEvaluateDeps( const dsPath = (subdir: string): string | undefined => datastoreResolver?.resolvePath(subdir); const definitionRepo = new YamlDefinitionRepository(repoDir); - const dataBaseDir = dsPath(SWAMP_SUBDIRS.data) ?? - swampPath(repoDir, SWAMP_SUBDIRS.data); - const catalogDbPath = join(dataBaseDir, "_catalog.db"); - const catalogStore = new CatalogStore(catalogDbPath); + const catalogStore = createCatalogStore(repoDir, datastoreResolver); const dataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), diff --git a/src/libswamp/workflows/run.ts b/src/libswamp/workflows/run.ts index 3e839365..11ef5278 100644 --- a/src/libswamp/workflows/run.ts +++ b/src/libswamp/workflows/run.ts @@ -185,9 +185,9 @@ export interface WorkflowRunDeps { workflowRepo: WorkflowRepository, runRepo: WorkflowRunRepository, repoDir: string, - catalogStore?: CatalogStore, + catalogStore: CatalogStore, ) => WorkflowExecutionService; - catalogStore?: CatalogStore; + catalogStore: CatalogStore; dataRepo?: UnifiedDataRepository; definitionRepo?: DefinitionRepository; } diff --git a/src/libswamp/workflows/run_test.ts b/src/libswamp/workflows/run_test.ts index 91302577..72c3b0e5 100644 --- a/src/libswamp/workflows/run_test.ts +++ b/src/libswamp/workflows/run_test.ts @@ -29,6 +29,8 @@ import { } from "./run.ts"; import { createLibSwampContext } from "../context.ts"; import { collect } from "../testing.ts"; +import { CatalogStore } from "../../infrastructure/persistence/catalog_store.ts"; +import { join } from "@std/path"; import { Workflow } from "../../domain/workflows/workflow.ts"; import { Job } from "../../domain/workflows/job.ts"; import { Step } from "../../domain/workflows/step.ts"; @@ -169,14 +171,18 @@ function createTestDeps( ): WorkflowRunDeps { const workflowRepo = new InMemoryWorkflowRepository(); const runRepo = new InMemoryWorkflowRunRepository(); + const tempDir = Deno.makeTempDirSync(); + const catalogStore = new CatalogStore(join(tempDir, "catalog.db")); return { workflowRepo, runRepo, repoDir: "/tmp/test", + catalogStore, lookupWorkflow: (_repo, _idOrName) => Promise.resolve(workflow), - // deno-lint-ignore no-explicit-any - createExecutionService: () => createFakeService(events) as any, + createExecutionService: (_wr, _rr, _rd, _cs) => + // deno-lint-ignore no-explicit-any + createFakeService(events) as any, }; } @@ -432,13 +438,16 @@ function createTestDepsWithCapture( ): WorkflowRunDeps { const workflowRepo = new InMemoryWorkflowRepository(); const runRepo = new InMemoryWorkflowRunRepository(); + const tempDir = Deno.makeTempDirSync(); + const catalogStore = new CatalogStore(join(tempDir, "catalog.db")); return { workflowRepo, runRepo, repoDir: "/tmp/test", + catalogStore, lookupWorkflow: (_repo, _idOrName) => Promise.resolve(workflow), - createExecutionService: () => + createExecutionService: (_wr, _rr, _rd, _cs) => // deno-lint-ignore no-explicit-any createCapturingFakeService(events, captured) as any, }; @@ -868,7 +877,7 @@ Deno.test("workflowRun yields cancelled error when abort signal fires during exe // Create a service that aborts mid-stream const deps: WorkflowRunDeps = { ...createTestDeps(workflow, []), - createExecutionService: () => + createExecutionService: (_wr, _rr, _rd, _cs) => ({ // deno-lint-ignore require-yield async *run() { @@ -902,7 +911,7 @@ Deno.test("workflowRun yields cancelled error when signal is pre-aborted", async // Service throws AbortError immediately const deps: WorkflowRunDeps = { ...createTestDeps(workflow, []), - createExecutionService: () => + createExecutionService: (_wr, _rr, _rd, _cs) => ({ // deno-lint-ignore require-yield async *run() { diff --git a/src/serve/connection.ts b/src/serve/connection.ts index 134ae177..11e801ca 100644 --- a/src/serve/connection.ts +++ b/src/serve/connection.ts @@ -255,7 +255,7 @@ async function handleModelMethodRun( }], ctx.repoDir, ); - if (lockResult.synced) ctx.repoContext.catalogStore?.invalidate(); + if (lockResult.synced) ctx.repoContext.catalogStore.invalidate(); flushLocks = lockResult.flush; } diff --git a/src/serve/deps.ts b/src/serve/deps.ts index a07069e1..d625ca52 100644 --- a/src/serve/deps.ts +++ b/src/serve/deps.ts @@ -104,20 +104,20 @@ export async function createModelMethodRunDeps( lookupDefinition: (idOrName) => findDefinitionByIdOrName(repoContext.definitionRepo, idOrName), getModelDef: (type) => resolveModelType(type, getAutoResolver()), - createEvaluationService: () => - new ExpressionEvaluationService( + createEvaluationService: () => { + const dqs = new DataQueryService( + repoContext.catalogStore, + repoContext.unifiedDataRepo, + ); + return new ExpressionEvaluationService( repoContext.definitionRepo, repoDir, { dataRepo: repoContext.unifiedDataRepo, - dataQueryService: repoContext.catalogStore - ? new DataQueryService( - repoContext.catalogStore, - repoContext.unifiedDataRepo, - ) - : undefined, + dataQueryService: dqs, }, - ), + ); + }, loadEvaluatedDefinition: (type, name) => repoContext.evaluatedDefinitionRepo.findByName(type, name), saveEvaluatedDefinition: (type, definition) => @@ -127,15 +127,14 @@ export async function createModelMethodRunDeps( dataRepo: repoContext.unifiedDataRepo, definitionRepo: repoContext.definitionRepo, outputRepo: repoContext.outputRepo, - queryData: repoContext.catalogStore - ? ((dqs) => (predicate: string, select?: string) => + queryData: + ((dqs) => (predicate: string, select?: string) => dqs.query(predicate, { select }))( new DataQueryService( repoContext.catalogStore, repoContext.unifiedDataRepo, ), - ) - : undefined, + ), createRunLog: async (modelType, method, definitionId) => { const redactor = new SecretRedactor(); const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); @@ -215,7 +214,7 @@ export async function executeWorkflowWithLocks( resolvedModels, repoDir, ); - if (lockResult.synced) repoContext.catalogStore?.invalidate(); + if (lockResult.synced) repoContext.catalogStore.invalidate(); flushLocks = lockResult.flush; } }