Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions integration/data_versioning_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,51 @@ Deno.test("Data Versioning: garbage collection preserves minimum versions", asyn
});
});

// Saves more versions than the retention count allows, then checks that a
// dry-run garbage collection reports the prunable count and a nonzero byte
// total while leaving every stored version in place.
Deno.test("Data Versioning: garbage collection dry-run reports counts without deleting", async () => {
  await withTempDir(async (repoDir) => {
    await setupRepoDir(repoDir);

    const repository = new FileSystemUnifiedDataRepository(
      repoDir,
      undefined,
      new CatalogStore(join(repoDir, "_catalog.db")),
    );
    const modelType = ModelType.create("test/model");
    const instanceId = crypto.randomUUID();

    const dataName = "gc-dryrun-test";
    const retainedCount = 3;
    const writtenCount = 7;

    const item = Data.create({
      name: dataName,
      contentType: "text/plain",
      lifetime: "infinite",
      garbageCollection: retainedCount,
      tags: { type: "test" },
      ownerDefinition: createOwner("test/model:gc-dryrun"),
    });

    // Versions 1..7, i.e. four more than the retention count keeps.
    const everyVersion = Array.from({ length: writtenCount }, (_, idx) => idx + 1);
    const encoder = new TextEncoder();
    for (const version of everyVersion) {
      await repository.save(
        modelType,
        instanceId,
        item,
        encoder.encode(`v${version}`),
      );
    }

    const versionsBefore = await repository.listVersions(
      modelType,
      instanceId,
      dataName,
    );
    assertEquals(versionsBefore, everyVersion);

    // A dry-run reports what would be pruned without touching anything.
    const outcome = await repository.collectGarbage(modelType, instanceId, {
      dryRun: true,
    });
    assertEquals(outcome.versionsRemoved, writtenCount - retainedCount);
    // The files are still on disk, so their sizes can be summed.
    assertEquals(outcome.bytesReclaimed > 0, true);

    const versionsAfter = await repository.listVersions(
      modelType,
      instanceId,
      dataName,
    );
    assertEquals(versionsAfter, everyVersion);
  });
});

Deno.test("Data Versioning: multiple data items with different GC policies", async () => {
await withTempDir(async (repoDir) => {
await setupRepoDir(repoDir);
Expand Down
6 changes: 4 additions & 2 deletions src/cli/commands/data_gc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ export const dataGcCommand = new Command()
// Phase 1: Preview + Prompt (only in interactive mode without --force and not dry-run)
if (cliCtx.outputMode === "log" && !options.force && !options.dryRun) {
const preview = await dataGcPreview(ctx, deps);
if (preview.items.length === 0) {
console.log("No expired data found. Nothing to clean up.");
if (
preview.items.length === 0 && preview.versionGcItems.length === 0
) {
console.log("Nothing to clean up.");
return;
}

Expand Down
126 changes: 76 additions & 50 deletions src/domain/data/data_lifecycle_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import { getLogger } from "@logtape/logtape";
import type { Data } from "./data.ts";
import type { Lifetime } from "./data_metadata.ts";
import { parseDataDuration } from "./duration.ts";
import type { UnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts";
import type { WorkflowRunRepository } from "../workflows/repositories.ts";
import type { ModelType } from "../models/model_type.ts";
Expand All @@ -41,6 +42,17 @@ export interface ExpiredDataInfo {
reason: "duration-expired" | "workflow-deleted" | "job-deleted";
}

/**
 * A (modelType, modelId) pair that would have versions pruned by version GC,
 * with the counts computed via a dry-run against the repository.
 *
 * Entries are produced by `previewVersionGarbage()`, which only emits one when
 * the dry-run reports more than zero removable versions.
 */
export interface VersionGcPreviewInfo {
/** Model type the dry-run was executed for. */
type: ModelType;
/** Identifier of the model whose data versions were inspected. */
modelId: string;
/** `versionsRemoved` as reported by the dry-run `collectGarbage` call. */
versionsWouldBeRemoved: number;
/** `bytesReclaimed` as reported by the dry-run `collectGarbage` call. */
bytesWouldBeReclaimed: number;
}

/**
* Result of garbage collection operation.
*/
Expand Down Expand Up @@ -72,6 +84,13 @@ export interface DataLifecycleService {
*/
findExpiredData(): Promise<ExpiredDataInfo[]>;

/**
* Previews version-based garbage collection across all unique models without
* deleting anything. Returns one entry per (modelType, modelId) that has
* versions to prune.
*/
previewVersionGarbage(): Promise<VersionGcPreviewInfo[]>;

/**
* Deletes expired data and applies version garbage collection.
*
Expand Down Expand Up @@ -127,7 +146,7 @@ export class DefaultDataLifecycleService implements DataLifecycleService {

// Duration-based lifetime
try {
const durationMs = this.parseDuration(lifetime);
const durationMs = parseDataDuration(lifetime);
return new Date(createdAt.getTime() + durationMs);
} catch (error) {
logger.error("Failed to parse lifetime duration: {lifetime}", {
Expand Down Expand Up @@ -223,6 +242,39 @@ export class DefaultDataLifecycleService implements DataLifecycleService {
return expired;
}

/**
 * Dry-runs version garbage collection once for every distinct
 * (modelType, modelId) found in the repository and collects the models that
 * have at least one version to prune. Nothing is deleted.
 *
 * A failure on one model is logged and does not stop the remaining models
 * from being previewed.
 *
 * @returns One entry per model whose dry-run reported removable versions.
 */
async previewVersionGarbage(): Promise<VersionGcPreviewInfo[]> {
  const collected: VersionGcPreviewInfo[] = [];
  const records = await this.dataRepo.findAllGlobal();
  const visitedPaths = new Set<string>();

  for (const record of records) {
    const { modelType, modelId } = record;
    const path = `${modelType.toDirectoryPath()}/${modelId}`;

    // Several data items can belong to the same model; preview each once.
    if (visitedPaths.has(path)) {
      continue;
    }
    visitedPaths.add(path);

    try {
      const outcome = await this.dataRepo.collectGarbage(
        modelType,
        modelId,
        { dryRun: true },
      );
      if (outcome.versionsRemoved > 0) {
        const entry: VersionGcPreviewInfo = {
          type: modelType,
          modelId,
          versionsWouldBeRemoved: outcome.versionsRemoved,
          bytesWouldBeReclaimed: outcome.bytesReclaimed,
        };
        collected.push(entry);
      }
    } catch (error) {
      logger.error("Error previewing version GC on {path}", { path, error });
    }
  }

  return collected;
}

async deleteExpiredData(options?: {
dryRun?: boolean;
}): Promise<LifecycleGCResult> {
Expand Down Expand Up @@ -307,28 +359,29 @@ export class DefaultDataLifecycleService implements DataLifecycleService {
});
}

// Phase 2: Version-based garbage collection on all unique models
// Reuses the allData result from the single findAllGlobal() call
if (!dryRun) {
const seen = new Set<string>();
for (const { modelType, modelId } of allData) {
const key = `${modelType.toDirectoryPath()}/${modelId}`;
if (seen.has(key)) continue;
seen.add(key);

try {
const result = await this.dataRepo.collectGarbage(
modelType,
modelId,
);
versionsDeleted += result.versionsRemoved;
bytesReclaimed += result.bytesReclaimed;
} catch (error) {
logger.error(
"Error running GC on {path}",
{ path: `${modelType.toDirectoryPath()}/${modelId}`, error },
);
}
// Phase 2: Version-based garbage collection on all unique models.
// Reuses the allData result from the single findAllGlobal() call. Runs in
// both dry-run and real mode — the repository computes would-be counts
// without deleting when dryRun is true.
const seen = new Set<string>();
for (const { modelType, modelId } of allData) {
const key = `${modelType.toDirectoryPath()}/${modelId}`;
if (seen.has(key)) continue;
seen.add(key);

try {
const result = await this.dataRepo.collectGarbage(
modelType,
modelId,
{ dryRun },
);
versionsDeleted += result.versionsRemoved;
bytesReclaimed += result.bytesReclaimed;
} catch (error) {
logger.error(
"Error running GC on {path}",
{ path: `${modelType.toDirectoryPath()}/${modelId}`, error },
);
}
}

Expand All @@ -340,31 +393,4 @@ export class DefaultDataLifecycleService implements DataLifecycleService {
expiredEntries,
};
}

/**
 * Converts a compact duration string (a whole number followed by a unit
 * suffix) into milliseconds.
 *
 * Supported suffixes: `m` minutes, `h` hours, `d` days, `w` weeks,
 * `mo` months (counted as 30 days) and `y` years (counted as 365 days).
 *
 * @param duration Text such as "12h" or "3mo".
 * @returns The duration in milliseconds.
 * @throws Error when the text does not match the expected format.
 */
private parseDuration(duration: string): number {
  const parts = duration.match(/^(\d+)(mo|y|h|m|d|w)$/);
  if (parts === null) {
    throw new Error(`Invalid duration format: ${duration}`);
  }

  const amount = parseInt(parts[1], 10);
  const suffix = parts[2];

  // Each converter keeps the factor order of the arithmetic it replaces so
  // that results are identical even for very large amounts.
  const converters = new Map<string, (n: number) => number>([
    ["mo", (n) => n * 30 * 24 * 60 * 60 * 1000],
    ["y", (n) => n * 365 * 24 * 60 * 60 * 1000],
    ["h", (n) => n * 60 * 60 * 1000],
    ["m", (n) => n * 60 * 1000],
    ["d", (n) => n * 24 * 60 * 60 * 1000],
    ["w", (n) => n * 7 * 24 * 60 * 60 * 1000],
  ]);

  const toMilliseconds = converters.get(suffix);
  if (toMilliseconds === undefined) {
    // Defensive only: the pattern above admits no other suffix.
    throw new Error(`Unknown duration unit: ${suffix}`);
  }
  return toMilliseconds(amount);
}
}
Loading
Loading