diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts index 517c0853f..9e2595500 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts @@ -48,6 +48,40 @@ function assertSafePath(cachePath: string, relativePath: string): string { return resolved; } +/** + * Returns true for files that live inside the cache directory but must + * NOT cross the sync boundary in either direction (push or pull). + * + * Excluded patterns: + * - `.datastore-index.json` — the remote index manifest itself; pulled + * and pushed via dedicated code paths, never as a walked payload. + * - `.push-queue.json` — local push-queue scratch file. + * - `.datastore.lock` — distributed lock file; managed by the lock + * subsystem, must never be uploaded as data. + * - basename `_catalog.db` and anything starting with `_catalog.db-` + * (the SQLite WAL/SHM/journal sidecars) — the local-only data catalog + * store. It is deliberately colocated with the data tier so it can + * be rebuilt from whatever the local cache holds, but the database + * itself is per-machine state and must never leak to the shared + * bucket. See swamp-club issue #29 for the bug this exclusion fixes: + * without it, `swamp datastore sync --push` would walk `_catalog.db*` + * into `toPush`, SQLite would rewrite the WAL mid-upload, and the + * push would fail on `_catalog.db-wal`. + * + * Uses basename matching for the catalog pattern so the filter is + * robust to any future change in the data tier subdirectory name. + */ +function isInternalCacheFile(rel: string): boolean { + if ( + rel === ".datastore-index.json" || rel === ".push-queue.json" || + rel === ".datastore.lock" + ) { + return true; + } + const base = rel.split("/").pop() ?? ""; + return base === "_catalog.db" || base.startsWith("_catalog.db-"); +} + /** Metadata index entry for a file in GCS. 
*/ interface IndexEntry { key: string; @@ -75,6 +109,13 @@ export class GcsCacheSyncService implements DatastoreSyncService { private readonly cachePath: string; private readonly indexPath: string; private index: DatastoreIndex | null = null; + /** + * Set to true when `scrubIndex()` removes zombie entries from the + * in-memory index. Drives the write-back gate in `pushChanged()` so + * that the cleaned index propagates to the remote even on a no-op + * push (no new files to upload). Reset after a successful write-back. + */ + private indexMutated = false; constructor(gcs: GcsClient, cachePath: string) { this.gcs = gcs; @@ -82,9 +123,40 @@ export class GcsCacheSyncService implements DatastoreSyncService { this.indexPath = join(cachePath, ".datastore-index.json"); } + /** + * Removes zombie internal-file entries from the in-memory index. + * Runs whenever the index is populated from disk or remote so the + * invariant "internal files never cross the sync boundary" is + * enforced at the persistence boundary. + * + * Returns true if any entries were removed. See + * `isInternalCacheFile` for the exclusion criteria and swamp-club + * issue #29 for the motivating bug. + */ + private scrubIndex(): boolean { + if (!this.index || !this.index.entries) return false; + let mutated = false; + for (const rel of Object.keys(this.index.entries)) { + if (isInternalCacheFile(rel)) { + delete this.index.entries[rel]; + mutated = true; + } + } + return mutated; + } + /** * Pulls the metadata index from GCS (lightweight, single GET). * Uses a local cache with a 60-second TTL to avoid redundant fetches. + * + * Both entry paths (cache-hit and GCS-fetch) scrub the in-memory + * index after parsing so zombie internal-file entries never reach + * the rest of the sync pipeline (see swamp-club#29). On the + * GCS-fetch path the local cache file is rewritten with the scrubbed + * JSON when scrub mutated — keeping the on-disk and in-memory views + * consistent. 
The cache-hit path does NOT rewrite the local file; + * its on-disk view self-heals on the next GCS fetch or via the + * `indexMutated` propagation on the next `pushChanged`. */ async pullIndex(): Promise { // Check local cache freshness @@ -94,6 +166,8 @@ export class GcsCacheSyncService implements DatastoreSyncService { if (ageMs < INDEX_CACHE_TTL_MS && this.index === null) { const content = await Deno.readTextFile(this.indexPath); this.index = JSON.parse(content) as DatastoreIndex; + // Scrub zombies from the in-memory view before returning. + this.indexMutated ||= this.scrubIndex(); return; } } catch { @@ -105,7 +179,18 @@ export class GcsCacheSyncService implements DatastoreSyncService { const text = new TextDecoder().decode(data); this.index = JSON.parse(text) as DatastoreIndex; await ensureDir(this.cachePath); - await atomicWriteTextFile(this.indexPath, text); + // Scrub zombies, then write the local cache file. If scrub + // mutated, write the cleaned JSON so on-disk matches in-memory. + // Otherwise write the raw remote text to preserve the fast path. + if (this.scrubIndex()) { + this.indexMutated = true; + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } else { + await atomicWriteTextFile(this.indexPath, text); + } } catch { // No index exists yet — start fresh this.index = { @@ -134,6 +219,12 @@ export class GcsCacheSyncService implements DatastoreSyncService { const toPull: string[] = []; for (const [rel, entry] of Object.entries(this.index?.entries ?? {})) { + // Belt-and-suspenders: `scrubIndex` already removed internal + // entries in `pullIndex`, but if anything re-adds a zombie + // between the scrub and the walk, this guard still catches it. 
+ if (isInternalCacheFile(rel)) { + continue; + } const localPath = assertSafePath(this.cachePath, rel); try { const stat = await Deno.stat(localPath); @@ -216,11 +307,8 @@ export class GcsCacheSyncService implements DatastoreSyncService { }) ) { const rel = relative(this.cachePath, entry.path); - // Skip internal metadata files - if ( - rel === ".datastore-index.json" || rel === ".push-queue.json" || - rel === ".datastore.lock" - ) { + // Skip internal metadata files (see isInternalCacheFile) + if (isInternalCacheFile(rel)) { continue; } @@ -268,13 +356,18 @@ export class GcsCacheSyncService implements DatastoreSyncService { ); } - // Push updated index if anything changed - if (pushed > 0 && this.index) { - this.index.lastPulled = new Date().toISOString(); + // Push updated index if anything changed — either new files were + // pushed OR scrubIndex removed zombie entries that need to + // propagate to the remote (swamp-club#29 migration path). + if ((pushed > 0 || this.indexMutated) && this.index) { + if (pushed > 0) { + this.index.lastPulled = new Date().toISOString(); + } const indexJson = JSON.stringify(this.index, null, 2); const indexData = new TextEncoder().encode(indexJson); await this.gcs.putObject(".datastore-index.json", indexData); await atomicWriteTextFile(this.indexPath, indexJson); + this.indexMutated = false; } return pushed; @@ -292,5 +385,8 @@ export class GcsCacheSyncService implements DatastoreSyncService { entries: {}, }; } + // Scrub zombies from the in-memory view so the rest of + // `pushChanged` operates on a clean index. See swamp-club#29. 
+ this.indexMutated ||= this.scrubIndex(); } } diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts new file mode 100644 index 000000000..7149c014f --- /dev/null +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts @@ -0,0 +1,464 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +// Tests for GcsCacheSyncService focusing on the swamp-club#29 fix: +// mirrors s3_cache_sync_test.ts behavior since the GCS sync service +// shares the identical internal-file exclusion pattern and scrub +// logic. See swamp-club#29 for the motivating bug. + +import { assertEquals, assertExists } from "jsr:@std/assert@1.0.19"; +import { join } from "jsr:@std/path@1"; +import { GcsCacheSyncService } from "./gcs_cache_sync.ts"; +import type { GcsClient } from "./gcs_client.ts"; + +/** Captured putObject call for test assertions. */ +interface PutCall { + key: string; + body: Uint8Array; +} + +/** In-memory mock of GcsClient recording getObject/putObject calls. 
*/ +function createMockGcsClient(): GcsClient & { + storage: Map; + puts: PutCall[]; + gets: string[]; +} { + const storage = new Map(); + const puts: PutCall[] = []; + const gets: string[] = []; + + return { + storage, + puts, + gets, + + putObject(key: string, body: Uint8Array): Promise<{ generation: string }> { + storage.set(key, body); + puts.push({ key, body }); + return Promise.resolve({ generation: "1" }); + }, + + getObject(key: string): Promise { + gets.push(key); + const data = storage.get(key); + if (!data) return Promise.reject(new Error(`NoSuchKey: ${key}`)); + return Promise.resolve(data); + }, + } as unknown as GcsClient & { + storage: Map; + puts: PutCall[]; + gets: string[]; + }; +} + +/** Seeds a file inside the cache directory, creating parent dirs. */ +async function seedFile( + cachePath: string, + relPath: string, + contents: string, +): Promise { + const full = join(cachePath, relPath); + const parent = full.substring(0, full.lastIndexOf("/")); + if (parent && parent !== cachePath) { + await Deno.mkdir(parent, { recursive: true }); + } else { + await Deno.mkdir(cachePath, { recursive: true }); + } + await Deno.writeTextFile(full, contents); +} + +/** Encodes an index object as a remote .datastore-index.json body. */ +function encodeIndex( + entries: Record, +): Uint8Array { + return new TextEncoder().encode( + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries, + }), + ); +} + +/** Decodes a recorded putObject body as a parsed index object. */ +function decodeIndex(body: Uint8Array): { + entries: Record; +} { + return JSON.parse(new TextDecoder().decode(body)); +} + +/** Type assertion helper for reaching private instance state in tests. 
*/ +function privateState(service: GcsCacheSyncService): { + index: { entries: Record } | null; + indexMutated: boolean; +} { + return service as unknown as { + index: { entries: Record } | null; + indexMutated: boolean; + }; +} + +// -- (a) push skips _catalog.db, _catalog.db-shm, _catalog.db-wal ---------- + +Deno.test("pushChanged: skips _catalog.db and its WAL/SHM sidecars", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-a-" }); + try { + const mock = createMockGcsClient(); + const service = new GcsCacheSyncService(mock, cachePath); + + await seedFile(cachePath, "data/@my-model/payload.yaml", "name: x\n"); + await seedFile(cachePath, "data/_catalog.db", "SQLITE-MAIN"); + await seedFile(cachePath, "data/_catalog.db-shm", "SQLITE-SHM"); + await seedFile(cachePath, "data/_catalog.db-wal", "SQLITE-WAL"); + + await service.pushChanged(); + + const uploadedKeys = mock.puts.map((p) => p.key); + assertEquals( + uploadedKeys.includes("data/@my-model/payload.yaml"), + true, + "legitimate payload should be pushed", + ); + for (const key of uploadedKeys) { + assertEquals( + key.includes("_catalog.db"), + false, + `catalog file leaked: ${key}`, + ); + } + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (b) push still skips the pre-existing three internal files ----------- + +Deno.test("pushChanged: still skips .datastore-index.json/.push-queue.json/.datastore.lock", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-b-" }); + try { + const mock = createMockGcsClient(); + const service = new GcsCacheSyncService(mock, cachePath); + + await seedFile(cachePath, "data/ok.yaml", "ok: true\n"); + await seedFile( + cachePath, + ".datastore-index.json", + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: {}, + }), + ); + await seedFile(cachePath, ".push-queue.json", "{}"); + await seedFile(cachePath, ".datastore.lock", "lock"); + + await 
service.pushChanged(); + + const uploadedKeys = mock.puts.map((p) => p.key); + assertEquals( + uploadedKeys.filter((k) => k === ".push-queue.json").length, + 0, + ); + assertEquals( + uploadedKeys.filter((k) => k === ".datastore.lock").length, + 0, + ); + assertEquals(uploadedKeys.includes("data/ok.yaml"), true); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (c) walker skip is a safety net beyond scrub ------------------------- + +Deno.test("pullChanged walker: skips zombies even if re-injected post-scrub", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-c-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + ".datastore-index.json", + encodeIndex({ + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 8, + lastModified: new Date().toISOString(), + }, + }), + ); + await seedFile(cachePath, "data/@m/ok.yaml", "ok: yes\n"); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullIndex(); + + const state = privateState(service); + assertExists(state.index); + state.index.entries["data/_catalog.db-wal"] = { + key: "data/_catalog.db-wal", + size: 1024, + lastModified: new Date().toISOString(), + }; + + await service.pullChanged(); + + assertEquals( + mock.gets.includes("data/_catalog.db-wal"), + false, + "walker must skip zombies regardless of whether scrub caught them", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (d) pull skips zombies from the remote index ------------------------- + +Deno.test("pullChanged: skips zombie _catalog.db* entries from remote index", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-d-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + ".datastore-index.json", + encodeIndex({ + "data/@m/payload.yaml": { + key: "data/@m/payload.yaml", + size: 5, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: 
"data/_catalog.db-wal", + size: 999, + lastModified: new Date().toISOString(), + }, + }), + ); + mock.storage.set( + "data/@m/payload.yaml", + new TextEncoder().encode("hi!\n\n"), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullChanged(); + + assertEquals( + mock.gets.includes("data/@m/payload.yaml"), + true, + "legitimate payload must be fetched", + ); + assertEquals( + mock.gets.includes("data/_catalog.db-wal"), + false, + "zombie must not be fetched", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (e1) pullIndex GCS-fetch path scrubs in-memory AND rewrites local ---- + +Deno.test("pullIndex GCS-fetch path: scrubs in-memory and rewrites local cache file", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-e1-" }); + try { + const mock = createMockGcsClient(); + const polluted = { + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 3, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db": { + key: "data/_catalog.db", + size: 100, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 200, + lastModified: new Date().toISOString(), + }, + }; + mock.storage.set(".datastore-index.json", encodeIndex(polluted)); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullIndex(); + + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).sort(), + ["data/@m/ok.yaml"], + "in-memory index should contain only the legitimate entry", + ); + assertEquals( + state.indexMutated, + true, + "indexMutated must be set after scrubbing zombies", + ); + + const localFile = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const localParsed = JSON.parse(localFile) as { + entries: Record; + }; + assertEquals( + Object.keys(localParsed.entries).sort(), + ["data/@m/ok.yaml"], + "local cache 
file must match scrubbed in-memory view", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (e2) pullIndex cache-hit path scrubs in-memory, pushChanged propagates - + +Deno.test("pullIndex cache-hit path: scrubs in-memory, next pushChanged propagates to remote", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-e2-" }); + try { + const mock = createMockGcsClient(); + + const pollutedJson = JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: { + "data/@m/ok.yaml": { + key: "data/@m/ok.yaml", + size: 3, + lastModified: new Date().toISOString(), + }, + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 42, + lastModified: new Date().toISOString(), + }, + }, + }); + await seedFile(cachePath, ".datastore-index.json", pollutedJson); + + const service = new GcsCacheSyncService(mock, cachePath); + await service.pullIndex(); + + assertEquals( + mock.gets.includes(".datastore-index.json"), + false, + "cache-hit branch must not fetch the index from GCS", + ); + + const state = privateState(service); + assertExists(state.index); + assertEquals(Object.keys(state.index.entries).sort(), ["data/@m/ok.yaml"]); + assertEquals(state.indexMutated, true); + + const diskJson = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + assertEquals( + diskJson, + pollutedJson, + "cache-hit branch must not rewrite the local index file", + ); + + await service.pushChanged(); + + const indexPuts = mock.puts.filter((p) => + p.key === ".datastore-index.json" + ); + assertEquals( + indexPuts.length, + 1, + "scrubbed index must be pushed exactly once via indexMutated", + ); + const pushedIndex = decodeIndex(indexPuts[0].body); + assertEquals( + Object.keys(pushedIndex.entries).sort(), + ["data/@m/ok.yaml"], + "pushed index must contain only the legitimate entry", + ); + assertEquals( + state.indexMutated, + false, + "indexMutated must reset after write-back", + ); 
+ } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// -- (f) indexMutated propagates on no-op push and resets ------------------ + +Deno.test("pushChanged: no-op push still propagates scrubbed index, second call is quiet", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-test-f-" }); + try { + const mock = createMockGcsClient(); + + await seedFile( + cachePath, + ".datastore-index.json", + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries: { + "data/_catalog.db-wal": { + key: "data/_catalog.db-wal", + size: 42, + lastModified: new Date().toISOString(), + }, + }, + }), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + + await service.pushChanged(); + + const firstIndexPuts = mock.puts.filter( + (p) => p.key === ".datastore-index.json", + ); + assertEquals( + firstIndexPuts.length, + 1, + "first pushChanged must write scrubbed index despite pushed=0", + ); + const firstPushed = decodeIndex(firstIndexPuts[0].body); + assertEquals( + Object.keys(firstPushed.entries).length, + 0, + "scrubbed index must contain no zombie entries", + ); + + await service.pushChanged(); + + const secondIndexPuts = mock.puts.filter( + (p) => p.key === ".datastore-index.json", + ); + assertEquals( + secondIndexPuts.length, + 1, + "second pushChanged must NOT rewrite the index — flag must reset", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/gcs/manifest.yaml b/datastore/gcs/manifest.yaml index f44aeccda..45b34c090 100644 --- a/datastore/gcs/manifest.yaml +++ b/datastore/gcs/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/gcs-datastore" -version: "2026.03.31.1" +version: "2026.04.08.1" description: | Store data in a Google Cloud Storage bucket with local cache synchronization. 
Provides distributed locking via GCS generation-based preconditions and diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts index 0d6d88d01..3b42e8d39 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts @@ -48,6 +48,40 @@ function assertSafePath(cachePath: string, relativePath: string): string { return resolved; } +/** + * Returns true for files that live inside the cache directory but must + * NOT cross the sync boundary in either direction (push or pull). + * + * Excluded patterns: + * - `.datastore-index.json` — the remote index manifest itself; pulled + * and pushed via dedicated code paths, never as a walked payload. + * - `.push-queue.json` — local push-queue scratch file. + * - `.datastore.lock` — distributed lock file; managed by the lock + * subsystem, must never be uploaded as data. + * - basename `_catalog.db` and anything starting with `_catalog.db-` + * (the SQLite WAL/SHM/journal sidecars) — the local-only data catalog + * store. It is deliberately colocated with the data tier so it can + * be rebuilt from whatever the local cache holds, but the database + * itself is per-machine state and must never leak to the shared + * bucket. See swamp-club issue #29 for the bug this exclusion fixes: + * without it, `swamp datastore sync --push` would walk `_catalog.db*` + * into `toPush`, SQLite would rewrite the WAL mid-upload, and the + * push would fail on `_catalog.db-wal`. + * + * Uses basename matching for the catalog pattern so the filter is + * robust to any future change in the data tier subdirectory name. + */ +function isInternalCacheFile(rel: string): boolean { + if ( + rel === ".datastore-index.json" || rel === ".push-queue.json" || + rel === ".datastore.lock" + ) { + return true; + } + const base = rel.split("/").pop() ?? 
""; + return base === "_catalog.db" || base.startsWith("_catalog.db-"); +} + /** Metadata index entry for a file in S3. */ interface IndexEntry { key: string; @@ -87,6 +121,13 @@ export class S3CacheSyncService implements DatastoreSyncService { private readonly indexPath: string; private readonly pushQueuePath: string; private index: DatastoreIndex | null = null; + /** + * Set to true when `scrubIndex()` removes zombie entries from the + * in-memory index. Drives the write-back gate in `pushChanged()` so + * that the cleaned index propagates to the remote even on a no-op + * push (no new files to upload). Reset after a successful write-back. + */ + private indexMutated = false; constructor(s3: S3Client, cachePath: string) { this.s3 = s3; @@ -95,10 +136,41 @@ export class S3CacheSyncService implements DatastoreSyncService { this.pushQueuePath = join(cachePath, ".push-queue.json"); } + /** + * Removes zombie internal-file entries from the in-memory index. + * Runs whenever the index is populated from disk or remote so the + * invariant "internal files never cross the sync boundary" is + * enforced at the persistence boundary. + * + * Returns true if any entries were removed. See + * `isInternalCacheFile` for the exclusion criteria and swamp-club + * issue #29 for the motivating bug. + */ + private scrubIndex(): boolean { + if (!this.index || !this.index.entries) return false; + let mutated = false; + for (const rel of Object.keys(this.index.entries)) { + if (isInternalCacheFile(rel)) { + delete this.index.entries[rel]; + mutated = true; + } + } + return mutated; + } + /** * Pulls the metadata index from S3 (lightweight, single GET). * Uses a local cache with a 60-second TTL to avoid redundant fetches * during rapid command sequences. + * + * Both entry paths (cache-hit and S3-fetch) scrub the in-memory + * index after parsing so zombie internal-file entries never reach + * the rest of the sync pipeline (see swamp-club#29). 
On the + * S3-fetch path the local cache file is rewritten with the scrubbed + * JSON when scrub mutated — keeping the on-disk and in-memory views + * consistent. The cache-hit path does NOT rewrite the local file; + * its on-disk view self-heals on the next S3 fetch or via the + * `indexMutated` propagation on the next `pushChanged`. */ async pullIndex(): Promise { // Check local cache freshness @@ -108,6 +180,8 @@ export class S3CacheSyncService implements DatastoreSyncService { if (ageMs < INDEX_CACHE_TTL_MS && this.index === null) { const content = await Deno.readTextFile(this.indexPath); this.index = JSON.parse(content) as DatastoreIndex; + // Scrub zombies from the in-memory view before returning. + this.indexMutated ||= this.scrubIndex(); return; // Fresh enough — skip S3 } } catch { @@ -119,7 +193,19 @@ export class S3CacheSyncService implements DatastoreSyncService { const text = new TextDecoder().decode(data); this.index = JSON.parse(text) as DatastoreIndex; await ensureDir(this.cachePath); - await atomicWriteTextFile(this.indexPath, text); + // Scrub zombies, then write the local cache file. If scrub + // mutated, write the cleaned JSON so on-disk matches in-memory. + // Otherwise write the raw remote text to preserve the fast path + // (no re-serialization cost). + if (this.scrubIndex()) { + this.indexMutated = true; + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } else { + await atomicWriteTextFile(this.indexPath, text); + } } catch { // No index exists yet - start fresh this.index = { @@ -149,6 +235,12 @@ export class S3CacheSyncService implements DatastoreSyncService { // Build list of files that need pulling const toPull: string[] = []; for (const [rel, entry] of Object.entries(this.index?.entries ?? {})) { + // Belt-and-suspenders: `scrubIndex` already removed internal + // entries in `pullIndex`, but if anything re-adds a zombie + // between the scrub and the walk, this guard still catches it. 
+ if (isInternalCacheFile(rel)) { + continue; + } const localPath = assertSafePath(this.cachePath, rel); try { const stat = await Deno.stat(localPath); @@ -248,11 +340,8 @@ export class S3CacheSyncService implements DatastoreSyncService { ) { totalFiles++; const rel = relative(this.cachePath, entry.path); - // Skip internal metadata files - if ( - rel === ".datastore-index.json" || rel === ".push-queue.json" || - rel === ".datastore.lock" - ) { + // Skip internal metadata files (see isInternalCacheFile) + if (isInternalCacheFile(rel)) { continue; } @@ -320,14 +409,19 @@ export class S3CacheSyncService implements DatastoreSyncService { ); } - // Push updated index if anything changed. - if (pushed > 0 && this.index) { - this.index.lastPulled = new Date().toISOString(); + // Push updated index if anything changed — either new files were + // pushed OR scrubIndex removed zombie entries that need to + // propagate to the remote (swamp-club#29 migration path). + if ((pushed > 0 || this.indexMutated) && this.index) { + if (pushed > 0) { + this.index.lastPulled = new Date().toISOString(); + } const indexJson = JSON.stringify(this.index, null, 2); const indexData = new TextEncoder().encode(indexJson); await this.s3.putObject(".datastore-index.json", indexData); // Also update the local cache await atomicWriteTextFile(this.indexPath, indexJson); + this.indexMutated = false; } return pushed; @@ -345,5 +439,8 @@ export class S3CacheSyncService implements DatastoreSyncService { entries: {}, }; } + // Scrub zombies from the in-memory view so the rest of + // `pushChanged` operates on a clean index. See swamp-club#29. 
+ this.indexMutated ||= this.scrubIndex(); } } diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts new file mode 100644 index 000000000..eabc29348 --- /dev/null +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts @@ -0,0 +1,504 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +// Tests for S3CacheSyncService focusing on the swamp-club#29 fix: +// the sync walker and index scrub must keep internal files +// (including the SQLite catalog at `_catalog.db*`) from crossing the +// sync boundary in either direction, and zombie entries from pre-fix +// remote indexes must self-heal via the `indexMutated` flag. + +import { assertEquals, assertExists } from "jsr:@std/assert@1.0.19"; +import { join } from "jsr:@std/path@1"; +import { S3CacheSyncService } from "./s3_cache_sync.ts"; +import type { S3Client } from "./s3_client.ts"; + +/** Captured putObject call for test assertions. */ +interface PutCall { + key: string; + body: Uint8Array; +} + +/** In-memory mock of S3Client recording getObject/putObject calls. 
*/ +function createMockS3Client(): S3Client & { + storage: Map; + puts: PutCall[]; + gets: string[]; +} { + const storage = new Map(); + const puts: PutCall[] = []; + const gets: string[] = []; + + return { + storage, + puts, + gets, + + putObject(key: string, body: Uint8Array): Promise { + storage.set(key, body); + puts.push({ key, body }); + return Promise.resolve(); + }, + + getObject(key: string): Promise { + gets.push(key); + const data = storage.get(key); + if (!data) return Promise.reject(new Error(`NoSuchKey: ${key}`)); + return Promise.resolve(data); + }, + } as unknown as S3Client & { + storage: Map; + puts: PutCall[]; + gets: string[]; + }; +} + +/** Seeds a file inside the cache directory, creating parent dirs. */ +async function seedFile( + cachePath: string, + relPath: string, + contents: string, +): Promise { + const full = join(cachePath, relPath); + const parent = full.substring(0, full.lastIndexOf("/")); + if (parent && parent !== cachePath) { + await Deno.mkdir(parent, { recursive: true }); + } else { + await Deno.mkdir(cachePath, { recursive: true }); + } + await Deno.writeTextFile(full, contents); +} + +/** Encodes an index object as a remote .datastore-index.json body. */ +function encodeIndex( + entries: Record, +): Uint8Array { + return new TextEncoder().encode( + JSON.stringify({ + version: 1, + lastPulled: new Date().toISOString(), + entries, + }), + ); +} + +/** Decodes a recorded putObject body as a parsed index object. */ +function decodeIndex(body: Uint8Array): { + entries: Record; +} { + return JSON.parse(new TextDecoder().decode(body)); +} + +/** Type assertion helper for reaching private instance state in tests. 
*/
function privateState(service: S3CacheSyncService): {
  index: { entries: Record<string, unknown> } | null;
  indexMutated: boolean;
} {
  // Pure compile-time cast: returns the same object, viewed through a
  // structural type exposing only the two private fields the tests inspect.
  return service as unknown as {
    index: { entries: Record<string, unknown> } | null;
    indexMutated: boolean;
  };
}

// -- (a) push skips _catalog.db, _catalog.db-shm, _catalog.db-wal ----------

Deno.test("pushChanged: skips _catalog.db and its WAL/SHM sidecars", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-a-" });
  try {
    const mock = createMockS3Client();
    const service = new S3CacheSyncService(mock, cachePath);

    // Seed one legitimate payload next to the catalog database and its
    // SHM/WAL sidecars.
    await seedFile(cachePath, "data/@my-model/payload.yaml", "name: x\n");
    await seedFile(cachePath, "data/_catalog.db", "SQLITE-MAIN");
    await seedFile(cachePath, "data/_catalog.db-shm", "SQLITE-SHM");
    await seedFile(cachePath, "data/_catalog.db-wal", "SQLITE-WAL");

    await service.pushChanged();

    // The payload must be uploaded and no uploaded key may mention
    // _catalog.db. The total number of puts is intentionally not pinned.
    const uploadedKeys = mock.puts.map((p) => p.key);
    assertEquals(
      uploadedKeys.includes("data/@my-model/payload.yaml"),
      true,
      "legitimate payload should be pushed",
    );
    for (const key of uploadedKeys) {
      assertEquals(
        key.includes("_catalog.db"),
        false,
        `catalog file leaked: ${key}`,
      );
    }
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (b) push still skips the pre-existing three internal files -----------

Deno.test("pushChanged: still skips .datastore-index.json/.push-queue.json/.datastore.lock", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-b-" });
  try {
    const mock = createMockS3Client();
    const service = new S3CacheSyncService(mock, cachePath);

    // Seed a valid empty index so loadIndex succeeds, then seed the
    // other two internal files as raw walk-targets. The assertion is
    // that the walker skips them, not that loadIndex parses them.
    await seedFile(cachePath, "data/ok.yaml", "ok: true\n");
    await seedFile(
      cachePath,
      ".datastore-index.json",
      JSON.stringify({
        version: 1,
        lastPulled: new Date().toISOString(),
        entries: {},
      }),
    );
    await seedFile(cachePath, ".push-queue.json", "{}");
    await seedFile(cachePath, ".datastore.lock", "lock");

    await service.pushChanged();

    const uploadedKeys = mock.puts.map((p) => p.key);
    // Internal files must never appear as standalone put targets.
    // `.datastore-index.json` is deliberately not asserted absent here:
    // a put under that key is expected from the index write-back.
    assertEquals(
      uploadedKeys.filter((k) => k === ".push-queue.json").length,
      0,
    );
    assertEquals(
      uploadedKeys.filter((k) => k === ".datastore.lock").length,
      0,
    );
    assertEquals(uploadedKeys.includes("data/ok.yaml"), true);
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (c) walker skip is a safety net beyond scrub -------------------------

Deno.test("pullChanged walker: skips zombies even if re-injected post-scrub", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-c-" });
  try {
    const mock = createMockS3Client();
    // Seed a CLEAN remote index (no zombies). pullIndex will fetch this,
    // scrub is a no-op, then we manually inject a zombie entry before
    // the walker runs to prove the walker's own skip is load-bearing.
    mock.storage.set(
      ".datastore-index.json",
      encodeIndex({
        "data/@m/ok.yaml": {
          key: "data/@m/ok.yaml",
          size: 8,
          lastModified: new Date().toISOString(),
        },
      }),
    );
    // Pre-populate the legitimate local file so pullChanged doesn't
    // have to fetch it (size match → skip). "ok: yes\n" is 8 bytes,
    // matching the `size` recorded in the index above.
    await seedFile(cachePath, "data/@m/ok.yaml", "ok: yes\n");

    const service = new S3CacheSyncService(mock, cachePath);
    // Force pullIndex first so we can mutate the in-memory index
    // AFTER scrub has already run.
    await service.pullIndex();

    // Inject a zombie entry directly into the in-memory index, bypassing
    // scrub. The walker must still skip this via isInternalCacheFile.
    // NOTE(review): if pullChanged() reloads the index internally, the
    // injected entry is discarded and this test passes vacuously — confirm
    // against the implementation.
    const state = privateState(service);
    assertExists(state.index);
    state.index.entries["data/_catalog.db-wal"] = {
      key: "data/_catalog.db-wal",
      size: 1024,
      lastModified: new Date().toISOString(),
    };

    await service.pullChanged();

    // The walker must not have tried to fetch the injected zombie.
    assertEquals(
      mock.gets.includes("data/_catalog.db-wal"),
      false,
      "walker must skip zombies regardless of whether scrub caught them",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (d) pull skips zombies from the remote index -------------------------

Deno.test("pullChanged: skips zombie _catalog.db* entries from remote index", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-d-" });
  try {
    const mock = createMockS3Client();
    // Remote index contains both a legitimate entry and a zombie.
    mock.storage.set(
      ".datastore-index.json",
      encodeIndex({
        "data/@m/payload.yaml": {
          key: "data/@m/payload.yaml",
          size: 5,
          lastModified: new Date().toISOString(),
        },
        "data/_catalog.db-wal": {
          key: "data/_catalog.db-wal",
          size: 999,
          lastModified: new Date().toISOString(),
        },
      }),
    );
    // Fake remote payload so pullChanged's fetch succeeds.
    mock.storage.set(
      "data/@m/payload.yaml",
      new TextEncoder().encode("hi!\n\n"),
    );

    const service = new S3CacheSyncService(mock, cachePath);
    await service.pullChanged();

    assertEquals(
      mock.gets.includes("data/@m/payload.yaml"),
      true,
      "legitimate payload must be fetched",
    );
    assertEquals(
      mock.gets.includes("data/_catalog.db-wal"),
      false,
      "zombie must not be fetched",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (e1) pullIndex S3-fetch path scrubs in-memory AND rewrites local ----

Deno.test("pullIndex S3-fetch path: scrubs in-memory and rewrites local cache file", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-e1-" });
  try {
    const mock = createMockS3Client();
    // Remote index has zombies. No local cache file yet → forces the
    // S3-fetch branch.
    const polluted = {
      "data/@m/ok.yaml": {
        key: "data/@m/ok.yaml",
        size: 3,
        lastModified: new Date().toISOString(),
      },
      "data/_catalog.db": {
        key: "data/_catalog.db",
        size: 100,
        lastModified: new Date().toISOString(),
      },
      "data/_catalog.db-wal": {
        key: "data/_catalog.db-wal",
        size: 200,
        lastModified: new Date().toISOString(),
      },
    };
    mock.storage.set(".datastore-index.json", encodeIndex(polluted));

    const service = new S3CacheSyncService(mock, cachePath);
    await service.pullIndex();

    // In-memory state: zombies gone.
    const state = privateState(service);
    assertExists(state.index);
    assertEquals(
      Object.keys(state.index.entries).sort(),
      ["data/@m/ok.yaml"],
      "in-memory index should contain only the legitimate entry",
    );
    assertEquals(
      state.indexMutated,
      true,
      "indexMutated must be set after scrubbing zombies",
    );

    // Local cache file at `${cachePath}/.datastore-index.json` must
    // have been rewritten with the scrubbed JSON, not the raw
    // polluted text.
    const localFile = await Deno.readTextFile(
      join(cachePath, ".datastore-index.json"),
    );
    const localParsed = JSON.parse(localFile) as {
      entries: Record<string, unknown>;
    };
    assertEquals(
      Object.keys(localParsed.entries).sort(),
      ["data/@m/ok.yaml"],
      "local cache file must match scrubbed in-memory view",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (e2) pullIndex cache-hit path scrubs in-memory, pushChanged propagates -

Deno.test("pullIndex cache-hit path: scrubs in-memory, next pushChanged propagates to remote", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-e2-" });
  try {
    const mock = createMockS3Client();

    // Pre-seed a POLLUTED local cache file so pullIndex takes the
    // cache-hit branch. Remote index is left empty — we must never
    // call getObject for it on this path.
    // NOTE(review): presumably the cache-hit branch is selected because
    // `lastPulled` is "now" and therefore inside the cache TTL — confirm.
    const pollutedJson = JSON.stringify({
      version: 1,
      lastPulled: new Date().toISOString(),
      entries: {
        "data/@m/ok.yaml": {
          key: "data/@m/ok.yaml",
          size: 3,
          lastModified: new Date().toISOString(),
        },
        "data/_catalog.db-wal": {
          key: "data/_catalog.db-wal",
          size: 42,
          lastModified: new Date().toISOString(),
        },
      },
    });
    await seedFile(cachePath, ".datastore-index.json", pollutedJson);

    const service = new S3CacheSyncService(mock, cachePath);
    await service.pullIndex();

    // Must have taken the cache-hit branch (no remote fetch).
    assertEquals(
      mock.gets.includes(".datastore-index.json"),
      false,
      "cache-hit branch must not fetch the index from S3",
    );

    // In-memory state is scrubbed.
    const state = privateState(service);
    assertExists(state.index);
    assertEquals(Object.keys(state.index.entries).sort(), ["data/@m/ok.yaml"]);
    assertEquals(state.indexMutated, true);

    // The on-disk file is NOT rewritten by the cache-hit branch.
    const diskJson = await Deno.readTextFile(
      join(cachePath, ".datastore-index.json"),
    );
    assertEquals(
      diskJson,
      pollutedJson,
      "cache-hit branch must not rewrite the local index file",
    );

    // Now push against a cache with no new payload files. The
    // indexMutated flag must trigger a write-back of the scrubbed
    // index to the remote.
    await service.pushChanged();

    const indexPuts = mock.puts.filter((p) =>
      p.key === ".datastore-index.json"
    );
    assertEquals(
      indexPuts.length,
      1,
      "scrubbed index must be pushed exactly once via indexMutated",
    );
    const pushedIndex = decodeIndex(indexPuts[0].body);
    assertEquals(
      Object.keys(pushedIndex.entries).sort(),
      ["data/@m/ok.yaml"],
      "pushed index must contain only the legitimate entry",
    );
    // `state` aliases the service instance, so this reads the live flag.
    assertEquals(
      state.indexMutated,
      false,
      "indexMutated must reset after write-back",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (f) indexMutated propagates on no-op push and resets ------------------

Deno.test("pushChanged: no-op push still propagates scrubbed index, second call is quiet", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "s3sync-test-f-" });
  try {
    const mock = createMockS3Client();

    // Seed a polluted local index. No remote index — loadIndex reads
    // straight from disk.
    await seedFile(
      cachePath,
      ".datastore-index.json",
      JSON.stringify({
        version: 1,
        lastPulled: new Date().toISOString(),
        entries: {
          "data/_catalog.db-wal": {
            key: "data/_catalog.db-wal",
            size: 42,
            lastModified: new Date().toISOString(),
          },
        },
      }),
    );

    const service = new S3CacheSyncService(mock, cachePath);

    // First push: no files to push, but scrub triggered the flag.
    await service.pushChanged();

    const firstIndexPuts = mock.puts.filter(
      (p) => p.key === ".datastore-index.json",
    );
    assertEquals(
      firstIndexPuts.length,
      1,
      "first pushChanged must write scrubbed index despite pushed=0",
    );
    const firstPushed = decodeIndex(firstIndexPuts[0].body);
    assertEquals(
      Object.keys(firstPushed.entries).length,
      0,
      "scrubbed index must contain no zombie entries",
    );

    // Second push on the same instance: flag is reset, nothing to do.
    await service.pushChanged();

    // `mock.puts` is cumulative, so a count that is still 1 means the
    // second call added no index write.
    const secondIndexPuts = mock.puts.filter(
      (p) => p.key === ".datastore-index.json",
    );
    assertEquals(
      secondIndexPuts.length,
      1,
      "second pushChanged must NOT rewrite the index — flag must reset",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

diff --git a/datastore/s3/manifest.yaml b/datastore/s3/manifest.yaml index b713f87cd..6da36858f 100644 --- a/datastore/s3/manifest.yaml +++ b/datastore/s3/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/s3-datastore" -version: "2026.04.03.1" +version: "2026.04.08.1" description: | Store data in an Amazon S3 bucket with local cache synchronization. Provides distributed locking via S3 conditional writes and bidirectional