Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 94 additions & 3 deletions packages/core/src/__tests__/lifecycle-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
resolveProbeDecision,
} from "../lifecycle-status-decisions.js";
import { createSessionManager } from "../session-manager.js";
import { writeMetadata, readMetadataRaw } from "../metadata.js";
import { updateMetadata, writeMetadata, readMetadataRaw } from "../metadata.js";
import { readObservabilitySummary } from "../observability.js";
import type {
OrchestratorConfig,
Expand Down Expand Up @@ -904,6 +904,42 @@ describe("check (single session)", () => {
expect(lm.getStates().get("app-1")).toBe("stuck");
});

it("keeps prs metadata deduplicated across repeated detectPR polls", async () => {
const detectedPR = makePR({
owner: "aoagents",
repo: "ReverbCode",
number: 143,
url: "https://github.com/aoagents/ReverbCode/pull/143",
});
const mockSCM = createMockSCM({
detectPR: vi.fn().mockResolvedValue(detectedPR),
});
const registry = createMockRegistry({
runtime: plugins.runtime,
agent: plugins.agent,
workspace: plugins.workspace,
scm: mockSCM,
});
writeMetadata(env.sessionsDir, "app-1", {
worktree: "/tmp",
branch: "feat/reverb-fix",
status: "working",
project: "my-app",
agent: "mock-agent",
} as SessionMetadata);
const realSessionManager = createSessionManager({ config, registry });
const lm = createLifecycleManager({ config, registry, sessionManager: realSessionManager });

for (let i = 0; i < 10; i += 1) {
await lm.check("app-1");
}

const meta = readMetadataRaw(env.sessionsDir, "app-1");
expect(meta?.["pr"]).toBe(detectedPR.url);
expect(meta?.["prs"]?.split(",")).toEqual([detectedPR.url]);
expect(mockSCM.detectPR).toHaveBeenCalledTimes(10);
});

it("refreshes worker branch metadata from the current worktree HEAD before PR detection", async () => {
const workspacePath = join(env.tmpDir, "worker-ws");
const gitDir = join(env.tmpDir, "repo", ".git", "worktrees", "app-1");
Expand Down Expand Up @@ -1323,8 +1359,10 @@ describe("check (single session)", () => {
// can race past the lifecycle list() call before adoption resolves.
const deadline = Date.now() + 2000;
while (Date.now() < deadline) {
if (vi.mocked(mockSessionManager.list).mock.calls.length >= 1) {
await new Promise((resolve) => setTimeout(resolve, 25));
const adoptedCount = [sessionA.branch, sessionB.branch].filter(
(branch) => branch === "shared-branch",
).length;
if (vi.mocked(mockSessionManager.list).mock.calls.length >= 1 && adoptedCount > 0) {
break;
}
await new Promise((resolve) => setTimeout(resolve, 10));
Expand Down Expand Up @@ -4538,6 +4576,59 @@ describe("multi-PR state machine aggregation", () => {
}
});

it("2.1b — enrichment metadata uses unique PRs and deletes duplicate-index orphans", async () => {
vi.useFakeTimers();
try {
const pr10 = makeMatchingPR({ number: 10, url: "https://github.com/org/my-app/pull/10" });
const mockSCM = createMockSCM({
enrichSessionsPRBatch: mockBatchEnrichmentPerPR({
"org/my-app#10": { state: "open", ciStatus: "passing", reviewDecision: "none" },
}),
});
const registry = createMockRegistry({
runtime: plugins.runtime,
agent: plugins.agent,
scm: mockSCM,
});
const session = makeSession({
id: "app-1",
status: "pr_open",
pr: pr10,
prs: [pr10, { ...pr10 }],
metadata: {
prEnrichment_1: "{\"state\":\"open\"}",
prReviewComments_1: "{\"unresolvedThreads\":0}",
},
});

const lm = setupPollCheck("app-1", {
session,
registry,
metaOverrides: {
pr: pr10.url,
prs: `${pr10.url},${pr10.url}`,
prEnrichment_1: "{\"state\":\"open\"}",
prReviewComments_1: "{\"unresolvedThreads\":0}",
},
});
updateMetadata(env.sessionsDir, "app-1", {
prEnrichment_1: "{\"state\":\"open\"}",
prReviewComments_1: "{\"unresolvedThreads\":0}",
});

lm.start(60_000);
await vi.advanceTimersByTimeAsync(0);
lm.stop();

const metadata = readMetadataRaw(env.sessionsDir, "app-1");
expect(metadata?.["prEnrichment"]).toBeDefined();
expect(metadata?.["prEnrichment_1"]).toBeUndefined();
expect(metadata?.["prReviewComments_1"]).toBeUndefined();
} finally {
vi.useRealTimers();
}
});

it("2.2 — session merges when ALL PRs are merged", async () => {
vi.useFakeTimers();
try {
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/__tests__/session-from-metadata.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,24 @@ describe("sessionFromMetadata — multi-PR (issue #1821)", () => {
expect(session.prs[1].owner).toBe("org-b");
expect(session.prs[1].repo).toBe("repo-y");
});

it("1.10 — duplicate prs entries are deduplicated by owner, repo, and number", () => {
const session = sessionFromMetadata(
"app-1",
{
prs: [
"https://github.com/acme/main/pull/10",
"https://github.com/acme/main/pull/10",
"https://github.com/acme/sub/pull/10",
].join(","),
branch: "feat/pr-10",
},
baseOptions,
);
expect(session.prs.map((pr) => pr.url)).toEqual([
"https://github.com/acme/main/pull/10",
"https://github.com/acme/sub/pull/10",
]);
expect(session.pr).toBe(session.prs[0]);
});
});
46 changes: 45 additions & 1 deletion packages/core/src/__tests__/session-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { createSessionManager } from "../session-manager.js";
import { writeMetadata } from "../metadata.js";
import { readMetadataRaw, updateMetadata, writeMetadata } from "../metadata.js";
import { recordActivityEvent } from "../activity-events.js";
import type { OrchestratorConfig, PluginRegistry, Agent } from "../types.js";
import { setupTestContext, teardownTestContext, makeHandle, type TestContext } from "./test-utils.js";
Expand Down Expand Up @@ -70,6 +70,50 @@ afterEach(() => {
vi.useRealTimers();
});

describe("PR metadata startup migration", () => {
it("deduplicates corrupt prs CSV data and removes indexed enrichment keys", () => {
writeMetadata(sessionsDir, "app-1", {
worktree: "/tmp/app-1",
branch: "feat/pr-storage",
status: "working",
project: "my-app",
agent: "mock-agent",
});
updateMetadata(sessionsDir, "app-1", {
prs: [
"https://github.com/aoagents/ReverbCode/pull/143",
"https://github.com/aoagents/ReverbCode/pull/143",
"https://github.com/aoagents/ReverbCode/pull/143",
].join(","),
prEnrichment_1: "{\"state\":\"open\"}",
prEnrichment_2: "{\"state\":\"open\"}",
prReviewComments_1: "{\"unresolvedThreads\":0}",
prReviewComments_2: "{\"unresolvedThreads\":0}",
});

createSessionManager({ config, registry: mockRegistry });

const meta = readMetadataRaw(sessionsDir, "app-1");
expect(meta?.["prs"]).toBe("https://github.com/aoagents/ReverbCode/pull/143");
expect(meta?.["prEnrichment_1"]).toBeUndefined();
expect(meta?.["prEnrichment_2"]).toBeUndefined();
expect(meta?.["prReviewComments_1"]).toBeUndefined();
expect(meta?.["prReviewComments_2"]).toBeUndefined();
expect(recordActivityEvent).toHaveBeenCalledWith({
projectId: "my-app",
sessionId: "app-1",
source: "session-manager",
kind: "metadata.deduplicated",
summary: "deduplicated PR metadata: app-1",
data: {
beforePrCount: 3,
afterPrCount: 1,
deletedIndexedKeyCount: 4,
},
});
});
});

describe("activity event logging", () => {
it("records session.spawned after a successful spawn", async () => {
const { execFile } = await import("node:child_process");
Expand Down
67 changes: 56 additions & 11 deletions packages/core/src/lifecycle-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import {
resolveProbeDecision,
type LifecycleDecision,
} from "./lifecycle-status-decisions.js";
import { dedupePrInfos } from "./utils/pr.js";
import {
buildCIFailureNotificationData,
buildPRStateNotificationData,
Expand Down Expand Up @@ -320,7 +321,9 @@ function buildEventContext(
session: Session | ReactionSessionContext,
prEnrichmentCache: Map<string, PREnrichmentData>,
): EventContext {
const sessionPRs = "prs" in session && Array.isArray(session.prs) ? session.prs : (session.pr ? [session.pr] : []);
const sessionPRs = dedupePrInfos(
"prs" in session && Array.isArray(session.prs) ? session.prs : session.pr ? [session.pr] : [],
);

const prs: EventContext["prs"] = sessionPRs.map((p) => {
const cached = prEnrichmentCache.get(`${p.owner}/${p.repo}#${p.number}`);
Expand Down Expand Up @@ -508,6 +511,32 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
*/
const prEnrichmentCache = new Map<string, PREnrichmentData>();

function normalizeSessionPRs(session: Session): PRInfo[] {
const candidatePRs = session.prs.length > 0 ? session.prs : session.pr ? [session.pr] : [];
const uniquePRs = dedupePrInfos(candidatePRs);
if (uniquePRs.length !== session.prs.length || session.pr !== (uniquePRs[0] ?? null)) {
session.prs = uniquePRs;
session.pr = uniquePRs[0] ?? null;
}
return uniquePRs;
}

function indexedPRMetadataCleanup(
session: Session,
prCount: number,
): Partial<Record<string, string>> {
const updates: Partial<Record<string, string>> = {};
for (const key of Object.keys(session.metadata)) {
const match = key.match(/^(prEnrichment|prReviewComments)_(\d+)$/);
if (!match) continue;
const index = Number.parseInt(match[2], 10);
if (Number.isNaN(index) || index >= prCount) {
updates[key] = "";
}
}
return updates;
}

function getPREnrichmentForSession(
session: Session | ReactionSessionContext,
): PREnrichmentData | undefined {
Expand Down Expand Up @@ -555,10 +584,11 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
reposByPlugin.set(pluginKey, new Set());
}
reposByPlugin.get(pluginKey)!.add(project.repo);
if (session.prs.length === 0) continue;
const sessionPRs = normalizeSessionPRs(session);
if (sessionPRs.length === 0) continue;
// Loop over all PRs in the session — supports multi-repo sessions
// where an agent opened PRs on multiple repos.
for (const pr of session.prs) {
for (const pr of sessionPRs) {
const actualPRRepo = `${pr.owner}/${pr.repo}`;
if (actualPRRepo !== project.repo) {
reposByPlugin.get(pluginKey)!.add(actualPRRepo);
Expand Down Expand Up @@ -690,13 +720,14 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
continue;
// Skip detectPR only if we already have a PR on the configured project repo.
// This allows detecting additional PRs on different repos (multi-repo support).
const trackedRepos = new Set(session.prs.map((p) => `${p.owner}/${p.repo}`));
const sessionPRs = normalizeSessionPRs(session);
const trackedRepos = new Set(sessionPRs.map((p) => `${p.owner}/${p.repo}`));
const projectRepoForDetect = config.projects[session.projectId]?.repo;
// primaryPR.branch is always the session branch (metadata doesn't store per-PR branches),
// so use the lifecycle closed-state alone to allow re-detection after a PR is rejected.
const primaryPRIsClosed = session.lifecycle.pr.state === "closed";
if (
session.prs.length > 0 &&
sessionPRs.length > 0 &&
projectRepoForDetect &&
trackedRepos.has(projectRepoForDetect) &&
!primaryPRIsClosed
Expand All @@ -720,7 +751,7 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
// in the same session (e.g. agent opens PR #10 and PR #11 both on acme/main-app).
// Only skip if we already have this exact PR number on this exact repo.
// If the existing PR on the same repo is closed, replace it with the new one.
const alreadyTracked = session.prs.some(
const alreadyTracked = sessionPRs.some(
(p) =>
p.owner === detectedPR.owner &&
p.repo === detectedPR.repo &&
Expand All @@ -741,10 +772,11 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
)
.concat(detectedPR);
}
session.prs = dedupePrInfos(session.prs);
// pr is always the primary (first) PR
session.pr = session.prs[0] ?? detectedPR;
const sessionsDir = getProjectSessionsDir(session.projectId);
const allPrUrls = session.prs.map((p) => p.url).join(",");
const allPrUrls = [...new Set(session.prs.map((p) => p.url))].join(",");
updateMetadata(sessionsDir, session.id, {
pr: session.pr.url,
prs: allPrUrls,
Expand Down Expand Up @@ -798,10 +830,18 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
*/
function persistPREnrichmentToMetadata(sessions: Session[]): void {
for (const session of sessions) {
const sessionPRs = normalizeSessionPRs(session);
if (!session.pr) continue;
const project = config.projects[session.projectId];
if (!project) continue;
const sessionsDir = getProjectSessionsDir(session.projectId);
const cleanupUpdates = indexedPRMetadataCleanup(session, sessionPRs.length);
if (Object.keys(cleanupUpdates).length > 0) {
updateMetadata(sessionsDir, session.id, cleanupUpdates);
session.metadata = Object.fromEntries(
Object.entries(session.metadata).filter(([key]) => cleanupUpdates[key] === undefined),
);
}

const prKey = `${session.pr.owner}/${session.pr.repo}#${session.pr.number}`;
const cached = prEnrichmentCache.get(prKey);
Expand Down Expand Up @@ -835,8 +875,8 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan
}
}

for (let i = 1; i < session.prs.length; i++) {
const secondaryPR = session.prs[i];
for (let i = 1; i < sessionPRs.length; i++) {
const secondaryPR = sessionPRs[i];
if (!secondaryPR) continue;
const secondaryKey = `${secondaryPR.owner}/${secondaryPR.repo}#${secondaryPR.number}`;
const secondaryCached = prEnrichmentCache.get(secondaryKey);
Expand Down Expand Up @@ -1933,8 +1973,13 @@ export function createLifecycleManager(deps: LifecycleManagerDeps): LifecycleMan

// Persist per-PR review comment blobs for secondary PRs so the dashboard
// can enrich them independently (prReviewComments_1, prReviewComments_2, …).
for (let i = 1; i < session.prs.length; i++) {
const secondaryPR = session.prs[i];
const sessionPRs = normalizeSessionPRs(session);
const cleanupUpdates = indexedPRMetadataCleanup(session, sessionPRs.length);
if (Object.keys(cleanupUpdates).length > 0) {
updateSessionMetadata(session, cleanupUpdates);
}
for (let i = 1; i < sessionPRs.length; i++) {
const secondaryPR = sessionPRs[i];
if (!secondaryPR) continue;
let secondaryThreads: ReviewComment[];
let secondaryReviews: ReviewSummary[];
Expand Down
Loading
Loading