From 7ff4445a1c055d9bae7800999e47296352cd1336 Mon Sep 17 00:00:00 2001 From: stack72 Date: Wed, 8 Apr 2026 20:58:17 +0100 Subject: [PATCH] fix(datastore): exclude _catalog.db* from S3/GCS sync walker (swamp-club#29) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `swamp datastore sync --push` against an @swamp/s3-datastore fails fatally on `_catalog.db-wal` because the SQLite data catalog lives inside the sync cache directory and the walker enqueues all three `_catalog.db*` files for upload. SQLite rewrites the WAL mid-upload with PRAGMA journal_mode=WAL, and the S3 PUT fails. Fix in two layers: 1. Add an `isInternalCacheFile` helper that filters the three pre-existing internal files PLUS basenames `_catalog.db` and anything starting with `_catalog.db-` (matching the SQLite WAL, SHM, and journal sidecars). The helper uses basename matching so it's robust to future changes in the data tier subdirectory. 2. Add passive index cleanup so polluted remote indexes from the bug period heal automatically. A `scrubIndex` method removes zombie `_catalog.db*` entries from the in-memory index on load, and an `indexMutated` flag propagates the cleaned index back to the remote on the next push (even a no-op push). The scrub runs inside `loadIndex` and both branches of `pullIndex` so the persistence boundary enforces the invariant. On the cache-miss branch of `pullIndex` the local cache file is rewritten with the scrubbed JSON for on-disk/in-memory consistency. Applied identically to `@swamp/s3-datastore` and `@swamp/gcs-datastore` since both carry the same defect — the hardcoded three-filename skip list at `s3_cache_sync.ts:252` and `gcs_cache_sync.ts:221`. Seven new unit tests per extension cover: push skips catalog files, push still skips pre-existing internal files, walker-level skip is a true safety net via post-scrub zombie injection, pull skips zombies from remote index, pullIndex S3/GCS-fetch path scrubs and rewrites local cache file, pullIndex cache-hit path scrubs and indexMutated propagates via next push, no-op push propagates scrubbed index then flag resets on second call. Out of scope (tracked separately): - swamp-club#30: `sync --push` calls `pushChanged()` twice per invocation via the coordinator flush path. - swamp-club#31: orphan physical `_catalog.db*` objects remaining in pre-fix buckets. Acceptable trade-off — release notes can direct users to `aws s3 rm` / `gsutil rm` if they want to reclaim the few KB of dead weight. - systeminit/swamp-uat#131: CLI-level UAT coverage for the populated-catalog push scenario. Manifest versions bumped to 2026.04.08.1 for CI auto-publish. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../datastores/_lib/gcs_cache_sync.ts | 114 +++- .../datastores/_lib/gcs_cache_sync_test.ts | 464 ++++++++++++++++ datastore/gcs/manifest.yaml | 2 +- .../datastores/_lib/s3_cache_sync.ts | 115 +++- .../datastores/_lib/s3_cache_sync_test.ts | 504 ++++++++++++++++++ datastore/s3/manifest.yaml | 2 +- 6 files changed, 1181 insertions(+), 20 deletions(-) create mode 100644 datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts create mode 100644 datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts index 517c0853f..9e2595500 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts @@ -48,6 +48,40 @@ function assertSafePath(cachePath: string, relativePath: string): string { return resolved; } +/** + * Returns true for files that live inside the cache directory but must + * NOT cross the sync boundary in either direction (push or pull). + * + * Excluded patterns: + * - `.datastore-index.json` — the remote index manifest itself; pulled + * and pushed via dedicated code paths, never as a walked payload. + * - `.push-queue.json` — local push-queue scratch file. + * - `.datastore.lock` — distributed lock file; managed by the lock + * subsystem, must never be uploaded as data. + * - basename `_catalog.db` and anything starting with `_catalog.db-` + * (the SQLite WAL/SHM/journal sidecars) — the local-only data catalog + * store. It is deliberately colocated with the data tier so it can + * be rebuilt from whatever the local cache holds, but the database + * itself is per-machine state and must never leak to the shared + * bucket. See swamp-club issue #29 for the bug this exclusion fixes: + * without it, `swamp datastore sync --push` would walk `_catalog.db*` + * into `toPush`, SQLite would rewrite the WAL mid-upload, and the + * push would fail on `_catalog.db-wal`. + * + * Uses basename matching for the catalog pattern so the filter is + * robust to any future change in the data tier subdirectory name. + */ +function isInternalCacheFile(rel: string): boolean { + if ( + rel === ".datastore-index.json" || rel === ".push-queue.json" || + rel === ".datastore.lock" + ) { + return true; + } + const base = rel.split("/").pop() ?? ""; + return base === "_catalog.db" || base.startsWith("_catalog.db-"); +} + /** Metadata index entry for a file in GCS. */ interface IndexEntry { key: string; @@ -75,6 +109,13 @@ export class GcsCacheSyncService implements DatastoreSyncService { private readonly cachePath: string; private readonly indexPath: string; private index: DatastoreIndex | null = null; + /** + * Set to true when `scrubIndex()` removes zombie entries from the + * in-memory index. Drives the write-back gate in `pushChanged()` so + * that the cleaned index propagates to the remote even on a no-op + * push (no new files to upload). Reset after a successful write-back. + */ + private indexMutated = false; constructor(gcs: GcsClient, cachePath: string) { this.gcs = gcs; @@ -82,9 +123,40 @@ export class GcsCacheSyncService implements DatastoreSyncService { this.indexPath = join(cachePath, ".datastore-index.json"); } + /** + * Removes zombie internal-file entries from the in-memory index. + * Runs whenever the index is populated from disk or remote so the + * invariant "internal files never cross the sync boundary" is + * enforced at the persistence boundary. + * + * Returns true if any entries were removed. See + * `isInternalCacheFile` for the exclusion criteria and swamp-club + * issue #29 for the motivating bug. + */ + private scrubIndex(): boolean { + if (!this.index || !this.index.entries) return false; + let mutated = false; + for (const rel of Object.keys(this.index.entries)) { + if (isInternalCacheFile(rel)) { + delete this.index.entries[rel]; + mutated = true; + } + } + return mutated; + } + /** * Pulls the metadata index from GCS (lightweight, single GET). * Uses a local cache with a 60-second TTL to avoid redundant fetches. + * + * Both entry paths (cache-hit and GCS-fetch) scrub the in-memory + * index after parsing so zombie internal-file entries never reach + * the rest of the sync pipeline (see swamp-club#29). On the + * GCS-fetch path the local cache file is rewritten with the scrubbed + * JSON when scrub mutated — keeping the on-disk and in-memory views + * consistent. The cache-hit path does NOT rewrite the local file; + * its on-disk view self-heals on the next GCS fetch or via the + * `indexMutated` propagation on the next `pushChanged`. */ async pullIndex(): Promise { // Check local cache freshness @@ -94,6 +166,8 @@ export class GcsCacheSyncService implements DatastoreSyncService { if (ageMs < INDEX_CACHE_TTL_MS && this.index === null) { const content = await Deno.readTextFile(this.indexPath); this.index = JSON.parse(content) as DatastoreIndex; + // Scrub zombies from the in-memory view before returning. + this.indexMutated ||= this.scrubIndex(); return; } } catch { @@ -105,7 +179,18 @@ export class GcsCacheSyncService implements DatastoreSyncService { const text = new TextDecoder().decode(data); this.index = JSON.parse(text) as DatastoreIndex; await ensureDir(this.cachePath); - await atomicWriteTextFile(this.indexPath, text); + // Scrub zombies, then write the local cache file. If scrub + // mutated, write the cleaned JSON so on-disk matches in-memory. + // Otherwise write the raw remote text to preserve the fast path. + if (this.scrubIndex()) { + this.indexMutated = true; + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } else { + await atomicWriteTextFile(this.indexPath, text); + } } catch { // No index exists yet — start fresh this.index = { @@ -134,6 +219,12 @@ export class GcsCacheSyncService implements DatastoreSyncService { const toPull: string[] = []; for (const [rel, entry] of Object.entries(this.index?.entries ?? {})) { + // Belt-and-suspenders: `scrubIndex` already removed internal + // entries in `pullIndex`, but if anything re-adds a zombie + // between the scrub and the walk, this guard still catches it. + if (isInternalCacheFile(rel)) { + continue; + } const localPath = assertSafePath(this.cachePath, rel); try { const stat = await Deno.stat(localPath); @@ -216,11 +307,8 @@ export class GcsCacheSyncService implements DatastoreSyncService { }) ) { const rel = relative(this.cachePath, entry.path); - // Skip internal metadata files - if ( - rel === ".datastore-index.json" || rel === ".push-queue.json" || - rel === ".datastore.lock" - ) { + // Skip internal metadata files (see isInternalCacheFile) + if (isInternalCacheFile(rel)) { continue; } @@ -268,13 +356,18 @@ export class GcsCacheSyncService implements DatastoreSyncService { ); } - // Push updated index if anything changed - if (pushed > 0 && this.index) { - this.index.lastPulled = new Date().toISOString(); + // Push updated index if anything changed — either new files were + // pushed OR scrubIndex removed zombie entries that need to + // propagate to the remote (swamp-club#29 migration path). + if ((pushed > 0 || this.indexMutated) && this.index) { + if (pushed > 0) { + this.index.lastPulled = new Date().toISOString(); + } const indexJson = JSON.stringify(this.index, null, 2); const indexData = new TextEncoder().encode(indexJson); await this.gcs.putObject(".datastore-index.json", indexData); await atomicWriteTextFile(this.indexPath, indexJson); + this.indexMutated = false; } return pushed; @@ -292,5 +385,8 @@ export class GcsCacheSyncService implements DatastoreSyncService { entries: {}, }; } + // Scrub zombies from the in-memory view so the rest of + // `pushChanged` operates on a clean index. See swamp-club#29. + this.indexMutated ||= this.scrubIndex(); } } diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts new file mode 100644 index 000000000..7149c014f --- /dev/null +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts @@ -0,0 +1,464 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +// Tests for GcsCacheSyncService focusing on the swamp-club#29 fix: +// mirrors s3_cache_sync_test.ts behavior since the GCS sync service +// shares the identical internal-file exclusion pattern and scrub +// logic. See swamp-club#29 for the motivating bug. + +import { assertEquals, assertExists } from "jsr:@std/assert@1.0.19"; +import { join } from "jsr:@std/path@1"; +import { GcsCacheSyncService } from "./gcs_cache_sync.ts"; +import type { GcsClient } from "./gcs_client.ts"; + +/** Captured putObject call for test assertions. */ +interface PutCall { + key: string; + body: Uint8Array; +} + +/** In-memory mock of GcsClient recording getObject/putObject calls. */ +function createMockGcsClient(): GcsClient & { + storage: Map; + puts: PutCall[]; + gets: string[]; +} { + const storage = new Map(); + const puts: PutCall[] = []; + const gets: string[] = []; + + return { + storage, + puts, + gets, + + putObject(key: string, body: Uint8Array): Promise<{ generation: string }> { + storage.set(key, body); + puts.push({ key, body }); + return Promise.resolve({ generation: "1" }); + }, + + getObject(key: string): Promise { + gets.push(key); + const data = storage.get(key); + if (!data) return Promise.reject(new Error(`NoSuchKey: ${key}`)); + return Promise.resolve(data); + }, + } as unknown as GcsClient & { + storage: Map; + puts: PutCall[]; + gets: string[]; + }; +} + +/** Seeds a file inside the cache directory, creating parent dirs. */ +async function seedFile( + cachePath: string, + relPath: string, + contents: string, +): Promise { + const full = join(cachePath, relPath); + const parent = full.substring(0, full.lastIndexOf("/")); + if (parent && parent !== cachePath) { + await Deno.mkdir(parent, { recursive: true }); + } else { + await Deno.mkdir(cachePath, { recursive: true }); + } + await Deno.writeTextFile(full, contents); +} + +/** Encodes an index object as a remote .datastore-index.json body. */ +function encodeIndex( + entries: Record, +): Uint8Array { + return new TextEncoder().encode( + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries, + }), + ); +} + +/** Decodes a recorded putObject body as a parsed index object. */ +function decodeIndex(body: Uint8Array): { + entries: Record; +} { + return JSON.parse(new TextDecoder().decode(body)); +} + +/** Type assertion helper for reaching private instance state in tests. */ +function privateState(service: GcsCacheSyncService): { + index: { entries: Record } | null; + indexMutated: boolean; +} { + return service as unknown as { + index: { entries: Record } | null; + indexMutated: boolean; + }; +} + +// -- (a) push skips _catalog.db, _catalog.db-shm, _catalog.db-wal ---------- + +Deno.test("pushChanged: skips _catalog.db and its WAL/SHM sidecars", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-a-" }); + try { + const mock = createMockGcsClient(); + const service = new GcsCacheSyncService(mock, cachePath); + + await seedFile(cachePath, "data/@my-model/payload.yaml", "name: x\n"); + await seedFile(cachePath, "data/_catalog.db", "SQLITE-MAIN"); + await seedFile(cachePath, "data/_catalog.db-shm", "SQLITE-SHM"); + await seedFile(cachePath, "data/_catalog.db-wal", "SQLITE-WAL"); + + await service.pushChanged(); + + const uploadedKeys = mock.puts.map((p) => p.key); + assertEquals( + uploadedKeys.includes("data/@my-model/payload.yaml"), + true, + "legitimate payload should be pushed", + ); + for (const key of uploadedKeys) { + assertEquals( + key.includes("_catalog.db"), + false, + `catalog file leaked: ${key}`, + ); + } + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (b) push still skips the pre-existing three internal files ----------- + +Deno.test("pushChanged: still skips .datastore-index.json/.push-queue.json/.datastore.lock", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-b-" }); + try { + const mock = createMockGcsClient(); + const service = new GcsCacheSyncService(mock, cachePath); + + await seedFile(cachePath, "data/ok.yaml", "ok: true\n"); + await seedFile( + cachePath, + ".datastore-index.json", + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: {}, + }), + ); + await seedFile(cachePath, ".push-queue.json", "{}"); + await seedFile(cachePath, ".datastore.lock", "lock"); + + await service.pushChanged(); + + const uploadedKeys = mock.puts.map((p) => p.key); + assertEquals( + uploadedKeys.filter((k) => k === ".push-queue.json").length, + 0, + ); + assertEquals( + uploadedKeys.filter((k) => k === ".datastore.lock").length, + 0, + ); + assertEquals(uploadedKeys.includes("data/ok.yaml"), true); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (c) walker skip is a safety net beyond scrub ------------------------- + +Deno.test("pullChanged walker: skips zombies even if re-injected post-scrub", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-c-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + ".datastore-index.json", + encodeIndex({ + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 8, + lastModified: new Date().toISOString(), + }, + }), + ); + await seedFile(cachePath, "data/@m/ok.yaml", "ok: yes\n"); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullIndex(); + + const state = privateState(service); + assertExists(state.index); + state.index.entries["data/_catalog.db-wal"] = { + key: "data/_catalog.db-wal", + size: 1024, + lastModified: new Date().toISOString(), + }; + + await service.pullChanged(); + + assertEquals( + mock.gets.includes("data/_catalog.db-wal"), + false, + "walker must skip zombies regardless of whether scrub caught them", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (d) pull skips zombies from the remote index ------------------------- + +Deno.test("pullChanged: skips zombie _catalog.db* entries from remote index", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-d-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + ".datastore-index.json", + encodeIndex({ + "data/@m/payload.yaml": { + key: "data/@m/payload.yaml", + size: 5, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 999, + lastModified: new Date().toISOString(), + }, + }), + ); + mock.storage.set( + "data/@m/payload.yaml", + new TextEncoder().encode("hi!\n\n"), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullChanged(); + + assertEquals( + mock.gets.includes("data/@m/payload.yaml"), + true, + "legitimate payload must be fetched", + ); + assertEquals( + mock.gets.includes("data/_catalog.db-wal"), + false, + "zombie must not be fetched", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (e1) pullIndex GCS-fetch path scrubs in-memory AND rewrites local ---- + +Deno.test("pullIndex GCS-fetch path: scrubs in-memory and rewrites local cache file", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-e1-" }); + try { + const mock = createMockGcsClient(); + const polluted = { + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 3, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db": { + key: "data/_catalog.db", + size: 100, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 200, + lastModified: new Date().toISOString(), + }, + }; + mock.storage.set(".datastore-index.json", encodeIndex(polluted)); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullIndex(); + + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).sort(), + ["data/@m/ok.yaml"], + "in-memory index should contain only the legitimate entry", + ); + assertEquals( + state.indexMutated, + true, + "indexMutated must be set after scrubbing zombies", + ); + + const localFile = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const localParsed = JSON.parse(localFile) as { + entries: Record; + }; + assertEquals( + Object.keys(localParsed.entries).sort(), + ["data/@m/ok.yaml"], + "local cache file must match scrubbed in-memory view", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (e2) pullIndex cache-hit path scrubs in-memory, pushChanged propagates - + +Deno.test("pullIndex cache-hit path: scrubs in-memory, next pushChanged propagates to remote", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-e2-" }); + try { + const mock = createMockGcsClient(); + + const pollutedJson = JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: { + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 3, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 42, + lastModified: new Date().toISOString(), + }, + }, + }); + await seedFile(cachePath, ".datastore-index.json", pollutedJson); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullIndex(); + + assertEquals( + mock.gets.includes(".datastore-index.json"), + false, + "cache-hit branch must not fetch the index from GCS", + ); + + const state = privateState(service); + assertExists(state.index); + assertEquals(Object.keys(state.index.entries).sort(), ["data/@m/ok.yaml"]); + assertEquals(state.indexMutated, true); + + const diskJson = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + assertEquals( + diskJson, + pollutedJson, + "cache-hit branch must not rewrite the local index file", + ); + + await service.pushChanged(); + + const indexPuts = mock.puts.filter((p) => + p.key === ".datastore-index.json" + ); + assertEquals( + indexPuts.length, + 1, + "scrubbed index must be pushed exactly once via indexMutated", + ); + const pushedIndex = decodeIndex(indexPuts[0].body); + assertEquals( + Object.keys(pushedIndex.entries).sort(), + ["data/@m/ok.yaml"], + "pushed index must contain only the legitimate entry", + ); + assertEquals( + state.indexMutated, + false, + "indexMutated must reset after write-back", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (f) indexMutated propagates on no-op push and resets ------------------ + +Deno.test("pushChanged: no-op push still propagates scrubbed index, second call is quiet", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-f-" }); + try { + const mock = createMockGcsClient(); + + await seedFile( + cachePath, + ".datastore-index.json", + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: { + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 42, + lastModified: new Date().toISOString(), + }, + }, + }), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + + await service.pushChanged(); + + const firstIndexPuts = mock.puts.filter( + (p) => p.key === ".datastore-index.json", + ); + assertEquals( + firstIndexPuts.length, + 1, + "first pushChanged must write scrubbed index despite pushed=0", + ); + const firstPushed = decodeIndex(firstIndexPuts[0].body); + assertEquals( + Object.keys(firstPushed.entries).length, + 0, + "scrubbed index must contain no zombie entries", + ); + + await service.pushChanged(); + + const secondIndexPuts = mock.puts.filter( + (p) => p.key === ".datastore-index.json", + ); + assertEquals( + secondIndexPuts.length, + 1, + "second pushChanged must NOT rewrite the index — flag must reset", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/gcs/manifest.yaml b/datastore/gcs/manifest.yaml index f44aeccda..45b34c090 100644 --- a/datastore/gcs/manifest.yaml +++ b/datastore/gcs/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/gcs-datastore" -version: "2026.03.31.1" +version: "2026.04.08.1" description: | Store data in a Google Cloud Storage bucket with local cache synchronization. Provides distributed locking via GCS generation-based preconditions and diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts index 0d6d88d01..3b42e8d39 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts @@ -48,6 +48,40 @@ function assertSafePath(cachePath: string, relativePath: string): string { return resolved; } +/** + * Returns true for files that live inside the cache directory but must + * NOT cross the sync boundary in either direction (push or pull). + * + * Excluded patterns: + * - `.datastore-index.json` — the remote index manifest itself; pulled + * and pushed via dedicated code paths, never as a walked payload. + * - `.push-queue.json` — local push-queue scratch file. + * - `.datastore.lock` — distributed lock file; managed by the lock + * subsystem, must never be uploaded as data. + * - basename `_catalog.db` and anything starting with `_catalog.db-` + * (the SQLite WAL/SHM/journal sidecars) — the local-only data catalog + * store. It is deliberately colocated with the data tier so it can + * be rebuilt from whatever the local cache holds, but the database + * itself is per-machine state and must never leak to the shared + * bucket. See swamp-club issue #29 for the bug this exclusion fixes: + * without it, `swamp datastore sync --push` would walk `_catalog.db*` + * into `toPush`, SQLite would rewrite the WAL mid-upload, and the + * push would fail on `_catalog.db-wal`. + * + * Uses basename matching for the catalog pattern so the filter is + * robust to any future change in the data tier subdirectory name. + */ +function isInternalCacheFile(rel: string): boolean { + if ( + rel === ".datastore-index.json" || rel === ".push-queue.json" || + rel === ".datastore.lock" + ) { + return true; + } + const base = rel.split("/").pop() ?? ""; + return base === "_catalog.db" || base.startsWith("_catalog.db-"); +} + /** Metadata index entry for a file in S3. */ interface IndexEntry { key: string; @@ -87,6 +121,13 @@ export class S3CacheSyncService implements DatastoreSyncService { private readonly indexPath: string; private readonly pushQueuePath: string; private index: DatastoreIndex | null = null; + /** + * Set to true when `scrubIndex()` removes zombie entries from the + * in-memory index. Drives the write-back gate in `pushChanged()` so + * that the cleaned index propagates to the remote even on a no-op + * push (no new files to upload). Reset after a successful write-back. + */ + private indexMutated = false; constructor(s3: S3Client, cachePath: string) { this.s3 = s3; @@ -95,10 +136,41 @@ export class S3CacheSyncService implements DatastoreSyncService { this.pushQueuePath = join(cachePath, ".push-queue.json"); } + /** + * Removes zombie internal-file entries from the in-memory index. + * Runs whenever the index is populated from disk or remote so the + * invariant "internal files never cross the sync boundary" is + * enforced at the persistence boundary. + * + * Returns true if any entries were removed. See + * `isInternalCacheFile` for the exclusion criteria and swamp-club + * issue #29 for the motivating bug. + */ + private scrubIndex(): boolean { + if (!this.index || !this.index.entries) return false; + let mutated = false; + for (const rel of Object.keys(this.index.entries)) { + if (isInternalCacheFile(rel)) { + delete this.index.entries[rel]; + mutated = true; + } + } + return mutated; + } + /** * Pulls the metadata index from S3 (lightweight, single GET). * Uses a local cache with a 60-second TTL to avoid redundant fetches * during rapid command sequences. + * + * Both entry paths (cache-hit and S3-fetch) scrub the in-memory + * index after parsing so zombie internal-file entries never reach + * the rest of the sync pipeline (see swamp-club#29). On the + * S3-fetch path the local cache file is rewritten with the scrubbed + * JSON when scrub mutated — keeping the on-disk and in-memory views + * consistent. The cache-hit path does NOT rewrite the local file; + * its on-disk view self-heals on the next S3 fetch or via the + * `indexMutated` propagation on the next `pushChanged`. */ async pullIndex(): Promise { // Check local cache freshness @@ -108,6 +180,8 @@ export class S3CacheSyncService implements DatastoreSyncService { if (ageMs < INDEX_CACHE_TTL_MS && this.index === null) { const content = await Deno.readTextFile(this.indexPath); this.index = JSON.parse(content) as DatastoreIndex; + // Scrub zombies from the in-memory view before returning. + this.indexMutated ||= this.scrubIndex(); return; // Fresh enough — skip S3 } } catch { @@ -119,7 +193,19 @@ export class S3CacheSyncService implements DatastoreSyncService { const text = new TextDecoder().decode(data); this.index = JSON.parse(text) as DatastoreIndex; await ensureDir(this.cachePath); - await atomicWriteTextFile(this.indexPath, text); + // Scrub zombies, then write the local cache file. If scrub + // mutated, write the cleaned JSON so on-disk matches in-memory. + // Otherwise write the raw remote text to preserve the fast path + // (no re-serialization cost). + if (this.scrubIndex()) { + this.indexMutated = true; + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } else { + await atomicWriteTextFile(this.indexPath, text); + } } catch { // No index exists yet - start fresh this.index = { @@ -149,6 +235,12 @@ export class S3CacheSyncService implements DatastoreSyncService { // Build list of files that need pulling const toPull: string[] = []; for (const [rel, entry] of Object.entries(this.index?.entries ?? {})) { + // Belt-and-suspenders: `scrubIndex` already removed internal + // entries in `pullIndex`, but if anything re-adds a zombie + // between the scrub and the walk, this guard still catches it. + if (isInternalCacheFile(rel)) { + continue; + } const localPath = assertSafePath(this.cachePath, rel); try { const stat = await Deno.stat(localPath); @@ -248,11 +340,8 @@ export class S3CacheSyncService implements DatastoreSyncService { ) { totalFiles++; const rel = relative(this.cachePath, entry.path); - // Skip internal metadata files - if ( - rel === ".datastore-index.json" || rel === ".push-queue.json" || - rel === ".datastore.lock" - ) { + // Skip internal metadata files (see isInternalCacheFile) + if (isInternalCacheFile(rel)) { continue; } @@ -320,14 +409,19 @@ export class S3CacheSyncService implements DatastoreSyncService { ); } - // Push updated index if anything changed. - if (pushed > 0 && this.index) { - this.index.lastPulled = new Date().toISOString(); + // Push updated index if anything changed — either new files were + // pushed OR scrubIndex removed zombie entries that need to + // propagate to the remote (swamp-club#29 migration path). + if ((pushed > 0 || this.indexMutated) && this.index) { + if (pushed > 0) { + this.index.lastPulled = new Date().toISOString(); + } const indexJson = JSON.stringify(this.index, null, 2); const indexData = new TextEncoder().encode(indexJson); await this.s3.putObject(".datastore-index.json", indexData); // Also update the local cache await atomicWriteTextFile(this.indexPath, indexJson); + this.indexMutated = false; } return pushed; @@ -345,5 +439,8 @@ export class S3CacheSyncService implements DatastoreSyncService { entries: {}, }; } + // Scrub zombies from the in-memory view so the rest of + // `pushChanged` operates on a clean index. See swamp-club#29. + this.indexMutated ||= this.scrubIndex(); } } diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts new file mode 100644 index 000000000..eabc29348 --- /dev/null +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts @@ -0,0 +1,504 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +// Tests for S3CacheSyncService focusing on the swamp-club#29 fix: +// the sync walker and index scrub must keep internal files +// (including the SQLite catalog at `_catalog.db*`) from crossing the +// sync boundary in either direction, and zombie entries from pre-fix +// remote indexes must self-heal via the `indexMutated` flag. + +import { assertEquals, assertExists } from "jsr:@std/assert@1.0.19"; +import { join } from "jsr:@std/path@1"; +import { S3CacheSyncService } from "./s3_cache_sync.ts"; +import type { S3Client } from "./s3_client.ts"; + +/** Captured putObject call for test assertions. */ +interface PutCall { + key: string; + body: Uint8Array; +} + +/** In-memory mock of S3Client recording getObject/putObject calls. */ +function createMockS3Client(): S3Client & { + storage: Map; + puts: PutCall[]; + gets: string[]; +} { + const storage = new Map(); + const puts: PutCall[] = []; + const gets: string[] = []; + + return { + storage, + puts, + gets, + + putObject(key: string, body: Uint8Array): Promise { + storage.set(key, body); + puts.push({ key, body }); + return Promise.resolve(); + }, + + getObject(key: string): Promise { + gets.push(key); + const data = storage.get(key); + if (!data) return Promise.reject(new Error(`NoSuchKey: ${key}`)); + return Promise.resolve(data); + }, + } as unknown as S3Client & { + storage: Map; + puts: PutCall[]; + gets: string[]; + }; +} + +/** Seeds a file inside the cache directory, creating parent dirs. */ +async function seedFile( + cachePath: string, + relPath: string, + contents: string, +): Promise { + const full = join(cachePath, relPath); + const parent = full.substring(0, full.lastIndexOf("/")); + if (parent && parent !== cachePath) { + await Deno.mkdir(parent, { recursive: true }); + } else { + await Deno.mkdir(cachePath, { recursive: true }); + } + await Deno.writeTextFile(full, contents); +} + +/** Encodes an index object as a remote .datastore-index.json body. */ +function encodeIndex( + entries: Record, +): Uint8Array { + return new TextEncoder().encode( + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries, + }), + ); +} + +/** Decodes a recorded putObject body as a parsed index object. */ +function decodeIndex(body: Uint8Array): { + entries: Record; +} { + return JSON.parse(new TextDecoder().decode(body)); +} + +/** Type assertion helper for reaching private instance state in tests. */ +function privateState(service: S3CacheSyncService): { + index: { entries: Record } | null; + indexMutated: boolean; +} { + return service as unknown as { + index: { entries: Record } | null; + indexMutated: boolean; + }; +} + +// -- (a) push skips _catalog.db, _catalog.db-shm, _catalog.db-wal ---------- + +Deno.test("pushChanged: skips _catalog.db and its WAL/SHM sidecars", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-a-" }); + try { + const mock = createMockS3Client(); + const service = new S3CacheSyncService(mock, cachePath); + + // Seed legitimate payload plus catalog files plus internal files. + await seedFile(cachePath, "data/@my-model/payload.yaml", "name: x\n"); + await seedFile(cachePath, "data/_catalog.db", "SQLITE-MAIN"); + await seedFile(cachePath, "data/_catalog.db-shm", "SQLITE-SHM"); + await seedFile(cachePath, "data/_catalog.db-wal", "SQLITE-WAL"); + + await service.pushChanged(); + + // Expect exactly two puts: the legitimate payload plus the + // index write-back. Nothing matching _catalog.db*. + const uploadedKeys = mock.puts.map((p) => p.key); + assertEquals( + uploadedKeys.includes("data/@my-model/payload.yaml"), + true, + "legitimate payload should be pushed", + ); + for (const key of uploadedKeys) { + assertEquals( + key.includes("_catalog.db"), + false, + `catalog file leaked: ${key}`, + ); + } + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (b) push still skips the pre-existing three internal files ----------- + +Deno.test("pushChanged: still skips .datastore-index.json/.push-queue.json/.datastore.lock", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-b-" }); + try { + const mock = createMockS3Client(); + const service = new S3CacheSyncService(mock, cachePath); + + // Seed a valid empty index so loadIndex succeeds, then seed the + // other two internal files as raw walk-targets. The assertion is + // that the walker skips them, not that loadIndex parses them. + await seedFile(cachePath, "data/ok.yaml", "ok: true\n"); + await seedFile( + cachePath, + ".datastore-index.json", + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: {}, + }), + ); + await seedFile(cachePath, ".push-queue.json", "{}"); + await seedFile(cachePath, ".datastore.lock", "lock"); + + await service.pushChanged(); + + const uploadedKeys = mock.puts.map((p) => p.key); + // Only the payload plus the index write-back. Internal files + // must never appear as standalone put targets. + assertEquals( + uploadedKeys.filter((k) => k === ".push-queue.json").length, + 0, + ); + assertEquals( + uploadedKeys.filter((k) => k === ".datastore.lock").length, + 0, + ); + assertEquals(uploadedKeys.includes("data/ok.yaml"), true); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (c) walker skip is a safety net beyond scrub ------------------------- + +Deno.test("pullChanged walker: skips zombies even if re-injected post-scrub", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-c-" }); + try { + const mock = createMockS3Client(); + // Seed a CLEAN remote index (no zombies). pullIndex will fetch this, + // scrub is a no-op, then we manually inject a zombie entry before + // the walker runs to prove the walker's own skip is load-bearing. + mock.storage.set( + ".datastore-index.json", + encodeIndex({ + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 8, + lastModified: new Date().toISOString(), + }, + }), + ); + // Pre-populate the legitimate local file so pullChanged doesn't + // have to fetch it (size match → skip). + await seedFile(cachePath, "data/@m/ok.yaml", "ok: yes\n"); + + const service = new S3CacheSyncService(mock, cachePath); + // Force pullIndex first so we can mutate the in-memory index + // AFTER scrub has already run. + await service.pullIndex(); + + // Inject a zombie entry directly into the in-memory index, bypassing + // scrub. The walker must still skip this via isInternalCacheFile. + const state = privateState(service); + assertExists(state.index); + state.index.entries["data/_catalog.db-wal"] = { + key: "data/_catalog.db-wal", + size: 1024, + lastModified: new Date().toISOString(), + }; + + await service.pullChanged(); + + // The walker must not have tried to fetch the injected zombie. + assertEquals( + mock.gets.includes("data/_catalog.db-wal"), + false, + "walker must skip zombies regardless of whether scrub caught them", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (d) pull skips zombies from the remote index ------------------------- + +Deno.test("pullChanged: skips zombie _catalog.db* entries from remote index", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-d-" }); + try { + const mock = createMockS3Client(); + // Remote index contains both a legitimate entry and a zombie. + mock.storage.set( + ".datastore-index.json", + encodeIndex({ + "data/@m/payload.yaml": { + key: "data/@m/payload.yaml", + size: 5, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 999, + lastModified: new Date().toISOString(), + }, + }), + ); + // Fake remote payload so pullChanged's fetch succeeds. + mock.storage.set( + "data/@m/payload.yaml", + new TextEncoder().encode("hi!\n\n"), + ); + + const service = new S3CacheSyncService(mock, cachePath); + await service.pullChanged(); + + assertEquals( + mock.gets.includes("data/@m/payload.yaml"), + true, + "legitimate payload must be fetched", + ); + assertEquals( + mock.gets.includes("data/_catalog.db-wal"), + false, + "zombie must not be fetched", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (e1) pullIndex S3-fetch path scrubs in-memory AND rewrites local ---- + +Deno.test("pullIndex S3-fetch path: scrubs in-memory and rewrites local cache file", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-e1-" }); + try { + const mock = createMockS3Client(); + // Remote index has zombies. No local cache file yet → forces the + // S3-fetch branch. + const polluted = { + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 3, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db": { + key: "data/_catalog.db", + size: 100, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 200, + lastModified: new Date().toISOString(), + }, + }; + mock.storage.set(".datastore-index.json", encodeIndex(polluted)); + + const service = new S3CacheSyncService(mock, cachePath); + await service.pullIndex(); + + // In-memory state: zombies gone. + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).sort(), + ["data/@m/ok.yaml"], + "in-memory index should contain only the legitimate entry", + ); + assertEquals( + state.indexMutated, + true, + "indexMutated must be set after scrubbing zombies", + ); + + // Local cache file at /.datastore-index.json must + // have been rewritten with the scrubbed JSON, not the raw + // polluted text. + const localFile = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const localParsed = JSON.parse(localFile) as { + entries: Record; + }; + assertEquals( + Object.keys(localParsed.entries).sort(), + ["data/@m/ok.yaml"], + "local cache file must match scrubbed in-memory view", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (e2) pullIndex cache-hit path scrubs in-memory, pushChanged propagates - + +Deno.test("pullIndex cache-hit path: scrubs in-memory, next pushChanged propagates to remote", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-e2-" }); + try { + const mock = createMockS3Client(); + + // Pre-seed a POLLUTED local cache file so pullIndex takes the + // cache-hit branch. Remote index is left empty — we must never + // call getObject for it on this path. + const pollutedJson = JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: { + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 3, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 42, + lastModified: new Date().toISOString(), + }, + }, + }); + await seedFile(cachePath, ".datastore-index.json", pollutedJson); + + const service = new S3CacheSyncService(mock, cachePath); + await service.pullIndex(); + + // Must have taken the cache-hit branch (no remote fetch). + assertEquals( + mock.gets.includes(".datastore-index.json"), + false, + "cache-hit branch must not fetch the index from S3", + ); + + // In-memory state is scrubbed. + const state = privateState(service); + assertExists(state.index); + assertEquals(Object.keys(state.index.entries).sort(), ["data/@m/ok.yaml"]); + assertEquals(state.indexMutated, true); + + // The on-disk file is NOT rewritten by the cache-hit branch. + const diskJson = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + assertEquals( + diskJson, + pollutedJson, + "cache-hit branch must not rewrite the local index file", + ); + + // Now push against a cache with no new payload files. The + // indexMutated flag must trigger a write-back of the scrubbed + // index to the remote. + await service.pushChanged(); + + const indexPuts = mock.puts.filter((p) => + p.key === ".datastore-index.json" + ); + assertEquals( + indexPuts.length, + 1, + "scrubbed index must be pushed exactly once via indexMutated", + ); + const pushedIndex = decodeIndex(indexPuts[0].body); + assertEquals( + Object.keys(pushedIndex.entries).sort(), + ["data/@m/ok.yaml"], + "pushed index must contain only the legitimate entry", + ); + assertEquals( + state.indexMutated, + false, + "indexMutated must reset after write-back", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (f) indexMutated propagates on no-op push and resets ------------------ + +Deno.test("pushChanged: no-op push still propagates scrubbed index, second call is quiet", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-f-" }); + try { + const mock = createMockS3Client(); + + // Seed a polluted local index. No remote index — loadIndex reads + // straight from disk. + await seedFile( + cachePath, + ".datastore-index.json", + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: { + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 42, + lastModified: new Date().toISOString(), + }, + }, + }), + ); + + const service = new S3CacheSyncService(mock, cachePath); + + // First push: no files to push, but scrub triggered the flag. + await service.pushChanged(); + + const firstIndexPuts = mock.puts.filter( + (p) => p.key === ".datastore-index.json", + ); + assertEquals( + firstIndexPuts.length, + 1, + "first pushChanged must write scrubbed index despite pushed=0", + ); + const firstPushed = decodeIndex(firstIndexPuts[0].body); + assertEquals( + Object.keys(firstPushed.entries).length, + 0, + "scrubbed index must contain no zombie entries", + ); + + // Second push on the same instance: flag is reset, nothing to do. + await service.pushChanged(); + + const secondIndexPuts = mock.puts.filter( + (p) => p.key === ".datastore-index.json", + ); + assertEquals( + secondIndexPuts.length, + 1, + "second pushChanged must NOT rewrite the index — flag must reset", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/s3/manifest.yaml b/datastore/s3/manifest.yaml index b713f87cd..6da36858f 100644 --- a/datastore/s3/manifest.yaml +++ b/datastore/s3/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/s3-datastore" -version: "2026.04.03.1" +version: "2026.04.08.1" description: | Store data in an Amazon S3 bucket with local cache synchronization. Provides distributed locking via S3 conditional writes and bidirectional