diff --git a/apps/server/src/serverLayers.ts b/apps/server/src/serverLayers.ts index f78474fc6..b7ee3bb6c 100644 --- a/apps/server/src/serverLayers.ts +++ b/apps/server/src/serverLayers.ts @@ -166,6 +166,7 @@ export function makeServerRuntimeServicesLayer() { const githubLayer = GitHubLive.pipe(Layer.provideMerge(GitHubCliLive)); const smeChatLayer = SmeChatServiceLive.pipe( + Layer.provideMerge(EnvironmentVariablesLive), Layer.provide(SmeKnowledgeDocumentRepositoryLive), Layer.provide(SmeConversationRepositoryLive), Layer.provide(SmeMessageRepositoryLive), diff --git a/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts b/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts new file mode 100644 index 000000000..a3d724886 --- /dev/null +++ b/apps/server/src/sme/Layers/SmeChatServiceLive.test.ts @@ -0,0 +1,324 @@ +import { ProjectId, SmeConversationId, type EnvironmentVariableEntry } from "@okcode/contracts"; +import { Effect, Layer, Option } from "effect"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +import { + EnvironmentVariables, + type EnvironmentVariablesShape, +} from "../../persistence/Services/EnvironmentVariables.ts"; +import { + SmeKnowledgeDocumentRepository, + type SmeKnowledgeDocumentRepositoryShape, + type SmeKnowledgeDocumentRow, +} from "../../persistence/Services/SmeKnowledgeDocuments.ts"; +import { + SmeConversationRepository, + type SmeConversationRepositoryShape, + type SmeConversationRow, +} from "../../persistence/Services/SmeConversations.ts"; +import { + SmeMessageRepository, + type SmeMessageRepositoryShape, + type SmeMessageRow, +} from "../../persistence/Services/SmeMessages.ts"; +import { SmeChatService } from "../Services/SmeChatService.ts"; +import { makeSmeChatServiceLive } from "./SmeChatServiceLive.ts"; + +const originalAnthropicEnv = { + ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY, + ANTHROPIC_AUTH_TOKEN: process.env.ANTHROPIC_AUTH_TOKEN, + ANTHROPIC_BASE_URL: process.env.ANTHROPIC_BASE_URL, +}; + +afterEach(() => { + restoreAnthropicEnv(); +}); + +function restoreAnthropicEnv() { + for (const [key, value] of Object.entries(originalAnthropicEnv)) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } +} + +function setAnthropicEnv(input: { + readonly apiKey?: string; + readonly authToken?: string; + readonly baseURL?: string; +}) { + if (input.apiKey === undefined) { + delete process.env.ANTHROPIC_API_KEY; + } else { + process.env.ANTHROPIC_API_KEY = input.apiKey; + } + + if (input.authToken === undefined) { + delete process.env.ANTHROPIC_AUTH_TOKEN; + } else { + process.env.ANTHROPIC_AUTH_TOKEN = input.authToken; + } + + if (input.baseURL === undefined) { + delete process.env.ANTHROPIC_BASE_URL; + } else { + process.env.ANTHROPIC_BASE_URL = input.baseURL; + } +} + +function toEntries(record: Record): EnvironmentVariableEntry[] { + return Object.entries(record).map(([key, value]) => ({ key, value })); +} + +function makeEnvironmentVariables(persistedEnv: Record): EnvironmentVariablesShape { + const entries = toEntries(persistedEnv); + + return { + getGlobal: () => Effect.succeed({ entries }), + saveGlobal: (input) => Effect.succeed({ entries: input.entries }), + getProject: (input) => Effect.succeed({ projectId: input.projectId, entries }), + saveProject: (input) => + Effect.succeed({ + projectId: input.projectId, + entries: input.entries, + }), + resolveEnvironment: () => Effect.succeed(persistedEnv), + }; +} + +function makeDocumentRepository( + rows: ReadonlyArray = [], +): SmeKnowledgeDocumentRepositoryShape { + const documents = new Map(rows.map((row) => [row.documentId, row] as const)); + const getOption = (value: T | undefined) => + value === undefined ? Option.none() : Option.some(value); + + return { + upsert: (row) => + Effect.sync(() => { + documents.set(row.documentId, row); + }), + getById: ({ documentId }) => Effect.succeed(getOption(documents.get(documentId))), + listByProjectId: ({ projectId }) => + Effect.succeed([...documents.values()].filter((row) => row.projectId === projectId)), + deleteById: ({ documentId }) => + Effect.sync(() => { + documents.delete(documentId); + }), + }; +} + +function makeConversationRepository( + rows: ReadonlyArray, +): SmeConversationRepositoryShape { + const conversations = new Map(rows.map((row) => [row.conversationId, row] as const)); + const getOption = (value: T | undefined) => + value === undefined ? Option.none() : Option.some(value); + + return { + upsert: (row) => + Effect.sync(() => { + conversations.set(row.conversationId, row); + }), + getById: ({ conversationId }) => Effect.succeed(getOption(conversations.get(conversationId))), + listByProjectId: ({ projectId }) => + Effect.succeed([...conversations.values()].filter((row) => row.projectId === projectId)), + deleteById: ({ conversationId }) => + Effect.sync(() => { + conversations.delete(conversationId); + }), + }; +} + +function makeMessageRepository() { + const rowsByConversation = new Map(); + + const repository: SmeMessageRepositoryShape = { + upsert: (row) => + Effect.sync(() => { + const existing = rowsByConversation.get(row.conversationId) ?? []; + const next = existing.filter((message) => message.messageId !== row.messageId); + next.push(row); + rowsByConversation.set(row.conversationId, next); + }), + listByConversationId: ({ conversationId }) => + Effect.succeed(rowsByConversation.get(conversationId) ?? []), + deleteByConversationId: ({ conversationId }) => + Effect.sync(() => { + rowsByConversation.delete(conversationId); + }), + }; + + return { repository, rowsByConversation }; +} + +describe("SmeChatServiceLive", () => { + it("uses persisted Anthropic credentials for a successful send and stores the final reply", async () => { + setAnthropicEnv({ + apiKey: "process-key-that-should-not-win", + authToken: "process-token-that-should-not-win", + baseURL: "https://process-base.example", + }); + + const projectId = ProjectId.makeUnsafe("project-1"); + const conversationId = SmeConversationId.makeUnsafe("conversation-1"); + const conversationRow: SmeConversationRow = { + conversationId, + projectId, + title: "Architecture Q&A", + model: "claude-sonnet-4-6", + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + deletedAt: null, + }; + const persistedEnv = { + ANTHROPIC_API_KEY: "project-api-key", + ANTHROPIC_BASE_URL: "https://project-base.example", + }; + const { repository: messageRepo, rowsByConversation } = makeMessageRepository(); + const capturedClientOptions: Array = []; + const capturedRequests: Array = []; + + const createClient = vi.fn((options: unknown) => { + capturedClientOptions.push(options); + return { + messages: { + stream: async function* (request: unknown) { + capturedRequests.push(request); + yield { + type: "content_block_delta", + delta: { type: "text_delta", text: "Hello" }, + }; + yield { + type: "content_block_delta", + delta: { type: "text_delta", text: " world" }, + }; + }, + }, + } as never; + }); + + const layer = makeSmeChatServiceLive({ createClient }).pipe( + Layer.provideMerge( + Layer.succeed(EnvironmentVariables, makeEnvironmentVariables(persistedEnv)), + ), + Layer.provideMerge(Layer.succeed(SmeKnowledgeDocumentRepository, makeDocumentRepository())), + Layer.provideMerge( + Layer.succeed(SmeConversationRepository, makeConversationRepository([conversationRow])), + ), + Layer.provideMerge(Layer.succeed(SmeMessageRepository, messageRepo)), + ); + + const events: Array = []; + await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SmeChatService; + yield* service.sendMessage( + { + conversationId, + text: "What changed in the latest design?", + }, + (event) => { + events.push(event); + }, + ); + }).pipe(Effect.provide(layer)), + ); + + expect(createClient).toHaveBeenCalledTimes(1); + expect(capturedClientOptions).toEqual([ + { + apiKey: "project-api-key", + authToken: null, + baseURL: "https://project-base.example", + }, + ]); + expect(capturedRequests).toEqual([ + { + model: "claude-sonnet-4-6", + max_tokens: 8192, + system: expect.stringContaining("knowledgeable subject matter expert assistant"), + messages: [{ role: "user", content: "What changed in the latest design?" }], + }, + ]); + expect(events).toEqual([ + { + type: "sme.message.delta", + conversationId, + messageId: expect.any(String), + text: "Hello", + }, + { + type: "sme.message.delta", + conversationId, + messageId: expect.any(String), + text: " world", + }, + { + type: "sme.message.complete", + conversationId, + messageId: expect.any(String), + text: "Hello world", + }, + ]); + + const storedMessages = rowsByConversation.get(conversationId); + expect(storedMessages).toHaveLength(2); + expect( + storedMessages?.map((message) => ({ + role: message.role, + text: message.text, + isStreaming: message.isStreaming, + })), + ).toEqual([ + { role: "user", text: "What changed in the latest design?", isStreaming: false }, + { role: "assistant", text: "Hello world", isStreaming: false }, + ]); + }); + + it("fails before persisting messages when no Anthropic credentials are available", async () => { + setAnthropicEnv({}); + + const projectId = ProjectId.makeUnsafe("project-2"); + const conversationId = SmeConversationId.makeUnsafe("conversation-2"); + const conversationRow: SmeConversationRow = { + conversationId, + projectId, + title: "Docs sync", + model: "claude-sonnet-4-6", + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + deletedAt: null, + }; + const { repository: messageRepo, rowsByConversation } = makeMessageRepository(); + const createClient = vi.fn(); + + const layer = makeSmeChatServiceLive({ createClient }).pipe( + Layer.provideMerge(Layer.succeed(EnvironmentVariables, makeEnvironmentVariables({}))), + Layer.provideMerge(Layer.succeed(SmeKnowledgeDocumentRepository, makeDocumentRepository())), + Layer.provideMerge( + Layer.succeed(SmeConversationRepository, makeConversationRepository([conversationRow])), + ), + Layer.provideMerge(Layer.succeed(SmeMessageRepository, messageRepo)), + ); + + await expect( + Effect.runPromise( + Effect.gen(function* () { + const service = yield* SmeChatService; + yield* service.sendMessage({ + conversationId, + text: "Can you summarize the docs?", + }); + }).pipe(Effect.provide(layer)), + ), + ).rejects.toThrow( + "SmeChatError in sendMessage:auth: SME Chat requires ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN.", + ); + + expect(createClient).not.toHaveBeenCalled(); + expect(rowsByConversation.get(conversationId) ?? []).toEqual([]); + }); +}); diff --git a/apps/server/src/sme/Layers/SmeChatServiceLive.ts b/apps/server/src/sme/Layers/SmeChatServiceLive.ts index 7ab3d7f0d..152946252 100644 --- a/apps/server/src/sme/Layers/SmeChatServiceLive.ts +++ b/apps/server/src/sme/Layers/SmeChatServiceLive.ts @@ -13,11 +13,13 @@ import { SME_MAX_DOCUMENTS_PER_PROJECT, SME_MAX_CONVERSATIONS_PER_PROJECT, } from "@okcode/contracts"; +import { compactNodeProcessEnv } from "@okcode/shared/environment"; import { DateTime, Effect, Layer, Option, Random, Ref } from "effect"; import crypto from "node:crypto"; import { SmeKnowledgeDocumentRepository } from "../../persistence/Services/SmeKnowledgeDocuments.ts"; import { SmeConversationRepository } from "../../persistence/Services/SmeConversations.ts"; +import { EnvironmentVariables } from "../../persistence/Services/EnvironmentVariables.ts"; import { SmeMessageRepository } from "../../persistence/Services/SmeMessages.ts"; import { SmeChatError, @@ -25,370 +27,437 @@ import { type SmeChatServiceShape, } from "../Services/SmeChatService.ts"; -const makeSmeChatService = Effect.gen(function* () { - const documentRepo = yield* SmeKnowledgeDocumentRepository; - const conversationRepo = yield* SmeConversationRepository; - const messageRepo = yield* SmeMessageRepository; +type AnthropicMessagesClient = Pick; - // Track active streaming fibers per conversation for interruption - const activeStreams = yield* Ref.make(new Map()); +interface ResolvedAnthropicClientOptions { + readonly apiKey: string | null; + readonly authToken: string | null; + readonly baseURL?: string; +} - const generateId = () => - Effect.map( - Random.nextIntBetween(0, Number.MAX_SAFE_INTEGER), - (n) => `${Date.now().toString(36)}-${n.toString(36)}`, - ); +export interface SmeChatServiceLiveOptions { + readonly createClient?: (options: ResolvedAnthropicClientOptions) => AnthropicMessagesClient; +} - const now = () => Effect.map(DateTime.now, (dt) => DateTime.formatIso(dt)); - - // ── Document Operations ───────────────────────────────────────────── - - const uploadDocument: SmeChatServiceShape["uploadDocument"] = (input) => - Effect.gen(function* () { - // Check document count limit - const existing = yield* documentRepo - .listByProjectId({ projectId: input.projectId }) - .pipe(Effect.mapError((e) => new SmeChatError("uploadDocument", e.message))); - if (existing.length >= SME_MAX_DOCUMENTS_PER_PROJECT) { - return yield* Effect.fail( - new SmeChatError( - "uploadDocument", - `Maximum ${SME_MAX_DOCUMENTS_PER_PROJECT} documents per project exceeded`, - ), - ); - } - - // Decode base64 content - const contentBuffer = Buffer.from(input.contentBase64, "base64"); - if (contentBuffer.byteLength > SME_MAX_DOCUMENT_SIZE_BYTES) { - return yield* Effect.fail( - new SmeChatError( - "uploadDocument", - `Document exceeds maximum size of ${SME_MAX_DOCUMENT_SIZE_BYTES} bytes`, - ), - ); - } - - const contentText = contentBuffer.toString("utf-8"); - const contentHash = crypto.createHash("sha256").update(contentText).digest("hex"); - - const documentId = yield* generateId(); - const timestamp = yield* now(); - - const row = { - documentId, - projectId: input.projectId, - title: input.title, - fileName: input.fileName, - mimeType: input.mimeType, - sizeBytes: contentBuffer.byteLength, - contentText, - contentHash, - createdAt: timestamp, - updatedAt: timestamp, - deletedAt: null, - }; - - yield* documentRepo - .upsert(row as any) - .pipe(Effect.mapError((e) => new SmeChatError("uploadDocument", e.message))); - - return { - documentId, - projectId: input.projectId, - title: input.title, - fileName: input.fileName, - mimeType: input.mimeType, - sizeBytes: contentBuffer.byteLength, - contentHash, - createdAt: timestamp, - updatedAt: timestamp, - deletedAt: null, - } as SmeKnowledgeDocument; - }); - - const deleteDocument: SmeChatServiceShape["deleteDocument"] = (input) => - documentRepo - .deleteById({ documentId: input.documentId }) - .pipe(Effect.mapError((e) => new SmeChatError("deleteDocument", e.message))); - - const listDocuments: SmeChatServiceShape["listDocuments"] = (input) => - documentRepo.listByProjectId({ projectId: input.projectId }).pipe( - Effect.mapError((e) => new SmeChatError("listDocuments", e.message)), - Effect.map((rows) => - rows.map( - (r) => - ({ - documentId: r.documentId, - projectId: r.projectId, - title: r.title, - fileName: r.fileName, - mimeType: r.mimeType, - sizeBytes: r.sizeBytes, - contentHash: r.contentHash, - createdAt: r.createdAt, - updatedAt: r.updatedAt, - deletedAt: r.deletedAt, - }) as SmeKnowledgeDocument, - ), - ), - ); +const SME_MISSING_ANTHROPIC_AUTH_MESSAGE = + "SME Chat requires ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN. Add one in Settings > Environment Variables (global or project), or launch OK Code with one set in the server environment."; - // ── Conversation Operations ─────────────────────────────────────── - - const createConversation: SmeChatServiceShape["createConversation"] = (input) => - Effect.gen(function* () { - const existing = yield* conversationRepo - .listByProjectId({ projectId: input.projectId }) - .pipe(Effect.mapError((e) => new SmeChatError("createConversation", e.message))); - if (existing.length >= SME_MAX_CONVERSATIONS_PER_PROJECT) { - return yield* Effect.fail( - new SmeChatError( - "createConversation", - `Maximum ${SME_MAX_CONVERSATIONS_PER_PROJECT} conversations per project exceeded`, - ), - ); - } - - const conversationId = yield* generateId(); - const timestamp = yield* now(); - - const row = { - conversationId, - projectId: input.projectId, - title: input.title, - model: input.model, - createdAt: timestamp, - updatedAt: timestamp, - deletedAt: null, - }; - - yield* conversationRepo - .upsert(row as any) - .pipe(Effect.mapError((e) => new SmeChatError("createConversation", e.message))); - - return row as SmeConversation; - }); - - const deleteConversation: SmeChatServiceShape["deleteConversation"] = (input) => - Effect.gen(function* () { - yield* messageRepo - .deleteByConversationId({ conversationId: input.conversationId }) - .pipe(Effect.mapError((e) => new SmeChatError("deleteConversation", e.message))); - yield* conversationRepo - .deleteById({ conversationId: input.conversationId }) - .pipe(Effect.mapError((e) => new SmeChatError("deleteConversation", e.message))); - }); - - const listConversations: SmeChatServiceShape["listConversations"] = (input) => - conversationRepo.listByProjectId({ projectId: input.projectId }).pipe( - Effect.mapError((e) => new SmeChatError("listConversations", e.message)), - Effect.map((rows) => - rows.map( - (r) => - ({ - conversationId: r.conversationId, - projectId: r.projectId, - title: r.title, - model: r.model, - createdAt: r.createdAt, - updatedAt: r.updatedAt, - deletedAt: r.deletedAt, - }) as SmeConversation, - ), - ), - ); +function normalizeOptionalEnvValue(value: string | undefined | null): string | null { + const trimmed = value?.trim(); + return trimmed && trimmed.length > 0 ? trimmed : null; +} - const getConversation: SmeChatServiceShape["getConversation"] = (input) => - Effect.gen(function* () { - const optConv = yield* conversationRepo - .getById({ conversationId: input.conversationId }) - .pipe(Effect.mapError((e) => new SmeChatError("getConversation", e.message))); - - if (Option.isNone(optConv)) return null; - const conv = optConv.value; - - const messages = yield* messageRepo - .listByConversationId({ conversationId: input.conversationId }) - .pipe(Effect.mapError((e) => new SmeChatError("getConversation", e.message))); - - return { - conversation: { - conversationId: conv.conversationId, - projectId: conv.projectId, - title: conv.title, - model: conv.model, - createdAt: conv.createdAt, - updatedAt: conv.updatedAt, - deletedAt: conv.deletedAt, - } as SmeConversation, - messages: messages.map( - (m) => - ({ - messageId: m.messageId, - conversationId: m.conversationId, - role: m.role, - text: m.text, - isStreaming: m.isStreaming, - createdAt: m.createdAt, - updatedAt: m.updatedAt, - }) as SmeMessage, - ), - }; - }); - - // ── Message Sending ─────────────────────────────────────────────── - - const sendMessage: SmeChatServiceShape["sendMessage"] = (input, onEvent) => - Effect.gen(function* () { - // 1. Resolve conversation - const optConv = yield* conversationRepo - .getById({ conversationId: input.conversationId }) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - if (Option.isNone(optConv)) { - return yield* Effect.fail(new SmeChatError("sendMessage", "Conversation not found")); - } - const conv = optConv.value; - - // 2. Load knowledge documents - const docs = yield* documentRepo - .listByProjectId({ projectId: conv.projectId }) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - - // 3. Load conversation history - const existingMessages = yield* messageRepo - .listByConversationId({ conversationId: input.conversationId }) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - - // 4. Persist user message - const userMessageId = yield* generateId(); - const timestamp = yield* now(); - yield* messageRepo - .upsert({ - messageId: userMessageId, - conversationId: input.conversationId, - role: "user", - text: input.text, - isStreaming: false, +function pickAnthropicCredential(env: Record) { + const apiKey = normalizeOptionalEnvValue(env.ANTHROPIC_API_KEY); + const authToken = normalizeOptionalEnvValue(env.ANTHROPIC_AUTH_TOKEN); + if (apiKey !== null) { + return { apiKey, authToken: null } as const; + } + if (authToken !== null) { + return { apiKey: null, authToken } as const; + } + return null; +} + +export function resolveAnthropicClientOptions(input: { + readonly persistedEnv: Record; + readonly processEnv?: NodeJS.ProcessEnv; +}): ResolvedAnthropicClientOptions | null { + const persistedCredential = pickAnthropicCredential(input.persistedEnv); + const processEnvRecord = compactNodeProcessEnv(input.processEnv ?? process.env); + const processCredential = pickAnthropicCredential(processEnvRecord); + const baseURL = + normalizeOptionalEnvValue(input.persistedEnv.ANTHROPIC_BASE_URL) ?? + normalizeOptionalEnvValue(processEnvRecord.ANTHROPIC_BASE_URL) ?? + undefined; + + const credential = persistedCredential ?? processCredential; + if (credential === null) { + return null; + } + + return { + ...credential, + ...(baseURL ? { baseURL } : {}), + }; +} + +const makeSmeChatService = (options: SmeChatServiceLiveOptions = {}) => + Effect.gen(function* () { + const documentRepo = yield* SmeKnowledgeDocumentRepository; + const conversationRepo = yield* SmeConversationRepository; + const messageRepo = yield* SmeMessageRepository; + const environmentVariables = yield* EnvironmentVariables; + const createClient = + options.createClient ?? + ((clientOptions: ResolvedAnthropicClientOptions): AnthropicMessagesClient => + new Anthropic(clientOptions)); + + // Track active streaming fibers per conversation for interruption + const activeStreams = yield* Ref.make(new Map()); + + const generateId = () => + Effect.map( + Random.nextIntBetween(0, Number.MAX_SAFE_INTEGER), + (n) => `${Date.now().toString(36)}-${n.toString(36)}`, + ); + + const now = () => Effect.map(DateTime.now, (dt) => DateTime.formatIso(dt)); + + // ── Document Operations ───────────────────────────────────────────── + + const uploadDocument: SmeChatServiceShape["uploadDocument"] = (input) => + Effect.gen(function* () { + // Check document count limit + const existing = yield* documentRepo + .listByProjectId({ projectId: input.projectId }) + .pipe(Effect.mapError((e) => new SmeChatError("uploadDocument", e.message))); + if (existing.length >= SME_MAX_DOCUMENTS_PER_PROJECT) { + return yield* Effect.fail( + new SmeChatError( + "uploadDocument", + `Maximum ${SME_MAX_DOCUMENTS_PER_PROJECT} documents per project exceeded`, + ), + ); + } + + // Decode base64 content + const contentBuffer = Buffer.from(input.contentBase64, "base64"); + if (contentBuffer.byteLength > SME_MAX_DOCUMENT_SIZE_BYTES) { + return yield* Effect.fail( + new SmeChatError( + "uploadDocument", + `Document exceeds maximum size of ${SME_MAX_DOCUMENT_SIZE_BYTES} bytes`, + ), + ); + } + + const contentText = contentBuffer.toString("utf-8"); + const contentHash = crypto.createHash("sha256").update(contentText).digest("hex"); + + const documentId = yield* generateId(); + const timestamp = yield* now(); + + const row = { + documentId, + projectId: input.projectId, + title: input.title, + fileName: input.fileName, + mimeType: input.mimeType, + sizeBytes: contentBuffer.byteLength, + contentText, + contentHash, createdAt: timestamp, updatedAt: timestamp, - } as any) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - - // 5. Create assistant message placeholder - const assistantMessageId = yield* generateId(); - yield* messageRepo - .upsert({ - messageId: assistantMessageId, - conversationId: input.conversationId, - role: "assistant", - text: "", - isStreaming: true, + deletedAt: null, + }; + + yield* documentRepo + .upsert(row as any) + .pipe(Effect.mapError((e) => new SmeChatError("uploadDocument", e.message))); + + return { + documentId, + projectId: input.projectId, + title: input.title, + fileName: input.fileName, + mimeType: input.mimeType, + sizeBytes: contentBuffer.byteLength, + contentHash, createdAt: timestamp, updatedAt: timestamp, - } as any) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); - - // 6. Build messages array for the API - const systemPrompt = buildSystemPrompt(docs); - const apiMessages: Array<{ role: "user" | "assistant"; content: string }> = []; - for (const msg of existingMessages) { - if (msg.role === "user" || msg.role === "assistant") { - apiMessages.push({ role: msg.role as "user" | "assistant", content: msg.text }); - } - } - apiMessages.push({ role: "user", content: input.text }); - - // 7. Stream completion via Anthropic Messages API - const abortController = new AbortController(); - yield* Ref.update(activeStreams, (map) => { - const newMap = new Map(map); - newMap.set(input.conversationId, abortController); - return newMap; + deletedAt: null, + } as SmeKnowledgeDocument; }); - const fullText = yield* Effect.tryPromise({ - try: async () => { - const anthropic = new Anthropic(); - let result = ""; - const stream = anthropic.messages.stream( - { - model: conv.model, - max_tokens: 8192, - system: systemPrompt, - messages: apiMessages, - }, - { signal: abortController.signal }, + const deleteDocument: SmeChatServiceShape["deleteDocument"] = (input) => + documentRepo + .deleteById({ documentId: input.documentId }) + .pipe(Effect.mapError((e) => new SmeChatError("deleteDocument", e.message))); + + const listDocuments: SmeChatServiceShape["listDocuments"] = (input) => + documentRepo.listByProjectId({ projectId: input.projectId }).pipe( + Effect.mapError((e) => new SmeChatError("listDocuments", e.message)), + Effect.map((rows) => + rows.map( + (r) => + ({ + documentId: r.documentId, + projectId: r.projectId, + title: r.title, + fileName: r.fileName, + mimeType: r.mimeType, + sizeBytes: r.sizeBytes, + contentHash: r.contentHash, + createdAt: r.createdAt, + updatedAt: r.updatedAt, + deletedAt: r.deletedAt, + }) as SmeKnowledgeDocument, + ), + ), + ); + + // ── Conversation Operations ─────────────────────────────────────── + + const createConversation: SmeChatServiceShape["createConversation"] = (input) => + Effect.gen(function* () { + const existing = yield* conversationRepo + .listByProjectId({ projectId: input.projectId }) + .pipe(Effect.mapError((e) => new SmeChatError("createConversation", e.message))); + if (existing.length >= SME_MAX_CONVERSATIONS_PER_PROJECT) { + return yield* Effect.fail( + new SmeChatError( + "createConversation", + `Maximum ${SME_MAX_CONVERSATIONS_PER_PROJECT} conversations per project exceeded`, + ), ); + } - for await (const event of stream) { - if (event.type === "content_block_delta" && event.delta.type === "text_delta") { - result += event.delta.text; - onEvent?.({ - type: "sme.message.delta", - conversationId: input.conversationId, - messageId: assistantMessageId, - text: event.delta.text, - } as any); - } - } - return result; - }, - catch: (err) => new SmeChatError("sendMessage:stream", String(err), err), - }).pipe( - Effect.ensuring( - Ref.update(activeStreams, (map) => { - const newMap = new Map(map); - newMap.delete(input.conversationId); - return newMap; - }), + const conversationId = yield* generateId(); + const timestamp = yield* now(); + + const row = { + conversationId, + projectId: input.projectId, + title: input.title, + model: input.model, + createdAt: timestamp, + updatedAt: timestamp, + deletedAt: null, + }; + + yield* conversationRepo + .upsert(row as any) + .pipe(Effect.mapError((e) => new SmeChatError("createConversation", e.message))); + + return row as SmeConversation; + }); + + const deleteConversation: SmeChatServiceShape["deleteConversation"] = (input) => + Effect.gen(function* () { + yield* messageRepo + .deleteByConversationId({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("deleteConversation", e.message))); + yield* conversationRepo + .deleteById({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("deleteConversation", e.message))); + }); + + const listConversations: SmeChatServiceShape["listConversations"] = (input) => + conversationRepo.listByProjectId({ projectId: input.projectId }).pipe( + Effect.mapError((e) => new SmeChatError("listConversations", e.message)), + Effect.map((rows) => + rows.map( + (r) => + ({ + conversationId: r.conversationId, + projectId: r.projectId, + title: r.title, + model: r.model, + createdAt: r.createdAt, + updatedAt: r.updatedAt, + deletedAt: r.deletedAt, + }) as SmeConversation, + ), ), ); - // 8. Finalize assistant message - const finalTimestamp = yield* now(); - yield* messageRepo - .upsert({ - messageId: assistantMessageId, + const getConversation: SmeChatServiceShape["getConversation"] = (input) => + Effect.gen(function* () { + const optConv = yield* conversationRepo + .getById({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("getConversation", e.message))); + + if (Option.isNone(optConv)) return null; + const conv = optConv.value; + + const messages = yield* messageRepo + .listByConversationId({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("getConversation", e.message))); + + return { + conversation: { + conversationId: conv.conversationId, + projectId: conv.projectId, + title: conv.title, + model: conv.model, + createdAt: conv.createdAt, + updatedAt: conv.updatedAt, + deletedAt: conv.deletedAt, + } as SmeConversation, + messages: messages.map( + (m) => + ({ + messageId: m.messageId, + conversationId: m.conversationId, + role: m.role, + text: m.text, + isStreaming: m.isStreaming, + createdAt: m.createdAt, + updatedAt: m.updatedAt, + }) as SmeMessage, + ), + }; + }); + + // ── Message Sending ─────────────────────────────────────────────── + + const sendMessage: SmeChatServiceShape["sendMessage"] = (input, onEvent) => + Effect.gen(function* () { + // 1. Resolve conversation + const optConv = yield* conversationRepo + .getById({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); + if (Option.isNone(optConv)) { + return yield* Effect.fail(new SmeChatError("sendMessage", "Conversation not found")); + } + const conv = optConv.value; + + // 2. Resolve Anthropic auth up front so auth failures do not persist + // speculative user/assistant messages. + const persistedEnv = yield* environmentVariables + .resolveEnvironment({ projectId: conv.projectId }) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage:env", e.message))); + const anthropicOptions = resolveAnthropicClientOptions({ + persistedEnv, + processEnv: process.env, + }); + if (anthropicOptions === null) { + return yield* Effect.fail( + new SmeChatError("sendMessage:auth", SME_MISSING_ANTHROPIC_AUTH_MESSAGE), + ); + } + const anthropic = createClient(anthropicOptions); + + // 3. Load knowledge documents + const docs = yield* documentRepo + .listByProjectId({ projectId: conv.projectId }) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); + + // 4. Load conversation history + const existingMessages = yield* messageRepo + .listByConversationId({ conversationId: input.conversationId }) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); + + // 5. Persist user message + const userMessageId = yield* generateId(); + const timestamp = yield* now(); + yield* messageRepo + .upsert({ + messageId: userMessageId, + conversationId: input.conversationId, + role: "user", + text: input.text, + isStreaming: false, + createdAt: timestamp, + updatedAt: timestamp, + } as any) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage", e.message))); + + // 6. Reserve an assistant message ID for streaming updates. We only + // persist the assistant message after a successful completion so failed + // requests do not leave behind blank "streaming" rows. + const assistantMessageId = yield* generateId(); + + // 7. Build messages array for the API + const systemPrompt = buildSystemPrompt(docs); + const apiMessages: Array<{ role: "user" | "assistant"; content: string }> = []; + for (const msg of existingMessages) { + if (msg.role === "user" || msg.role === "assistant") { + apiMessages.push({ role: msg.role as "user" | "assistant", content: msg.text }); + } + } + apiMessages.push({ role: "user", content: input.text }); + + // 8. Stream completion via Anthropic Messages API + const abortController = new AbortController(); + yield* Ref.update(activeStreams, (map) => { + const newMap = new Map(map); + newMap.set(input.conversationId, abortController); + return newMap; + }); + + const fullText = yield* Effect.tryPromise({ + try: async () => { + let result = ""; + const stream = anthropic.messages.stream( + { + model: conv.model, + max_tokens: 8192, + system: systemPrompt, + messages: apiMessages, + }, + { signal: abortController.signal }, + ); + + for await (const event of stream) { + if (event.type === "content_block_delta" && event.delta.type === "text_delta") { + result += event.delta.text; + onEvent?.({ + type: "sme.message.delta", + conversationId: input.conversationId, + messageId: assistantMessageId, + text: event.delta.text, + } as any); + } + } + return result; + }, + catch: (err) => new SmeChatError("sendMessage:stream", String(err), err), + }).pipe( + Effect.ensuring( + Ref.update(activeStreams, (map) => { + const newMap = new Map(map); + newMap.delete(input.conversationId); + return newMap; + }), + ), + ); + + // 9. Finalize assistant message + const finalTimestamp = yield* now(); + yield* messageRepo + .upsert({ + messageId: assistantMessageId, + conversationId: input.conversationId, + role: "assistant", + text: fullText, + isStreaming: false, + createdAt: timestamp, + updatedAt: finalTimestamp, + } as any) + .pipe(Effect.mapError((e) => new SmeChatError("sendMessage:finalize", e.message))); + + // 10. Emit completion event + onEvent?.({ + type: "sme.message.complete", conversationId: input.conversationId, - role: "assistant", + messageId: assistantMessageId, text: fullText, - isStreaming: false, - createdAt: timestamp, - updatedAt: finalTimestamp, - } as any) - .pipe(Effect.mapError((e) => new SmeChatError("sendMessage:finalize", e.message))); - - // 9. Emit completion event - onEvent?.({ - type: "sme.message.complete", - conversationId: input.conversationId, - messageId: assistantMessageId, - text: fullText, - } as any); - }); - - const interruptMessage: SmeChatServiceShape["interruptMessage"] = (input) => - Effect.gen(function* () { - const streams = yield* Ref.get(activeStreams); - const controller = streams.get(input.conversationId); - if (controller) { - controller.abort(); - } - }); + } as any); + }); - return { - uploadDocument, - deleteDocument, - listDocuments, - createConversation, - deleteConversation, - listConversations, - getConversation, - sendMessage, - interruptMessage, - } satisfies SmeChatServiceShape; -}); + const interruptMessage: SmeChatServiceShape["interruptMessage"] = (input) => + Effect.gen(function* () { + const streams = yield* Ref.get(activeStreams); + const controller = streams.get(input.conversationId); + if (controller) { + controller.abort(); + } + }); + + return { + uploadDocument, + deleteDocument, + listDocuments, + createConversation, + deleteConversation, + listConversations, + getConversation, + sendMessage, + interruptMessage, + } satisfies SmeChatServiceShape; + }); // ── Helpers ───────────────────────────────────────────────────────────── @@ -414,4 +483,7 @@ function buildSystemPrompt( return parts.join("\n"); } -export const SmeChatServiceLive = Layer.effect(SmeChatService, makeSmeChatService); +export const makeSmeChatServiceLive = (options: SmeChatServiceLiveOptions = {}) => + Layer.effect(SmeChatService, makeSmeChatService(options)); + +export const SmeChatServiceLive = makeSmeChatServiceLive(); diff --git a/apps/web/src/components/sme/SmeChatShell.tsx b/apps/web/src/components/sme/SmeChatShell.tsx index a90aee5b1..677446658 100644 --- a/apps/web/src/components/sme/SmeChatShell.tsx +++ b/apps/web/src/components/sme/SmeChatShell.tsx @@ -24,12 +24,16 @@ export function SmeChatShell({ }: SmeChatShellProps) { const [knowledgePanelOpen, setKnowledgePanelOpen] = useState(false); const activeConversationId = useSmeStore((s) => s.activeConversationId); + const setConversations = useSmeStore((s) => s.setConversations); + const setDocuments = useSmeStore((s) => s.setDocuments); + const setActiveConversationId = useSmeStore((s) => s.setActiveConversationId); + const appendStreamDelta = useSmeStore((s) => s.appendStreamDelta); + const completeStream = useSmeStore((s) => s.completeStream); + const clearStream = useSmeStore((s) => s.clearStream); // Load conversations and documents when project changes useEffect(() => { const api = ensureNativeApi(); - const { setConversations, setDocuments, setActiveConversationId } = - useSmeStore.getState(); void api.sme.listConversations({ projectId: project.id }).then((convs) => { setConversations(convs as any[]); }); @@ -38,7 +42,8 @@ export function SmeChatShell({ }); // Reset active conversation when switching projects setActiveConversationId(null); - }, [project.id]); + clearStream(); + }, [project.id, setConversations, setDocuments, setActiveConversationId, clearStream]); // Load messages when active conversation changes useEffect(() => { @@ -58,13 +63,13 @@ export function SmeChatShell({ const api = ensureNativeApi(); const unsubscribe = api.sme.onMessageEvent((event: SmeMessageEvent) => { if (event.type === "sme.message.delta") { - useSmeStore.getState().appendStreamDelta(event.messageId, event.text); + appendStreamDelta(event.conversationId, event.messageId, event.text); } else if (event.type === "sme.message.complete") { - useSmeStore.getState().completeStream(event.messageId, event.text); + completeStream(event.conversationId, event.messageId, event.text); } }); return unsubscribe; - }, []); + }, [appendStreamDelta, completeStream]); return (
diff --git a/apps/web/src/components/sme/SmeChatWorkspace.tsx b/apps/web/src/components/sme/SmeChatWorkspace.tsx index 01fa7dd2d..d9c57db6c 100644 --- a/apps/web/src/components/sme/SmeChatWorkspace.tsx +++ b/apps/web/src/components/sme/SmeChatWorkspace.tsx @@ -3,6 +3,7 @@ import { BookOpenIcon, SendIcon } from "lucide-react"; import type { SmeConversationId, SmeMessage, SmeMessageId } from "@okcode/contracts"; import { ensureNativeApi } from "~/nativeApi"; import { useSmeStore } from "~/smeStore"; +import { toastManager } from "~/components/ui/toast"; import { SmeMessageBubble } from "./SmeMessageBubble"; @@ -22,9 +23,12 @@ export function SmeChatWorkspace({ const messages = useSmeStore((s) => conversationId ? (s.messagesByConversation[conversationId] ?? EMPTY_MESSAGES) : EMPTY_MESSAGES, ); + const streamingConversationId = useSmeStore((s) => s.streamingConversationId); const streamingMessageId = useSmeStore((s) => s.streamingMessageId); const streamingText = useSmeStore((s) => s.streamingText); const addUserMessage = useSmeStore((s) => s.addUserMessage); + const clearStream = useSmeStore((s) => s.clearStream); + const setMessages = useSmeStore((s) => s.setMessages); const [inputText, setInputText] = useState(""); const [sending, setSending] = useState(false); const messagesEndRef = useRef(null); @@ -41,6 +45,7 @@ export function SmeChatWorkspace({ const text = inputText.trim(); setInputText(""); setSending(true); + const previousMessages = messages; // Optimistically add user message addUserMessage(conversationId, { @@ -56,17 +61,38 @@ export function SmeChatWorkspace({ try { const api = ensureNativeApi(); await api.sme.sendMessage({ conversationId: conversationId as SmeConversationId, text }); - // After the send completes, re-fetch messages to get the final state const result = await api.sme.getConversation({ conversationId: conversationId as SmeConversationId, }); if (result) { - useSmeStore.getState().setMessages(conversationId, result.messages as any[]); + setMessages(conversationId, result.messages as any[]); } + } catch (error) { + clearStream(); + + try { + const api = ensureNativeApi(); + const result = await api.sme.getConversation({ + conversationId: conversationId as SmeConversationId, + }); + if (result) { + setMessages(conversationId, result.messages as any[]); + } else { + setMessages(conversationId, previousMessages); + } + } catch { + setMessages(conversationId, previousMessages); + } + + toastManager.add({ + type: "error", + title: "SME Chat send failed", + description: error instanceof Error ? error.message : "Unknown SME Chat error.", + }); } finally { setSending(false); } - }, [conversationId, inputText, sending, addUserMessage]); + }, [conversationId, inputText, sending, messages, addUserMessage, clearStream, setMessages]); const handleKeyDown = useCallback( (e: React.KeyboardEvent) => { @@ -116,7 +142,7 @@ export function SmeChatWorkspace({ {messages.map((msg) => ( ))} - {streamingText ? ( + {streamingConversationId === conversationId && streamingText ? ( ; + streamingConversationId: string | null; streamingMessageId: string | null; streamingText: string; } @@ -15,8 +16,8 @@ interface SmeActions { setDocuments: (documents: SmeKnowledgeDocument[]) => void; setActiveConversationId: (id: string | null) => void; setMessages: (conversationId: string, messages: SmeMessage[]) => void; - appendStreamDelta: (messageId: string, text: string) => void; - completeStream: (messageId: string, text: string) => void; + appendStreamDelta: (conversationId: string, messageId: string, text: string) => void; + completeStream: (conversationId: string, messageId: string, text: string) => void; clearStream: () => void; addConversation: (conversation: SmeConversation) => void; removeConversation: (conversationId: string) => void; @@ -30,6 +31,7 @@ export const useSmeStore = create((set) => ({ documents: [], activeConversationId: null, messagesByConversation: {}, + streamingConversationId: null, streamingMessageId: null, streamingText: "", @@ -45,22 +47,27 @@ export const useSmeStore = create((set) => ({ }, })), - appendStreamDelta: (messageId, text) => + appendStreamDelta: (conversationId, messageId, text) => set((state) => ({ + streamingConversationId: conversationId, streamingMessageId: messageId, - streamingText: state.streamingText + text, + streamingText: + state.streamingConversationId === conversationId && state.streamingMessageId === messageId + ? state.streamingText + text + : text, })), - completeStream: (messageId, text) => + completeStream: (conversationId, messageId, text) => set((state) => { - const conversationId = Object.keys(state.messagesByConversation).find((cid) => - state.messagesByConversation[cid]?.some( - (m) => m.messageId === messageId || state.streamingMessageId === messageId, - ), - ); - - if (!conversationId) { - return { streamingMessageId: null, streamingText: "" }; + if ( + state.streamingMessageId !== messageId || + state.streamingConversationId !== conversationId + ) { + return { + streamingConversationId: null, + streamingMessageId: null, + streamingText: "", + }; } const messages = state.messagesByConversation[conversationId] ?? []; @@ -83,6 +90,7 @@ export const useSmeStore = create((set) => ({ } return { + streamingConversationId: null, streamingMessageId: null, streamingText: "", messagesByConversation: { @@ -92,7 +100,8 @@ export const useSmeStore = create((set) => ({ }; }), - clearStream: () => set({ streamingMessageId: null, streamingText: "" }), + clearStream: () => + set({ streamingConversationId: null, streamingMessageId: null, streamingText: "" }), addConversation: (conversation) => set((state) => ({