Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,11 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
Effect.gen(function* () {
switch (event.type) {
case "thread.message-sent": {
const existingRows = yield* projectionThreadMessageRepository.listByThreadId({
threadId: event.payload.threadId,
});
const existingMessage = existingRows.find(
(row) => row.messageId === event.payload.messageId,
);
const existingMessage = yield* projectionThreadMessageRepository
.getByMessageId({
messageId: event.payload.messageId,
})
.pipe(Effect.map(Option.getOrUndefined));
const nextText =
existingMessage && event.payload.streaming
? `${existingMessage.text}${event.payload.text}`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MessageId, ThreadId } from "@okcode/contracts";
import { assert, it } from "@effect/vitest";
import { Effect, Layer } from "effect";
import { Effect, Layer, Option } from "effect";

import { ProjectionThreadMessageRepository } from "../Services/ProjectionThreadMessages.ts";
import { ProjectionThreadMessageRepositoryLive } from "./ProjectionThreadMessages.ts";
Expand All @@ -11,6 +11,44 @@ const layer = it.layer(
);

layer("ProjectionThreadMessageRepository", (it) => {
it.effect("getByMessageId returns none when no row exists", () =>
Effect.gen(function* () {
const repository = yield* ProjectionThreadMessageRepository;
const result = yield* repository.getByMessageId({
messageId: MessageId.makeUnsafe("nonexistent-message"),
});
assert.isTrue(Option.isNone(result));
}),
);

it.effect("getByMessageId returns the matching message after upsert", () =>
Effect.gen(function* () {
const repository = yield* ProjectionThreadMessageRepository;
const threadId = ThreadId.makeUnsafe("thread-get-by-id");
const messageId = MessageId.makeUnsafe("message-get-by-id");
const createdAt = "2026-02-28T20:00:00.000Z";
const updatedAt = "2026-02-28T20:00:01.000Z";

yield* repository.upsert({
messageId,
threadId,
turnId: null,
role: "user",
text: "hello",
isStreaming: false,
createdAt,
updatedAt,
});

const result = yield* repository.getByMessageId({ messageId });
assert.isTrue(Option.isSome(result));
const message = Option.getOrThrow(result);
assert.equal(message.messageId, messageId);
assert.equal(message.text, "hello");
assert.equal(message.isStreaming, false);
}),
);

it.effect("preserves existing attachments when upsert omits attachments", () =>
Effect.gen(function* () {
const repository = yield* ProjectionThreadMessageRepository;
Expand Down
66 changes: 52 additions & 14 deletions apps/server/src/persistence/Layers/ProjectionThreadMessages.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as SqlSchema from "effect/unstable/sql/SqlSchema";
import { Effect, Layer, Schema, Struct } from "effect";
import { Effect, Layer, Option, Schema, Struct } from "effect";
import { ChatAttachment } from "@okcode/contracts";

import { toPersistenceSqlError } from "../Errors.ts";
import {
ProjectionThreadMessageRepository,
type ProjectionThreadMessageRepositoryShape,
DeleteProjectionThreadMessagesInput,
GetProjectionThreadMessageByMessageIdInput,
ListProjectionThreadMessagesInput,
ProjectionThreadMessage,
} from "../Services/ProjectionThreadMessages.ts";
Expand All @@ -19,6 +20,25 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields(
}),
);

type ProjectionThreadMessageDbRow = typeof ProjectionThreadMessageDbRowSchema.Type;

/** Shared mapping from a DB row to the domain `ProjectionThreadMessage` shape. */
function toProjectionThreadMessage(
row: ProjectionThreadMessageDbRow,
): typeof ProjectionThreadMessage.Type {
return {
messageId: row.messageId,
threadId: row.threadId,
turnId: row.turnId,
role: row.role,
text: row.text,
isStreaming: row.isStreaming === 1,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
...(row.attachments !== null ? { attachments: row.attachments } : {}),
};
}

const makeProjectionThreadMessageRepository = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;

Expand Down Expand Up @@ -95,6 +115,27 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
`,
});

const getProjectionThreadMessageByMessageId = SqlSchema.findOneOption({
Request: GetProjectionThreadMessageByMessageIdInput,
Result: ProjectionThreadMessageDbRowSchema,
execute: ({ messageId }) =>
sql`
SELECT
message_id AS "messageId",
thread_id AS "threadId",
turn_id AS "turnId",
role,
text,
attachments_json AS "attachments",
is_streaming AS "isStreaming",
created_at AS "createdAt",
updated_at AS "updatedAt"
FROM projection_thread_messages
WHERE message_id = ${messageId}
LIMIT 1
`,
});

const deleteProjectionThreadMessageRows = SqlSchema.void({
Request: DeleteProjectionThreadMessagesInput,
execute: ({ threadId }) =>
Expand All @@ -109,24 +150,20 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
Effect.mapError(toPersistenceSqlError("ProjectionThreadMessageRepository.upsert:query")),
);

const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) =>
getProjectionThreadMessageByMessageId(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.getByMessageId:query"),
),
Effect.map(Option.map(toProjectionThreadMessage)),
);

const listByThreadId: ProjectionThreadMessageRepositoryShape["listByThreadId"] = (input) =>
listProjectionThreadMessageRows(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.listByThreadId:query"),
),
Effect.map((rows) =>
rows.map((row) => ({
messageId: row.messageId,
threadId: row.threadId,
turnId: row.turnId,
role: row.role,
text: row.text,
isStreaming: row.isStreaming === 1,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
...(row.attachments !== null ? { attachments: row.attachments } : {}),
})),
),
Effect.map((rows) => rows.map(toProjectionThreadMessage)),
);

const deleteByThreadId: ProjectionThreadMessageRepositoryShape["deleteByThreadId"] = (input) =>
Expand All @@ -138,6 +175,7 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {

return {
upsert,
getByMessageId,
listByThreadId,
deleteByThreadId,
} satisfies ProjectionThreadMessageRepositoryShape;
Expand Down
17 changes: 16 additions & 1 deletion apps/server/src/persistence/Services/ProjectionThreadMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
IsoDateTime,
} from "@okcode/contracts";
import { Schema, ServiceMap } from "effect";
import type { Effect } from "effect";
import type { Effect, Option } from "effect";

import type { ProjectionRepositoryError } from "../Errors.ts";

Expand All @@ -37,6 +37,12 @@ export const ListProjectionThreadMessagesInput = Schema.Struct({
});
export type ListProjectionThreadMessagesInput = typeof ListProjectionThreadMessagesInput.Type;

export const GetProjectionThreadMessageByMessageIdInput = Schema.Struct({
messageId: MessageId,
});
export type GetProjectionThreadMessageByMessageIdInput =
typeof GetProjectionThreadMessageByMessageIdInput.Type;

export const DeleteProjectionThreadMessagesInput = Schema.Struct({
threadId: ThreadId,
});
Expand All @@ -55,6 +61,15 @@ export interface ProjectionThreadMessageRepositoryShape {
message: ProjectionThreadMessage,
) => Effect.Effect<void, ProjectionRepositoryError>;

/**
* Look up a single projected thread message by its message ID.
*
* Returns `Option.none()` when no matching row exists.
*/
readonly getByMessageId: (
input: GetProjectionThreadMessageByMessageIdInput,
) => Effect.Effect<Option.Option<ProjectionThreadMessage>, ProjectionRepositoryError>;

/**
* List projected thread messages for a thread.
*
Expand Down
Loading