Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { ProjectionPendingApprovalRepository } from "../src/persistence/Services
import { EnvironmentVariablesLive } from "../src/persistence/Services/EnvironmentVariables.ts";
import { ProviderUnsupportedError } from "../src/provider/Errors.ts";
import { ProviderAdapterRegistry } from "../src/provider/Services/ProviderAdapterRegistry.ts";
import { ProviderRuntimeEventFeedLive } from "../src/provider/Layers/ProviderRuntimeEventFeed.ts";
import { ProviderSessionDirectoryLive } from "../src/provider/Layers/ProviderSessionDirectory.ts";
import { makeProviderServiceLive } from "../src/provider/Layers/ProviderService.ts";
import { makeCodexAdapterLive } from "../src/provider/Layers/CodexAdapter.ts";
Expand Down Expand Up @@ -298,6 +299,7 @@ export const makeOrchestrationIntegrationHarness = (
Layer.provideMerge(ProjectionPendingApprovalRepositoryLive),
Layer.provideMerge(checkpointStoreLayer),
Layer.provideMerge(providerLayer),
Layer.provideMerge(ProviderRuntimeEventFeedLive),
Layer.provideMerge(RuntimeReceiptBusLive),
);
const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe(
Expand Down
2 changes: 2 additions & 0 deletions apps/server/integration/providerService.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Effect, FileSystem, Layer, Path, Queue, Stream } from "effect";

import { ProviderUnsupportedError } from "../src/provider/Errors.ts";
import { ProviderAdapterRegistry } from "../src/provider/Services/ProviderAdapterRegistry.ts";
import { ProviderRuntimeEventFeedLive } from "../src/provider/Layers/ProviderRuntimeEventFeed.ts";
import { ProviderSessionDirectoryLive } from "../src/provider/Layers/ProviderSessionDirectory.ts";
import { makeProviderServiceLive } from "../src/provider/Layers/ProviderService.ts";
import {
Expand Down Expand Up @@ -59,6 +60,7 @@ const makeIntegrationFixture = Effect.gen(function* () {
const shared = Layer.mergeAll(
directoryLayer,
Layer.succeed(ProviderAdapterRegistry, registry),
ProviderRuntimeEventFeedLive,
).pipe(Layer.provide(SqlitePersistenceMemory));

const layer = makeProviderServiceLive().pipe(Layer.provide(shared));
Expand Down
10 changes: 6 additions & 4 deletions apps/server/src/orchestration/Layers/CheckpointReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async function waitForGitRefExists(cwd: string, ref: string, timeoutMs = 15_000)

describe("CheckpointReactor", () => {
let runtime: ManagedRuntime.ManagedRuntime<
OrchestrationEngineService | CheckpointReactor | CheckpointStore,
OrchestrationEngineService | CheckpointReactor | CheckpointStore | ProviderRuntimeEventFeed,
unknown
> | null = null;
let scope: Scope.Closeable | null = null;
Expand Down Expand Up @@ -331,10 +331,12 @@ describe("CheckpointReactor", () => {

return {
engine,
provider,
provider: {
...provider,
emit: (event: LegacyProviderRuntimeEvent) =>
Effect.runSync(eventFeed.publish(event as unknown as ProviderRuntimeEvent)),
},
cwd,
emit: (event: LegacyProviderRuntimeEvent) =>
Effect.runSync(eventFeed.publish(event as unknown as ProviderRuntimeEvent)),
drain,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][nu

describe("ProviderRuntimeIngestion", () => {
let runtime: ManagedRuntime.ManagedRuntime<
OrchestrationEngineService | ProviderRuntimeIngestionService,
OrchestrationEngineService | ProviderRuntimeIngestionService | ProviderRuntimeEventFeed,
unknown
> | null = null;
let scope: Scope.Closeable | null = null;
Expand Down
2 changes: 1 addition & 1 deletion apps/server/src/prReview/Layers/PrReview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ query PullRequestReviewDashboard($owner: String!, $name: String!, $number: Int!)
headRefName
baseRefOid
headRefOid
reviews(last: 100) {
reviews(last: 10) {
nodes {
state
body
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventId, ThreadId, TurnId, type ProviderRuntimeEvent } from "@okcode/contracts";
import { it } from "@effect/vitest";
import { describe, expect } from "vitest";
import { Effect, Layer, Stream } from "effect";
import { Effect, Fiber, Stream } from "effect";

import { ProviderRuntimeEventFeedLive } from "./ProviderRuntimeEventFeed.ts";
import { ProviderRuntimeEventFeed } from "../Services/ProviderRuntimeEventFeed.ts";
Expand All @@ -11,6 +11,7 @@ function makeTurnStartedEvent(id: string): ProviderRuntimeEvent {
type: "turn.started",
eventId: EventId.makeUnsafe(id),
provider: "codex",
payload: {},
threadId: ThreadId.makeUnsafe("thread-1"),
turnId: TurnId.makeUnsafe(`turn-${id}`),
createdAt: "2026-01-01T00:00:00.000Z",
Expand All @@ -27,17 +28,17 @@ describe("ProviderRuntimeEventFeedLive", () => {

const events = yield* Stream.take(feed.subscribeWithReplay(), 3).pipe(
Stream.runCollect,
Effect.fork,
Effect.forkScoped,
);

yield* feed.publish(makeTurnStartedEvent("evt-3"));

const collected = yield* Effect.fromFiber(events);
const collected = yield* Fiber.join(events);
expect(Array.from(collected).map((event) => event.eventId)).toEqual([
"evt-1",
"evt-2",
"evt-3",
]);
}).pipe(Effect.provide(Layer.mergeAll(ProviderRuntimeEventFeedLive))),
}).pipe(Effect.provide(ProviderRuntimeEventFeedLive)),
);
});
6 changes: 4 additions & 2 deletions apps/server/src/provider/Layers/ProviderRuntimeEventFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ const makeProviderRuntimeEventFeed = Effect.gen(function* () {
);

const subscribeWithReplay: ProviderRuntimeEventFeedShape["subscribeWithReplay"] = () =>
Stream.unwrapScoped(
Stream.unwrap(
Effect.gen(function* () {
const scope = yield* Effect.scope;
const subscriber = yield* Queue.unbounded<ProviderRuntimeEvent>();
const replay = yield* mutex.withPermits(1)(
Ref.modify(stateRef, (state) => {
Expand All @@ -69,7 +70,8 @@ const makeProviderRuntimeEventFeed = Effect.gen(function* () {
discard: true,
});

yield* Scope.addFinalizer(() =>
yield* Scope.addFinalizer(
scope,
mutex.withPermits(1)(
Ref.update(stateRef, (state) => {
const subscribers = new Set(state.subscribers);
Expand Down
5 changes: 0 additions & 5 deletions apps/web/src/components/chat/ProviderSetupCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ const PROVIDER_CONFIG = {
verifyCmd: "gh auth status",
note: undefined,
},
copilot: {
installCmd: "npm install -g @github/copilot",
authCmd: "copilot login",
verifyCmd: "gh auth status",
},
} as const;

function StatusIcon({ status }: { status: ServerProviderStatus["status"] }) {
Expand Down
4 changes: 3 additions & 1 deletion apps/web/src/components/pr-review/PrReviewShell.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,9 @@ export function PrReviewShell({
...(conflictQuery.data?.status === "conflicted" ? ["Merge conflicts must be resolved"] : []),
...checksSummary.failing.map((name) => `Failing check: ${name}`),
...checksSummary.pending.map((name) => `Pending check: ${name}`),
...blockingWorkflowStepsComputed.map((step) => `Workflow blocked: ${step.title}`),
...blockingWorkflowStepsComputed.map(
(step) => `Workflow blocked: ${step.detail ?? step.stepId}`,
),
];
const approveDisabled =
submitReviewMutation.isPending ||
Expand Down
14 changes: 6 additions & 8 deletions apps/web/src/lib/snapshotSyncManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ describe("createSnapshotSyncManager", () => {
});

it("coalesces overlapping sync requests and reruns once after success", async () => {
let resolveFetch: ((snapshot: OrchestrationReadModel) => void) | null = null;
let releaseFirstFetch!: (snapshot: OrchestrationReadModel) => void;
const firstFetchPromise = new Promise<OrchestrationReadModel>((resolve) => {
releaseFirstFetch = resolve;
});
const fetchSnapshot = vi
.fn<() => Promise<OrchestrationReadModel>>()
.mockImplementation(
() =>
new Promise<OrchestrationReadModel>((resolve) => {
resolveFetch = resolve;
}),
)
.mockImplementationOnce(() => firstFetchPromise)
.mockResolvedValueOnce(makeSnapshot(2));
const applySnapshot = vi.fn();
const manager = createSnapshotSyncManager({
Expand All @@ -69,7 +67,7 @@ describe("createSnapshotSyncManager", () => {
expect(fetchSnapshot).toHaveBeenCalledTimes(1);
expect(firstSync).toBe(secondSync);

resolveFetch?.(makeSnapshot(1));
releaseFirstFetch(makeSnapshot(1));
await firstSync;
await Promise.resolve();

Expand Down
17 changes: 9 additions & 8 deletions packages/contracts/src/prReview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ export const PrReviewStatusCheck = Schema.Struct({
});
export type PrReviewStatusCheck = typeof PrReviewStatusCheck.Type;

export const PrReviewHistoryEntry = Schema.Struct({
authorLogin: TrimmedNonEmptyString,
state: TrimmedNonEmptyString,
body: Schema.String,
submittedAt: Schema.String,
});
export type PrReviewHistoryEntry = typeof PrReviewHistoryEntry.Type;

export const PrReviewComment = Schema.Struct({
id: TrimmedNonEmptyString,
databaseId: Schema.NullOr(PositiveInt),
Expand Down Expand Up @@ -227,14 +235,7 @@ export const PrReviewSummary = Schema.Struct({
statusChecks: Schema.Array(PrReviewStatusCheck),
participants: Schema.Array(PrReviewParticipant),
reviewRequests: Schema.Array(PrReviewParticipant),
recentReviews: Schema.Array(
Schema.Struct({
authorLogin: TrimmedNonEmptyString,
state: TrimmedNonEmptyString,
body: Schema.String,
submittedAt: Schema.String,
}),
),
recentReviews: Schema.Array(PrReviewHistoryEntry),
totalThreadCount: NonNegativeInt,
unresolvedThreadCount: NonNegativeInt,
headSha: Schema.NullOr(TrimmedNonEmptyString),
Expand Down
Loading