Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
428 changes: 428 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts

Large diffs are not rendered by default.

98 changes: 98 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import * as SqlClient from "effect/unstable/sql/SqlClient";
import { toPersistenceSqlError, type ProjectionRepositoryError } from "../../persistence/Errors.ts";
import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts";
import { ProjectionPendingApprovalRepository } from "../../persistence/Services/ProjectionPendingApprovals.ts";
import { ProjectionPendingUserInputRepository } from "../../persistence/Services/ProjectionPendingUserInputs.ts";
import { ProjectionProjectRepository } from "../../persistence/Services/ProjectionProjects.ts";
import { ProjectionStateRepository } from "../../persistence/Services/ProjectionState.ts";
import { ProjectionThreadActivityRepository } from "../../persistence/Services/ProjectionThreadActivities.ts";
Expand All @@ -30,6 +31,7 @@ import {
} from "../../persistence/Services/ProjectionTurns.ts";
import { ProjectionThreadRepository } from "../../persistence/Services/ProjectionThreads.ts";
import { ProjectionPendingApprovalRepositoryLive } from "../../persistence/Layers/ProjectionPendingApprovals.ts";
import { ProjectionPendingUserInputRepositoryLive } from "../../persistence/Layers/ProjectionPendingUserInputs.ts";
import { ProjectionProjectRepositoryLive } from "../../persistence/Layers/ProjectionProjects.ts";
import { ProjectionStateRepositoryLive } from "../../persistence/Layers/ProjectionState.ts";
import { ProjectionThreadActivityRepositoryLive } from "../../persistence/Layers/ProjectionThreadActivities.ts";
Expand Down Expand Up @@ -60,6 +62,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = {
threadTurns: "projection.thread-turns",
checkpoints: "projection.checkpoints",
pendingApprovals: "projection.pending-approvals",
pendingUserInputs: "projection.pending-user-inputs",
} as const;

type ProjectorName =
Expand Down Expand Up @@ -347,6 +350,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository;
const projectionTurnRepository = yield* ProjectionTurnRepository;
const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository;
const projectionPendingUserInputRepository = yield* ProjectionPendingUserInputRepository;

const fileSystem = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
Expand All @@ -371,6 +375,18 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);

const pendingUserInputs = yield* projectionPendingUserInputRepository.listByThreadId({
threadId,
});
yield* Effect.forEach(
pendingUserInputs,
(userInput) =>
projectionPendingUserInputRepository.deleteByRequestId({
requestId: userInput.requestId,
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
});

const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) =>
Expand Down Expand Up @@ -1145,6 +1161,83 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
}
});

const applyPendingUserInputsProjection: ProjectorDefinition["apply"] = (
event,
_attachmentSideEffects,
) =>
Effect.gen(function* () {
switch (event.type) {
case "thread.activity-appended": {
const requestId =
extractActivityRequestId(event.payload.activity.payload) ??
event.metadata.requestId ??
null;
if (requestId === null) {
return;
}
const existingRow = yield* projectionPendingUserInputRepository.getByRequestId({
requestId,
});
if (event.payload.activity.kind === "user-input.resolved") {
yield* projectionPendingUserInputRepository.upsert({
requestId,
threadId: Option.isSome(existingRow)
? existingRow.value.threadId
: event.payload.threadId,
turnId: Option.isSome(existingRow)
? existingRow.value.turnId
: event.payload.activity.turnId,
status: "resolved",
createdAt: Option.isSome(existingRow)
? existingRow.value.createdAt
: event.payload.activity.createdAt,
resolvedAt: event.payload.activity.createdAt,
});
return;
}
if (event.payload.activity.kind !== "user-input.requested") {
return;
}
if (Option.isSome(existingRow) && existingRow.value.status === "resolved") {
return;
}
yield* projectionPendingUserInputRepository.upsert({
requestId,
threadId: event.payload.threadId,
turnId: event.payload.activity.turnId,
status: "pending",
createdAt: Option.isSome(existingRow)
? existingRow.value.createdAt
: event.payload.activity.createdAt,
resolvedAt: null,
});
return;
}

case "thread.user-input-response-requested": {
const existingRow = yield* projectionPendingUserInputRepository.getByRequestId({
requestId: event.payload.requestId,
});
yield* projectionPendingUserInputRepository.upsert({
requestId: event.payload.requestId,
threadId: Option.isSome(existingRow)
? existingRow.value.threadId
: event.payload.threadId,
turnId: Option.isSome(existingRow) ? existingRow.value.turnId : null,
status: "resolved",
createdAt: Option.isSome(existingRow)
? existingRow.value.createdAt
: event.payload.createdAt,
resolvedAt: event.payload.createdAt,
});
return;
}

default:
return;
}
});

const projectors: ReadonlyArray<ProjectorDefinition> = [
{
name: ORCHESTRATION_PROJECTOR_NAMES.projects,
Expand Down Expand Up @@ -1178,6 +1271,10 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
name: ORCHESTRATION_PROJECTOR_NAMES.pendingApprovals,
apply: applyPendingApprovalsProjection,
},
{
name: ORCHESTRATION_PROJECTOR_NAMES.pendingUserInputs,
apply: applyPendingUserInputsProjection,
},
{
name: ORCHESTRATION_PROJECTOR_NAMES.threads,
apply: applyThreadsProjection,
Expand Down Expand Up @@ -1282,5 +1379,6 @@ export const OrchestrationProjectionPipelineLive = Layer.effect(
Layer.provideMerge(ProjectionThreadSessionRepositoryLive),
Layer.provideMerge(ProjectionTurnRepositoryLive),
Layer.provideMerge(ProjectionPendingApprovalRepositoryLive),
Layer.provideMerge(ProjectionPendingUserInputRepositoryLive),
Layer.provideMerge(ProjectionStateRepositoryLive),
);
Loading
Loading