From 4f0e0bcda11156f5f833ce97e36e48e3a7f3c5bd Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Thu, 2 Apr 2026 16:05:12 -0500 Subject: [PATCH] Use direct message lookup for streaming projections - Add getByMessageId to projection thread message repository - Reuse row mapping for single and list queries - Update thread-message tests for the new lookup --- .../Layers/ProjectionPipeline.ts | 11 ++-- .../Layers/ProjectionThreadMessages.test.ts | 40 ++++++++++- .../Layers/ProjectionThreadMessages.ts | 66 +++++++++++++++---- .../Services/ProjectionThreadMessages.ts | 17 ++++- 4 files changed, 112 insertions(+), 22 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 8fc9c2132..b1886ab30 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -564,12 +564,11 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { Effect.gen(function* () { switch (event.type) { case "thread.message-sent": { - const existingRows = yield* projectionThreadMessageRepository.listByThreadId({ - threadId: event.payload.threadId, - }); - const existingMessage = existingRows.find( - (row) => row.messageId === event.payload.messageId, - ); + const existingMessage = yield* projectionThreadMessageRepository + .getByMessageId({ + messageId: event.payload.messageId, + }) + .pipe(Effect.map(Option.getOrUndefined)); const nextText = existingMessage && event.payload.streaming ? `${existingMessage.text}${event.payload.text}` diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts index cdd192f52..3fc97a609 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts @@ -1,6 +1,6 @@ import { MessageId, ThreadId } from "@okcode/contracts"; import { assert, it } from "@effect/vitest"; -import { Effect, Layer } from "effect"; +import { Effect, Layer, Option } from "effect"; import { ProjectionThreadMessageRepository } from "../Services/ProjectionThreadMessages.ts"; import { ProjectionThreadMessageRepositoryLive } from "./ProjectionThreadMessages.ts"; @@ -11,6 +11,44 @@ const layer = it.layer( ); layer("ProjectionThreadMessageRepository", (it) => { + it.effect("getByMessageId returns none when no row exists", () => + Effect.gen(function* () { + const repository = yield* ProjectionThreadMessageRepository; + const result = yield* repository.getByMessageId({ + messageId: MessageId.makeUnsafe("nonexistent-message"), + }); + assert.isTrue(Option.isNone(result)); + }), + ); + + it.effect("getByMessageId returns the matching message after upsert", () => + Effect.gen(function* () { + const repository = yield* ProjectionThreadMessageRepository; + const threadId = ThreadId.makeUnsafe("thread-get-by-id"); + const messageId = MessageId.makeUnsafe("message-get-by-id"); + const createdAt = "2026-02-28T20:00:00.000Z"; + const updatedAt = "2026-02-28T20:00:01.000Z"; + + yield* repository.upsert({ + messageId, + threadId, + turnId: null, + role: "user", + text: "hello", + isStreaming: false, + createdAt, + updatedAt, + }); + + const result = yield* repository.getByMessageId({ messageId }); + assert.isTrue(Option.isSome(result)); + const message = Option.getOrThrow(result); + assert.equal(message.messageId, messageId); + assert.equal(message.text, "hello"); + assert.equal(message.isStreaming, false); + }), + ); + it.effect("preserves existing attachments when upsert omits attachments", () => Effect.gen(function* () { const repository = yield* ProjectionThreadMessageRepository; diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts index f41b64222..438b182a4 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts @@ -1,6 +1,6 @@ import * as SqlClient from "effect/unstable/sql/SqlClient"; import * as SqlSchema from "effect/unstable/sql/SqlSchema"; -import { Effect, Layer, Schema, Struct } from "effect"; +import { Effect, Layer, Option, Schema, Struct } from "effect"; import { ChatAttachment } from "@okcode/contracts"; import { toPersistenceSqlError } from "../Errors.ts"; @@ -8,6 +8,7 @@ import { ProjectionThreadMessageRepository, type ProjectionThreadMessageRepositoryShape, DeleteProjectionThreadMessagesInput, + GetProjectionThreadMessageByMessageIdInput, ListProjectionThreadMessagesInput, ProjectionThreadMessage, } from "../Services/ProjectionThreadMessages.ts"; @@ -19,6 +20,25 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( }), ); +type ProjectionThreadMessageDbRow = typeof ProjectionThreadMessageDbRowSchema.Type; + +/** Shared mapping from a DB row to the domain `ProjectionThreadMessage` shape. */ +function toProjectionThreadMessage( + row: ProjectionThreadMessageDbRow, +): typeof ProjectionThreadMessage.Type { + return { + messageId: row.messageId, + threadId: row.threadId, + turnId: row.turnId, + role: row.role, + text: row.text, + isStreaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + }; +} + const makeProjectionThreadMessageRepository = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; @@ -95,6 +115,27 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { `, }); + const getProjectionThreadMessageByMessageId = SqlSchema.findOneOption({ + Request: GetProjectionThreadMessageByMessageIdInput, + Result: ProjectionThreadMessageDbRowSchema, + execute: ({ messageId }) => + sql` + SELECT + message_id AS "messageId", + thread_id AS "threadId", + turn_id AS "turnId", + role, + text, + attachments_json AS "attachments", + is_streaming AS "isStreaming", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_messages + WHERE message_id = ${messageId} + LIMIT 1 + `, + }); + const deleteProjectionThreadMessageRows = SqlSchema.void({ Request: DeleteProjectionThreadMessagesInput, execute: ({ threadId }) => @@ -109,24 +150,20 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { Effect.mapError(toPersistenceSqlError("ProjectionThreadMessageRepository.upsert:query")), ); + const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) => + getProjectionThreadMessageByMessageId(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadMessageRepository.getByMessageId:query"), + ), + Effect.map(Option.map(toProjectionThreadMessage)), + ); + const listByThreadId: ProjectionThreadMessageRepositoryShape["listByThreadId"] = (input) => listProjectionThreadMessageRows(input).pipe( Effect.mapError( toPersistenceSqlError("ProjectionThreadMessageRepository.listByThreadId:query"), ), - Effect.map((rows) => - rows.map((row) => ({ - messageId: row.messageId, - threadId: row.threadId, - turnId: row.turnId, - role: row.role, - text: row.text, - isStreaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - })), - ), + Effect.map((rows) => rows.map(toProjectionThreadMessage)), ); const deleteByThreadId: ProjectionThreadMessageRepositoryShape["deleteByThreadId"] = (input) => @@ -138,6 +175,7 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { return { upsert, + getByMessageId, listByThreadId, deleteByThreadId, } satisfies ProjectionThreadMessageRepositoryShape; diff --git a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts index ba622ce8a..3607388a8 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts @@ -15,7 +15,7 @@ import { IsoDateTime, } from "@okcode/contracts"; import { Schema, ServiceMap } from "effect"; -import type { Effect } from "effect"; +import type { Effect, Option } from "effect"; import type { ProjectionRepositoryError } from "../Errors.ts"; @@ -37,6 +37,12 @@ export const ListProjectionThreadMessagesInput = Schema.Struct({ }); export type ListProjectionThreadMessagesInput = typeof ListProjectionThreadMessagesInput.Type; +export const GetProjectionThreadMessageByMessageIdInput = Schema.Struct({ + messageId: MessageId, +}); +export type GetProjectionThreadMessageByMessageIdInput = + typeof GetProjectionThreadMessageByMessageIdInput.Type; + export const DeleteProjectionThreadMessagesInput = Schema.Struct({ threadId: ThreadId, }); @@ -55,6 +61,15 @@ export interface ProjectionThreadMessageRepositoryShape { message: ProjectionThreadMessage, ) => Effect.Effect; + /** + * Look up a single projected thread message by its message ID. + * + * Returns `Option.none()` when no matching row exists. + */ + readonly getByMessageId: ( + input: GetProjectionThreadMessageByMessageIdInput, + ) => Effect.Effect, ProjectionRepositoryError>; + /** * List projected thread messages for a thread. *