Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 105 additions & 9 deletions datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,40 @@ function assertSafePath(cachePath: string, relativePath: string): string {
return resolved;
}

/**
 * Returns true for files that live inside the cache directory but must
 * NOT cross the sync boundary in either direction (push or pull).
 *
 * Excluded patterns:
 *   - `.datastore-index.json` — the remote index manifest itself; pulled
 *     and pushed via dedicated code paths, never as a walked payload.
 *   - `.push-queue.json` — local push-queue scratch file.
 *   - `.datastore.lock` — distributed lock file; managed by the lock
 *     subsystem, must never be uploaded as data.
 *   - basename `_catalog.db` and anything starting with `_catalog.db-`
 *     (the SQLite WAL/SHM/journal sidecars) — the local-only data catalog
 *     store. It is deliberately colocated with the data tier so it can
 *     be rebuilt from whatever the local cache holds, but the database
 *     itself is per-machine state and must never leak to the shared
 *     bucket. See swamp-club issue #29 for the bug this exclusion fixes:
 *     without it, `swamp datastore sync --push` would walk `_catalog.db*`
 *     into `toPush`, SQLite would rewrite the WAL mid-upload, and the
 *     push would fail on `_catalog.db-wal`.
 *
 * The three dot-files are matched against the whole relative path, so
 * they are only excluded at the cache root. The catalog pattern is
 * matched against the basename so the filter is robust to any future
 * change in the data tier subdirectory name.
 *
 * The basename is taken after the last `/` OR `\`. Index keys use `/`,
 * but a relative path computed with an OS-native path helper on Windows
 * is `\`-separated; splitting on `/` alone would treat
 * `data\_catalog.db-wal` as one long basename and let it through.
 *
 * @param rel Path relative to the cache directory root.
 * @returns true when the file is internal and must be skipped by sync.
 */
function isInternalCacheFile(rel: string): boolean {
  if (
    rel === ".datastore-index.json" || rel === ".push-queue.json" ||
    rel === ".datastore.lock"
  ) {
    return true;
  }
  // Accept both POSIX and Windows separators when isolating the basename.
  const base = rel.split(/[\\/]/).pop() ?? "";
  return base === "_catalog.db" || base.startsWith("_catalog.db-");
}

/** Metadata index entry for a file in GCS. */
interface IndexEntry {
key: string;
Expand Down Expand Up @@ -75,16 +109,54 @@ export class GcsCacheSyncService implements DatastoreSyncService {
private readonly cachePath: string;
private readonly indexPath: string;
private index: DatastoreIndex | null = null;
/**
* Set to true when `scrubIndex()` removes zombie entries from the
* in-memory index. Drives the write-back gate in `pushChanged()` so
* that the cleaned index propagates to the remote even on a no-op
* push (no new files to upload). Reset after a successful write-back.
*/
private indexMutated = false;

/**
 * Binds the service to a GCS client and a local cache directory.
 *
 * @param gcs Client used for all remote object operations.
 * @param cachePath Local cache directory; the index cache file
 *   `.datastore-index.json` is addressed relative to it.
 */
constructor(gcs: GcsClient, cachePath: string) {
  // Resolve the local index file location up front.
  const localIndexFile = join(cachePath, ".datastore-index.json");
  this.gcs = gcs;
  this.cachePath = cachePath;
  this.indexPath = localIndexFile;
}

/**
 * Drops every entry from the in-memory index whose key is an internal
 * cache file (as decided by `isInternalCacheFile`), so that such
 * "zombie" entries never reach the rest of the sync pipeline.
 * See swamp-club issue #29 for the motivating bug.
 *
 * A missing index, or an index without an `entries` map, is left
 * untouched.
 *
 * @returns true if at least one entry was removed, false otherwise.
 */
private scrubIndex(): boolean {
  const entries = this.index?.entries;
  if (!entries) return false;

  // Collect first, then delete, so the removal pass works off a fixed list.
  const zombies = Object.keys(entries).filter((key) =>
    isInternalCacheFile(key)
  );
  for (const key of zombies) {
    delete entries[key];
  }
  return zombies.length > 0;
}

/**
* Pulls the metadata index from GCS (lightweight, single GET).
* Uses a local cache with a 60-second TTL to avoid redundant fetches.
*
* Both entry paths (cache-hit and GCS-fetch) scrub the in-memory
* index after parsing so zombie internal-file entries never reach
* the rest of the sync pipeline (see swamp-club#29). On the
* GCS-fetch path the local cache file is rewritten with the scrubbed
* JSON when scrub mutated — keeping the on-disk and in-memory views
* consistent. The cache-hit path does NOT rewrite the local file;
* its on-disk view self-heals on the next GCS fetch or via the
* `indexMutated` propagation on the next `pushChanged`.
*/
async pullIndex(): Promise<void> {
// Check local cache freshness
Expand All @@ -94,6 +166,8 @@ export class GcsCacheSyncService implements DatastoreSyncService {
if (ageMs < INDEX_CACHE_TTL_MS && this.index === null) {
const content = await Deno.readTextFile(this.indexPath);
this.index = JSON.parse(content) as DatastoreIndex;
// Scrub zombies from the in-memory view before returning.
this.indexMutated ||= this.scrubIndex();
return;
}
} catch {
Expand All @@ -105,7 +179,18 @@ export class GcsCacheSyncService implements DatastoreSyncService {
const text = new TextDecoder().decode(data);
this.index = JSON.parse(text) as DatastoreIndex;
await ensureDir(this.cachePath);
await atomicWriteTextFile(this.indexPath, text);
// Scrub zombies, then write the local cache file. If scrub
// mutated, write the cleaned JSON so on-disk matches in-memory.
// Otherwise write the raw remote text to preserve the fast path.
if (this.scrubIndex()) {
this.indexMutated = true;
await atomicWriteTextFile(
this.indexPath,
JSON.stringify(this.index, null, 2),
);
} else {
await atomicWriteTextFile(this.indexPath, text);
}
} catch {
// No index exists yet — start fresh
this.index = {
Expand Down Expand Up @@ -134,6 +219,12 @@ export class GcsCacheSyncService implements DatastoreSyncService {

const toPull: string[] = [];
for (const [rel, entry] of Object.entries(this.index?.entries ?? {})) {
// Belt-and-suspenders: `scrubIndex` already removed internal
// entries in `pullIndex`, but if anything re-adds a zombie
// between the scrub and the walk, this guard still catches it.
if (isInternalCacheFile(rel)) {
continue;
}
const localPath = assertSafePath(this.cachePath, rel);
try {
const stat = await Deno.stat(localPath);
Expand Down Expand Up @@ -216,11 +307,8 @@ export class GcsCacheSyncService implements DatastoreSyncService {
})
) {
const rel = relative(this.cachePath, entry.path);
// Skip internal metadata files
if (
rel === ".datastore-index.json" || rel === ".push-queue.json" ||
rel === ".datastore.lock"
) {
// Skip internal metadata files (see isInternalCacheFile)
if (isInternalCacheFile(rel)) {
continue;
}

Expand Down Expand Up @@ -268,13 +356,18 @@ export class GcsCacheSyncService implements DatastoreSyncService {
);
}

// Push updated index if anything changed
if (pushed > 0 && this.index) {
this.index.lastPulled = new Date().toISOString();
// Push updated index if anything changed — either new files were
// pushed OR scrubIndex removed zombie entries that need to
// propagate to the remote (swamp-club#29 migration path).
if ((pushed > 0 || this.indexMutated) && this.index) {
if (pushed > 0) {
this.index.lastPulled = new Date().toISOString();
}
const indexJson = JSON.stringify(this.index, null, 2);
const indexData = new TextEncoder().encode(indexJson);
await this.gcs.putObject(".datastore-index.json", indexData);
await atomicWriteTextFile(this.indexPath, indexJson);
this.indexMutated = false;
}

return pushed;
Expand All @@ -292,5 +385,8 @@ export class GcsCacheSyncService implements DatastoreSyncService {
entries: {},
};
}
// Scrub zombies from the in-memory view so the rest of
// `pushChanged` operates on a clean index. See swamp-club#29.
this.indexMutated ||= this.scrubIndex();
}
}
Loading
Loading