diff --git a/packages/agent/src/server/__tests__/createAgentApp.test.ts b/packages/agent/src/server/__tests__/createAgentApp.test.ts index fd7ab478f..008f741b2 100644 --- a/packages/agent/src/server/__tests__/createAgentApp.test.ts +++ b/packages/agent/src/server/__tests__/createAgentApp.test.ts @@ -85,6 +85,12 @@ test('createAgentApp falls back to BORING_AGENT_TEMPLATE_PATH', async () => { test('createAgentApp can use a custom harness factory for non-pi runtimes', async () => { const workspaceRoot = await makeTempDir('boring-ui-custom-harness-') const reloadSession = vi.fn(async () => true) + const telemetryEvents: Array<{ name: string; properties?: Record }> = [] + const telemetry = { + capture(event: { name: string; properties?: Record }) { + telemetryEvents.push(event) + }, + } const harnessFactory = vi.fn(async (input) => ({ id: 'custom-test-harness', placement: 'server' as const, @@ -109,6 +115,7 @@ test('createAgentApp can use a custom harness factory for non-pi runtimes', asyn mode: 'direct', logger: false, harnessFactory, + telemetry, extraTools: [{ name: 'custom_runtime_tool', description: 'Provided to harness factory.', @@ -119,8 +126,22 @@ test('createAgentApp can use a custom harness factory for non-pi runtimes', asyn try { expect(harnessFactory).toHaveBeenCalledTimes(1) expect(harnessFactory.mock.calls[0]?.[0].cwd).toBe(workspaceRoot) + expect(harnessFactory.mock.calls[0]?.[0].telemetry).toBe(telemetry) expect(harnessFactory.mock.calls[0]?.[0].tools.map((tool: { name: string }) => tool.name)).toContain('custom_runtime_tool') + const chatRes = await app.inject({ + method: 'POST', + url: '/api/v1/agent/chat', + payload: { sessionId: 'custom', message: 'secret prompt must not be captured' }, + }) + expect(chatRes.statusCode).toBe(200) + expect(telemetryEvents.map((event) => event.name)).toEqual([ + 'agent.chat.started', + 'agent.chat.message.submitted', + 'agent.chat.completed', + ]) + expect(JSON.stringify(telemetryEvents)).not.toContain('secret 
prompt') + const res = await app.inject({ method: 'POST', url: '/api/v1/agent/reload', payload: { sessionId: 'custom' } }) expect(res.statusCode).toBe(200) expect(res.json()).toEqual({ ok: true, sessionId: 'custom', reloaded: true }) diff --git a/packages/agent/src/server/createAgentApp.ts b/packages/agent/src/server/createAgentApp.ts index 243ed312b..4183a8ec2 100644 --- a/packages/agent/src/server/createAgentApp.ts +++ b/packages/agent/src/server/createAgentApp.ts @@ -2,6 +2,7 @@ import Fastify, { type FastifyInstance } from 'fastify' import type { AgentTool } from '../shared/tool' import type { AgentHarnessFactory } from '../shared/harness' import type { SessionStore } from '../shared/session' +import type { TelemetrySink } from '../shared/telemetry' import { getEnv } from './config/env' import type { RuntimeModeAdapter, RuntimeModeId } from './runtime/mode' import { resolveMode, autoDetectMode } from './runtime/resolveMode' @@ -57,6 +58,8 @@ export interface CreateAgentAppOptions { pi?: PiHarnessOptions /** Optional stable namespace for file-backed session storage. */ sessionNamespace?: string + /** Optional best-effort telemetry sink supplied by an embedding host. */ + telemetry?: TelemetrySink /** Optional explicit file-backed session directory. Mostly for tests/hosts. 
*/ sessionDir?: string /** @@ -134,6 +137,7 @@ export async function createAgentApp( sessionDir: opts.sessionDir, systemPromptAppend: opts.systemPromptAppend, systemPromptDynamic: opts.systemPromptDynamic, + telemetry: opts.telemetry, }) const sessionChangesTracker = new InMemorySessionChangesTracker() @@ -171,6 +175,7 @@ export async function createAgentApp( harness, workdir: runtimeBundle.workspace.root, sessionChangesTracker, + telemetry: opts.telemetry, }) await app.register(sessionRoutes, { sessionStore: harness.sessions as unknown as SessionStore, diff --git a/packages/agent/src/server/harness/pi-coding-agent/__tests__/tool-adapter.telemetry.test.ts b/packages/agent/src/server/harness/pi-coding-agent/__tests__/tool-adapter.telemetry.test.ts new file mode 100644 index 000000000..953805071 --- /dev/null +++ b/packages/agent/src/server/harness/pi-coding-agent/__tests__/tool-adapter.telemetry.test.ts @@ -0,0 +1,131 @@ +import { describe, expect, it, vi } from 'vitest' + +import { ErrorCode } from '../../../../shared/error-codes' +import type { AgentTool } from '../../../../shared/tool' +import type { TelemetryEvent, TelemetrySink } from '../../../../shared/telemetry' +import { adaptToolForPi } from '../tool-adapter' + +function createTelemetryRecorder(): { telemetry: TelemetrySink; events: TelemetryEvent[] } { + const events: TelemetryEvent[] = [] + return { + events, + telemetry: { + capture(event) { + events.push(event) + }, + }, + } +} + +function createTool(overrides: Partial = {}): AgentTool { + return { + name: 'bash', + description: 'test tool', + parameters: {}, + async execute() { + return { content: [{ type: 'text', text: 'ok output' }] } + }, + ...overrides, + } +} + +async function executeAdapted(tool: AgentTool, telemetry: TelemetrySink) { + const adapted = adaptToolForPi(tool, 'sess-tool', telemetry) + return await adapted.execute( + 'tool-call-1', + { command: 'cat .env', path: '/tmp/private-path' }, + new AbortController().signal, + undefined, + 
{} as never, + ) +} + +describe('tool adapter telemetry', () => { + it('emits safe agent.tool.completed telemetry without args or output', async () => { + const recorder = createTelemetryRecorder() + + await executeAdapted(createTool(), recorder.telemetry) + + expect(recorder.events).toHaveLength(1) + expect(recorder.events[0]).toEqual({ + name: 'agent.tool.completed', + properties: { + toolName: 'bash', + sessionId: 'sess-tool', + status: 'ok', + durationMs: expect.any(Number), + }, + }) + const serialized = JSON.stringify(recorder.events) + expect(serialized).not.toContain('cat .env') + expect(serialized).not.toContain('private-path') + expect(serialized).not.toContain('ok output') + }) + + it('emits safe agent.tool.failed telemetry for tool error results', async () => { + const recorder = createTelemetryRecorder() + const tool = createTool({ + async execute() { + return { + isError: true, + content: [{ type: 'text', text: 'secret stderr output' }], + details: { code: ErrorCode.enum.TOOL_EXECUTION_ERROR, command: 'cat .env' }, + } + }, + }) + + await expect(executeAdapted(tool, recorder.telemetry)).rejects.toThrow('secret stderr output') + + expect(recorder.events).toHaveLength(1) + expect(recorder.events[0]).toEqual({ + name: 'agent.tool.failed', + properties: { + toolName: 'bash', + sessionId: 'sess-tool', + status: 'error', + durationMs: expect.any(Number), + errorCode: ErrorCode.enum.TOOL_EXECUTION_ERROR, + }, + }) + const serialized = JSON.stringify(recorder.events) + expect(serialized).not.toContain('secret stderr output') + expect(serialized).not.toContain('cat .env') + }) + + it('emits safe agent.tool.failed telemetry for thrown tool errors', async () => { + const recorder = createTelemetryRecorder() + const tool = createTool({ + async execute() { + throw new Error('raw stack /tmp/private-path secret') + }, + }) + + await expect(executeAdapted(tool, recorder.telemetry)).rejects.toThrow('raw stack') + + expect(recorder.events).toHaveLength(1) + 
expect(recorder.events[0]).toEqual({ + name: 'agent.tool.failed', + properties: { + toolName: 'bash', + sessionId: 'sess-tool', + status: 'error', + durationMs: expect.any(Number), + errorCode: ErrorCode.enum.TOOL_EXECUTION_ERROR, + }, + }) + expect(JSON.stringify(recorder.events)).not.toContain('private-path') + }) + + it('telemetry sink failures do not change tool behavior', async () => { + const result = await executeAdapted(createTool(), { + capture() { + throw new Error('telemetry down') + }, + }) + + expect(result).toEqual({ + content: [{ type: 'text', text: 'ok output' }], + details: undefined, + }) + }) +}) diff --git a/packages/agent/src/server/harness/pi-coding-agent/createHarness.ts b/packages/agent/src/server/harness/pi-coding-agent/createHarness.ts index 06f2e01f9..64a0951b5 100644 --- a/packages/agent/src/server/harness/pi-coding-agent/createHarness.ts +++ b/packages/agent/src/server/harness/pi-coding-agent/createHarness.ts @@ -18,6 +18,7 @@ import { import type { AgentHarness, SendMessageInput, RunContext, MessageAttachment, FollowUpOptions } from "../../../shared/harness.js"; import { createLogger } from "../../logging.js"; import type { AgentTool } from "../../../shared/tool.js"; +import type { TelemetrySink } from "../../../shared/telemetry.js"; import type { UIMessageChunk } from "../../../shared/message.js"; import { adaptToolsForPi } from "./tool-adapter.js"; import { piEventToChunks } from "./stream-adapter.js"; @@ -327,6 +328,8 @@ export function createPiCodingAgentHarness(opts: { sessionNamespace?: string; /** Optional explicit file-backed session directory. Mostly for tests/hosts. */ sessionDir?: string; + /** Optional best-effort telemetry sink supplied by an embedding host. 
*/ + telemetry?: TelemetrySink; }): AgentHarness { const sessionStore = new PiSessionStore(opts.cwd, { sessionNamespace: opts.sessionNamespace, @@ -473,7 +476,7 @@ export function createPiCodingAgentHarness(opts: { const { session: piSession } = await createAgentSession({ cwd: ctx.workdir, tools: [], - customTools: adaptToolsForPi(opts.tools, input.sessionId), + customTools: adaptToolsForPi(opts.tools, input.sessionId, opts.telemetry), model, thinkingLevel: input.thinkingLevel ?? "off", sessionManager, diff --git a/packages/agent/src/server/harness/pi-coding-agent/tool-adapter.ts b/packages/agent/src/server/harness/pi-coding-agent/tool-adapter.ts index 67838ac26..722641e9e 100644 --- a/packages/agent/src/server/harness/pi-coding-agent/tool-adapter.ts +++ b/packages/agent/src/server/harness/pi-coding-agent/tool-adapter.ts @@ -1,7 +1,31 @@ import type { ToolDefinition } from "@mariozechner/pi-coding-agent"; -import type { AgentTool } from "../../../shared/tool.js"; +import type { AgentTool, ToolResult } from "../../../shared/tool.js"; +import { noopTelemetry, safeCapture, type TelemetrySink } from "../../../shared/telemetry.js"; +import { ErrorCode } from "../../../shared/error-codes.js"; -export function adaptToolForPi(tool: AgentTool, sessionId?: string): ToolDefinition { +function toolTelemetryProperties( + toolName: string, + sessionId: string | undefined, + status: 'ok' | 'error', + startedAt: number, + result?: ToolResult, +): Record { + const properties: Record = { + toolName, + status, + durationMs: Date.now() - startedAt, + } + if (sessionId) properties.sessionId = sessionId + const errorCode = (result?.details as { code?: unknown } | undefined)?.code + if (status === 'error') { + properties.errorCode = ErrorCode.safeParse(errorCode).success + ? 
(errorCode as string) + : ErrorCode.enum.TOOL_EXECUTION_ERROR + } + return properties +} + +export function adaptToolForPi(tool: AgentTool, sessionId?: string, telemetry: TelemetrySink = noopTelemetry): ToolDefinition { return { name: tool.name, label: tool.name, @@ -9,25 +33,52 @@ export function adaptToolForPi(tool: AgentTool, sessionId?: string): ToolDefinit parameters: tool.parameters as any, promptSnippet: tool.promptSnippet ?? tool.description, async execute(toolCallId, params, signal, onUpdate, _ctx) { - const result = await tool.execute(params as Record, { - toolCallId, - abortSignal: signal ?? new AbortController().signal, - onUpdate: onUpdate - ? (partial) => onUpdate({ content: [{ type: "text", text: partial }], details: undefined }) - : undefined, - sessionId, - }); - if (result.isError) { - throw new Error(result.content.map((c) => c.text).join("\n")); + const startedAt = Date.now(); + let emittedFailure = false; + try { + const result = await tool.execute(params as Record, { + toolCallId, + abortSignal: signal ?? new AbortController().signal, + onUpdate: onUpdate + ? (partial) => onUpdate({ content: [{ type: "text", text: partial }], details: undefined }) + : undefined, + sessionId, + }); + safeCapture(telemetry, { + name: result.isError ? 'agent.tool.failed' : 'agent.tool.completed', + properties: toolTelemetryProperties( + tool.name, + sessionId, + result.isError ? 
'error' : 'ok', + startedAt, + result, + ), + }); + if (result.isError) { + emittedFailure = true; + throw new Error(result.content.map((c) => c.text).join("\n")); + } + return { + content: result.content, + details: result.details, + }; + } catch (error) { + if (!emittedFailure) { + safeCapture(telemetry, { + name: 'agent.tool.failed', + properties: toolTelemetryProperties(tool.name, sessionId, 'error', startedAt), + }); + } + throw error; } - return { - content: result.content, - details: result.details, - }; }, } as ToolDefinition; } -export function adaptToolsForPi(tools: AgentTool[], sessionId?: string): ToolDefinition[] { - return tools.map((tool) => adaptToolForPi(tool, sessionId)); +export function adaptToolsForPi( + tools: AgentTool[], + sessionId?: string, + telemetry?: TelemetrySink, +): ToolDefinition[] { + return tools.map((tool) => adaptToolForPi(tool, sessionId, telemetry)); } diff --git a/packages/agent/src/server/http/routes/__tests__/chat.test.ts b/packages/agent/src/server/http/routes/__tests__/chat.test.ts index 42d3f733e..7d22590a5 100644 --- a/packages/agent/src/server/http/routes/__tests__/chat.test.ts +++ b/packages/agent/src/server/http/routes/__tests__/chat.test.ts @@ -4,6 +4,8 @@ import { chatRoutes, type ChatRouteOptions } from '../chat' import type { AgentHarness, SendMessageInput, RunContext } from '../../../../shared/harness' import type { UIMessageChunk } from '../../../../shared/message' import type { SessionStore } from '../../../../shared/session' +import { ErrorCode } from '../../../../shared/error-codes' +import type { TelemetrySink, TelemetryEvent } from '../../../../shared/telemetry' import { ERROR_CODE_VALIDATION_ERROR, ERROR_CODE_INTERNAL, @@ -45,10 +47,23 @@ function buildApp(overrides: Partial = {}) { app.register(chatRoutes, { harness: overrides.harness ?? createMockHarness(), workdir: overrides.workdir ?? 
'/tmp/test', + telemetry: overrides.telemetry, }) return app.ready().then(() => app) } +function createTelemetryRecorder(): { telemetry: TelemetrySink; events: TelemetryEvent[] } { + const events: TelemetryEvent[] = [] + return { + events, + telemetry: { + capture(event) { + events.push(event) + }, + }, + } +} + const validBody = { sessionId: 'sess-1', message: 'hello', @@ -88,6 +103,95 @@ describe('POST /api/v1/agent/chat', () => { await app.close() }) + test('emits safe chat telemetry for submitted and completed turns', async () => { + const recorder = createTelemetryRecorder() + const app = await buildApp({ telemetry: recorder.telemetry }) + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/agent/chat', + payload: { + sessionId: 'sess-telemetry', + message: 'secret user prompt must not be captured', + model: { provider: 'anthropic', id: 'claude-secret' }, + }, + }) + + expect(res.statusCode).toBe(200) + expect(recorder.events.map((event) => event.name)).toEqual([ + 'agent.chat.started', + 'agent.chat.message.submitted', + 'agent.chat.completed', + ]) + expect(recorder.events.at(-1)?.properties).toEqual(expect.objectContaining({ + sessionId: 'sess-telemetry', + requestId: expect.any(String), + modelProvider: 'anthropic', + status: 'ok', + durationMs: expect.any(Number), + })) + const serialized = JSON.stringify(recorder.events) + expect(serialized).not.toContain('secret user prompt') + expect(serialized).not.toContain('claude-secret') + + await app.close() + }) + + test('emits safe chat failure telemetry without changing the response', async () => { + const recorder = createTelemetryRecorder() + const rawError = new Error('raw failure with /tmp/private-path and secret') as Error & { code?: string } + rawError.code = 'SECRET_TOKEN' + const harness = createMockHarness([], { + throwOnSend: rawError, + }) + const app = await buildApp({ harness, telemetry: recorder.telemetry }) + + const res = await app.inject({ + method: 'POST', + url: 
'/api/v1/agent/chat', + payload: { sessionId: 'sess-failed', message: 'secret prompt' }, + }) + + expect(res.statusCode).toBe(500) + expect(recorder.events.map((event) => event.name)).toEqual([ + 'agent.chat.started', + 'agent.chat.message.submitted', + 'agent.chat.failed', + ]) + expect(recorder.events.at(-1)?.properties).toEqual(expect.objectContaining({ + sessionId: 'sess-failed', + status: 'error', + durationMs: expect.any(Number), + errorCode: ErrorCode.enum.INTERNAL_ERROR, + })) + const serialized = JSON.stringify(recorder.events) + expect(serialized).not.toContain('secret prompt') + expect(serialized).not.toContain('private-path') + + await app.close() + }) + + test('telemetry sink failures do not break chat streaming', async () => { + const app = await buildApp({ + telemetry: { + capture() { + throw new Error('telemetry down') + }, + }, + }) + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/agent/chat', + payload: validBody, + }) + + expect(res.statusCode).toBe(200) + expect(res.body).toContain('[DONE]') + + await app.close() + }) + test('accepts optional model and thinkingLevel', async () => { const sendMessage = vi.fn(function* () { yield { type: 'start' } @@ -254,18 +358,19 @@ describe('POST /api/v1/agent/chat', () => { await app.close() }) - test('emits SSE error chunk when async iterator throws mid-stream', async () => { + test('emits SSE error chunk and failed telemetry when async iterator throws mid-stream', async () => { + const recorder = createTelemetryRecorder() const harness = createMockHarness() harness.sendMessage = function () { return (async function* () { yield { type: 'start' } yield { type: 'text-start', id: '0' } yield { type: 'text-delta', id: '0', delta: 'before' } - throw new Error('mid-stream kaboom') + throw new Error('mid-stream kaboom with /tmp/private-path secret') })() } - const app = await buildApp({ harness }) + const app = await buildApp({ harness, telemetry: recorder.telemetry }) const res = await 
app.inject({ method: 'POST', @@ -275,6 +380,18 @@ describe('POST /api/v1/agent/chat', () => { expect(res.statusCode).toBe(200) expect(res.body).toContain('error') + expect(recorder.events.map((event) => event.name)).toEqual([ + 'agent.chat.started', + 'agent.chat.message.submitted', + 'agent.chat.failed', + ]) + expect(recorder.events.at(-1)?.properties).toEqual(expect.objectContaining({ + sessionId: 'sess-1', + status: 'error', + errorCode: ErrorCode.enum.INTERNAL_ERROR, + durationMs: expect.any(Number), + })) + expect(JSON.stringify(recorder.events)).not.toContain('private-path') await app.close() }) diff --git a/packages/agent/src/server/http/routes/chat.ts b/packages/agent/src/server/http/routes/chat.ts index 8b9838e2d..6d00b74ef 100644 --- a/packages/agent/src/server/http/routes/chat.ts +++ b/packages/agent/src/server/http/routes/chat.ts @@ -6,6 +6,8 @@ import type { UIMessageChunk } from '../sse' import type { AgentHarness, RunContext } from '../../../shared/harness' import type { SessionCtx } from '../../../shared/session' import type { UIMessage } from '../../../shared/message' +import { ErrorCode } from '../../../shared/error-codes' +import { noopTelemetry, safeCapture, type TelemetrySink } from '../../../shared/telemetry' import { createBodyValidator, ERROR_CODE_INTERNAL, @@ -60,6 +62,21 @@ export interface ChatRouteOptions { workdir: string }> sessionChangesTracker?: SessionChangesTracker + telemetry?: TelemetrySink +} + +function addTelemetryProperty( + properties: Record, + key: string, + value: unknown, +): void { + if (typeof value === 'string' && value) properties[key] = value + if (typeof value === 'number' && Number.isFinite(value)) properties[key] = value +} + +function safeTelemetryErrorCode(value: unknown): string { + const parsed = ErrorCode.safeParse(value) + return parsed.success ? 
parsed.data : ErrorCode.enum.INTERNAL_ERROR } export function chatRoutes( @@ -68,6 +85,7 @@ export function chatRoutes( done: (err?: Error) => void, ): void { const { sessionChangesTracker } = opts + const telemetry = opts.telemetry ?? noopTelemetry const validateBody = createBodyValidator(chatBodySchema) const buffers = new StreamBufferStore() // Track last follow-up seq/nonce per session for dedupe detection. @@ -121,9 +139,19 @@ export function chatRoutes( const { sessionId, message, model, thinkingLevel, attachments } = request.body as ChatBody const turnId = randomUUID() + const startedAt = Date.now() + const telemetryProperties: Record = { + sessionId, + requestId: request.id, + } + addTelemetryProperty(telemetryProperties, 'workspaceId', request.workspaceContext?.workspaceId) + addTelemetryProperty(telemetryProperties, 'modelProvider', model?.provider) request.log.info({ sessionId, turnId, model, thinkingLevel }, '[chat] start') - const runtime = await resolveRuntime(request) + safeCapture(telemetry, { + name: 'agent.chat.started', + properties: telemetryProperties, + }) const abortController = new AbortController() let streamStarted = false @@ -137,14 +165,18 @@ export function chatRoutes( } }) - const ctx: RunContext = { - abortSignal: abortController.signal, - workdir: runtime.workdir, - } - const buf = buffers.create(sessionId, turnId) try { + const runtime = await resolveRuntime(request) + const ctx: RunContext = { + abortSignal: abortController.signal, + workdir: runtime.workdir, + } + safeCapture(telemetry, { + name: 'agent.chat.message.submitted', + properties: telemetryProperties, + }) const chunks = runtime.harness.sendMessage( { sessionId, message, model, thinkingLevel, attachments }, ctx, @@ -152,6 +184,7 @@ export function chatRoutes( const stream = createUIMessageStream({ async execute({ writer }: { writer: { write(chunk: UIMessageChunk): void } }) { + let streamFailed = false try { for await (const chunk of chunks) { const c = chunk as 
UIMessageChunk @@ -163,7 +196,17 @@ export function chatRoutes( writer.write(c) } } catch (err) { + streamFailed = true request.log.error({ err, sessionId }, '[chat] stream error') + safeCapture(telemetry, { + name: 'agent.chat.failed', + properties: { + ...telemetryProperties, + status: 'error', + durationMs: Date.now() - startedAt, + errorCode: ErrorCode.enum.INTERNAL_ERROR, + }, + }) const errChunk = { type: 'error', errorText: 'internal error', @@ -176,6 +219,16 @@ export function chatRoutes( // and may arrive before markComplete's callback) sees the flag // and does NOT abort an already-finished stream. streamCompleted = true + if (!streamFailed) { + safeCapture(telemetry, { + name: 'agent.chat.completed', + properties: { + ...telemetryProperties, + status: 'ok', + durationMs: Date.now() - startedAt, + }, + }) + } buf.markComplete(() => buffers.evict(sessionId, turnId)) } }, @@ -199,6 +252,15 @@ export function chatRoutes( return } catch (err) { request.log.error({ err, sessionId }, '[chat] error') + safeCapture(telemetry, { + name: 'agent.chat.failed', + properties: { + ...telemetryProperties, + status: 'error', + durationMs: Date.now() - startedAt, + errorCode: safeTelemetryErrorCode((err as { code?: unknown })?.code), + }, + }) buf.markComplete(() => buffers.evict(sessionId, turnId)) if (streamStarted) return return reply.code(500).send({ diff --git a/packages/agent/src/server/registerAgentRoutes.ts b/packages/agent/src/server/registerAgentRoutes.ts index 563cb9732..f0e9addca 100644 --- a/packages/agent/src/server/registerAgentRoutes.ts +++ b/packages/agent/src/server/registerAgentRoutes.ts @@ -4,6 +4,7 @@ import type { AgentTool } from '../shared/tool' import type { AgentHarnessFactory } from '../shared/harness' import type { SessionStore } from '../shared/session' import type { SandboxHandleStore } from '../shared/sandbox-handle-store' +import type { TelemetrySink } from '../shared/telemetry' import { AuthStorage, ModelRegistry } from 
'@mariozechner/pi-coding-agent' import { getEnv } from './config/env' import type { RuntimeBundle, RuntimeModeAdapter, RuntimeModeId } from './runtime/mode' @@ -175,6 +176,8 @@ export interface RegisterAgentRoutesOptions { request?: FastifyRequest }) => PiHarnessOptions | undefined | Promise sessionNamespace?: string + /** Optional best-effort telemetry sink supplied by an embedding host. */ + telemetry?: TelemetrySink getSessionNamespace?: (ctx: { workspaceId: string workspaceRoot: string @@ -333,6 +336,7 @@ export const registerAgentRoutes: FastifyPluginAsync cwd: root, sessionNamespace: scope.sessionNamespace, systemPromptAppend: opts.systemPromptAppend, + telemetry: opts.telemetry, }) const readyTracker = new ReadyStatusTracker({ sandboxReady: resolvedMode !== 'vercel-sandbox', @@ -543,6 +547,7 @@ export const registerAgentRoutes: FastifyPluginAsync } }, sessionChangesTracker, + telemetry: opts.telemetry, }) await app.register(sessionRoutes, { getSessionStore: async (request) => { diff --git a/packages/agent/src/shared/__tests__/telemetry.test.ts b/packages/agent/src/shared/__tests__/telemetry.test.ts new file mode 100644 index 000000000..fdb56eb5a --- /dev/null +++ b/packages/agent/src/shared/__tests__/telemetry.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, expectTypeOf, it, vi } from 'vitest' + +import { + noopTelemetry, + safeCapture, + type TelemetryEvent, + type TelemetrySink, +} from '../telemetry' + +describe('telemetry contract', () => { + const event: TelemetryEvent = { + name: 'telemetry.test', + distinctId: 'user-1', + properties: { status: 'ok' }, + } + + it('noop capture accepts events without side effects', () => { + expect(() => noopTelemetry.capture(event)).not.toThrow() + }) + + it('safeCapture forwards normal captures', () => { + const capture = vi.fn() + const telemetry: TelemetrySink = { capture } + + safeCapture(telemetry, event) + + expect(capture).toHaveBeenCalledWith(event) + }) + + it('safeCapture swallows synchronous capture 
failures', () => { + const telemetry: TelemetrySink = { + capture() { + throw new Error('sync telemetry failure') + }, + } + + expect(() => safeCapture(telemetry, event)).not.toThrow() + }) + + it('safeCapture swallows asynchronous capture rejections', async () => { + const telemetry: TelemetrySink = { + capture: vi.fn().mockRejectedValue(new Error('async telemetry failure')), + } + + expect(() => safeCapture(telemetry, event)).not.toThrow() + await Promise.resolve() + }) + + it('allows sinks to expose an optional flush hook', async () => { + const flush = vi.fn().mockResolvedValue(undefined) + const telemetry: TelemetrySink = { + capture() {}, + flush, + } + + await telemetry.flush?.() + + expect(flush).toHaveBeenCalledOnce() + }) + + it('keeps the TelemetrySink shape structural', () => { + expectTypeOf().toEqualTypeOf<{ + capture: (event: TelemetryEvent) => void | Promise + flush?: () => void | Promise + }>() + }) +}) diff --git a/packages/agent/src/shared/harness.ts b/packages/agent/src/shared/harness.ts index 991fd1f36..b92018e0d 100644 --- a/packages/agent/src/shared/harness.ts +++ b/packages/agent/src/shared/harness.ts @@ -1,5 +1,6 @@ import type { UIMessageChunk } from './message' import type { SessionStore } from './session' +import type { TelemetrySink } from './telemetry' import type { AgentTool } from './tool' export interface AgentHarnessFactoryInput { @@ -15,6 +16,8 @@ export interface AgentHarnessFactoryInput { * prompt context without a workspace-injected harness extension. */ systemPromptDynamic?: () => string | undefined | Promise + /** Host-provided telemetry sink. Optional and best-effort; harnesses may ignore it. 
*/ + telemetry?: TelemetrySink } export type AgentHarnessFactory = (input: AgentHarnessFactoryInput) => AgentHarness | Promise diff --git a/packages/agent/src/shared/index.ts b/packages/agent/src/shared/index.ts index e92034bbf..e627a8287 100644 --- a/packages/agent/src/shared/index.ts +++ b/packages/agent/src/shared/index.ts @@ -22,6 +22,8 @@ export type { } from './session' export type { UIMessage, UIMessageChunk } from './message' export type { FileSearch } from './file-search' +export type { TelemetryEvent, TelemetrySink } from './telemetry' +export { noopTelemetry, safeCapture } from './telemetry' export type { SandboxHandleRecord, SandboxHandleStore, diff --git a/packages/agent/src/shared/telemetry.ts b/packages/agent/src/shared/telemetry.ts new file mode 100644 index 000000000..d7579ec52 --- /dev/null +++ b/packages/agent/src/shared/telemetry.ts @@ -0,0 +1,20 @@ +export interface TelemetrySink { + capture(event: TelemetryEvent): void | Promise + flush?(): void | Promise +} + +export interface TelemetryEvent { + name: string + distinctId?: string + properties?: Record +} + +export const noopTelemetry: TelemetrySink = { + capture() {}, +} + +export function safeCapture(telemetry: TelemetrySink, event: TelemetryEvent): void { + try { + void Promise.resolve(telemetry.capture(event)).catch(() => {}) + } catch {} +} diff --git a/packages/core/docs/CORE.md b/packages/core/docs/CORE.md index 959a73aea..79b9b3fb9 100644 --- a/packages/core/docs/CORE.md +++ b/packages/core/docs/CORE.md @@ -326,6 +326,65 @@ invite_ttl_days = 7 | `BODY_LIMIT_BYTES` | no | Override Fastify body limit (default `16777216` / 16MB). | | `SESSION_TTL_SECONDS` | no | Session cookie max-age (default `60*60*24*30` / 30 days, matches v1). | | `SESSION_COOKIE_SECURE` | no | Force `Secure` cookie flag (default `true` when `BETTER_AUTH_URL` is https). | +| `BORING_TELEMETRY_ENABLED` | no | Explicit opt-in for core DB telemetry. Must be `true`; unset/`false` means no-op telemetry. 
| + +### Core DB telemetry (v1) + +Core-composed apps can store safe telemetry events in the core database with one env var. Core owns the DB-backed sink and resolves telemetry inside `createCoreWorkspaceAgentServer`; child apps do not need vendor setup code. + +Enable telemetry: + +```bash +BORING_TELEMETRY_ENABLED=true +``` + +Disable telemetry: + +```bash +# Either omit BORING_TELEMETRY_ENABLED, or set it explicitly: +BORING_TELEMETRY_ENABLED=false +``` + +When enabled, sanitized rows are inserted into the `telemetry_events` table. `app_id` uses `CoreConfig.appId`, so multiple apps sharing one database can still be filtered without event-name prefixes. + +Implemented v1 events: + +| Area | Events | +|---|---| +| Core | `app.opened`, `server.request.failed` | +| Agent chat | `agent.chat.started`, `agent.chat.message.submitted`, `agent.chat.completed`, `agent.chat.failed` | +| Agent tools | `agent.tool.completed`, `agent.tool.failed` | + +Allowed event properties are limited to the following content-free operational metadata (identifiers, enum-like labels, and timings): + +- `workspaceId` +- `sessionId` +- `requestId` +- `runtimeMode` +- `modelProvider` +- `toolName` +- `panelId` +- `commandId` +- `status` +- `durationMs` +- `errorCode` +- `packageName` +- `packageVersion` + +Privacy exclusions: v1 does not capture prompts, assistant output, file contents, command strings, stdout/stderr, raw file paths, raw errors, stack traces, headers, cookies, tokens, full env dumps, or secret values. Core sanitizes properties before inserting into the database, and telemetry is best-effort only: capture failures do not break app, chat, or tool flows. + +Package boundary: agent/workspace remain storage-free. Core passes a tiny structural telemetry sink into composed agent code; `@hachej/boring-agent` standalone remains no-op unless an embedder explicitly provides a sink. 
+ +Deferred scope: browser-originated telemetry, workspace frontend events (`workspace.opened`, `workspace.panel.opened`, `workspace.command.executed`), auth hook telemetry, plugin-error telemetry, UI-command telemetry, tool-started events, external SaaS adapters, OpenTelemetry/OTLP, routing/multi-account selection, retention/cleanup policy, and mandatory lifecycle flush wiring are not shipped in v1. + +Focused validation commands: + +```bash +pnpm --filter @hachej/boring-core test src/shared/__tests__/telemetry.test.ts src/server/telemetry/__tests__ src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry.test.ts src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry-smoke.test.ts +pnpm --filter @hachej/boring-agent test src/shared/__tests__/telemetry.test.ts src/server/http/routes/__tests__/chat.test.ts src/server/harness/pi-coding-agent/__tests__/tool-adapter.telemetry.test.ts src/server/__tests__/createAgentApp.test.ts +pnpm --filter @hachej/boring-workspace test src/shared/__tests__/telemetry.test.ts +pnpm lint:invariants +``` ### `CoreConfig` type diff --git a/packages/core/docs/plans/app-level-telemetry-plan.md b/packages/core/docs/plans/app-level-telemetry-plan.md new file mode 100644 index 000000000..7cade19dd --- /dev/null +++ b/packages/core/docs/plans/app-level-telemetry-plan.md @@ -0,0 +1,107 @@ +# Core DB Telemetry Plan for boring-ui-v2 + +**Decision:** v1 telemetry stores sanitized server-side events in the core database. No PostHog, no browser endpoint, no external telemetry vendor. + +## Common path + +Child apps that use `createCoreWorkspaceAgentServer()` enable telemetry with one env var: + +```bash +BORING_TELEMETRY_ENABLED=true +``` + +If the env var is omitted or set to anything else, telemetry is a no-op. Core uses `CoreConfig.appId` as the `app_id` column, so several apps sharing one database can still filter their own events. + +## Boundaries + +- `@hachej/boring-core` owns persistence and sanitization. 
+- `@hachej/boring-agent` and `@hachej/boring-workspace` stay DB/vendor-free. +- Agent/workspace receive only a tiny structural `TelemetrySink` with `capture()` and optional `flush()`. +- Standalone agent remains no-op unless an embedder injects a sink. + +## Data model + +Table: `telemetry_events` + +| Column | Purpose | +|---|---| +| `id` | row id | +| `app_id` | `CoreConfig.appId` | +| `event_name` | sanitized event name, e.g. `agent.chat.started` | +| `distinct_id` | sanitized user id or `anonymous` | +| `properties` | sanitized allowlisted metadata JSON | +| `created_at` | insert timestamp | + +Indexes: + +- `(app_id, created_at)` for app-scoped time windows +- `event_name` for event filtering + +## V1 events + +| Area | Events | +|---|---| +| Core | `app.opened`, `server.request.failed` | +| Agent chat | `agent.chat.started`, `agent.chat.message.submitted`, `agent.chat.completed`, `agent.chat.failed` | +| Agent tools | `agent.tool.completed`, `agent.tool.failed` | + +Workspace browser/frontend events are deferred. + +## Allowed properties + +Only these allowlisted properties are stored: + +- `workspaceId` +- `sessionId` +- `requestId` +- `runtimeMode` +- `modelProvider` +- `toolName` +- `panelId` +- `commandId` +- `status` +- `durationMs` +- `errorCode` +- `packageName` +- `packageVersion` + +Unknown keys are dropped, as are non-primitive, non-finite, or suspicious-looking values (paths, bearer tokens, API-key-like strings) on allowed keys. + +## Privacy exclusions + +Never capture: + +- prompts +- assistant output +- file contents +- command strings +- stdout/stderr +- raw file paths +- raw errors or stack traces +- headers/cookies/tokens +- env dumps +- secrets + +The DB sink sanitizes event names, distinct ids, and properties centrally before insertion. Insert failures are swallowed so telemetry never changes product behavior. 
+ +## Deferred + +- Browser-originated telemetry +- Workspace frontend events +- Auth hook telemetry +- Plugin-error telemetry +- UI-command telemetry +- Tool-started events +- External SaaS adapters +- OpenTelemetry/OTLP +- Retention/cleanup policy +- Lifecycle flush wiring + +## Validation + +```bash +pnpm --filter @hachej/boring-core test src/shared/__tests__/telemetry.test.ts src/server/telemetry/__tests__ src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry.test.ts src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry-smoke.test.ts +pnpm --filter @hachej/boring-agent test src/shared/__tests__/telemetry.test.ts src/server/http/routes/__tests__/chat.test.ts src/server/harness/pi-coding-agent/__tests__/tool-adapter.telemetry.test.ts src/server/__tests__/createAgentApp.test.ts +pnpm --filter @hachej/boring-workspace test src/shared/__tests__/telemetry.test.ts +pnpm lint:invariants +``` diff --git a/packages/core/drizzle/0010_telemetry_events.sql b/packages/core/drizzle/0010_telemetry_events.sql new file mode 100644 index 000000000..00526806f --- /dev/null +++ b/packages/core/drizzle/0010_telemetry_events.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS "telemetry_events" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "app_id" text NOT NULL, + "event_name" text NOT NULL, + "distinct_id" text DEFAULT 'anonymous' NOT NULL, + "properties" jsonb DEFAULT '{}'::jsonb NOT NULL, + "created_at" timestamp DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "telemetry_events_app_created_at_idx" ON "telemetry_events" USING btree ("app_id", "created_at"); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "telemetry_events_event_name_idx" ON "telemetry_events" USING btree ("event_name"); diff --git a/packages/core/drizzle/meta/_journal.json b/packages/core/drizzle/meta/_journal.json index fa04be658..25570fccd 100644 --- a/packages/core/drizzle/meta/_journal.json +++ 
b/packages/core/drizzle/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1777536000000, "tag": "0009_workspace_runtime_resources", "breakpoints": true + }, + { + "idx": 10, + "version": "7", + "when": 1779537600000, + "tag": "0010_telemetry_events", + "breakpoints": true } ] } diff --git a/packages/core/package.json b/packages/core/package.json index 8a4594d2b..66a853e54 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -66,12 +66,12 @@ "drizzle:migrate": "drizzle-kit migrate --config drizzle.config.ts" }, "dependencies": { - "@hachej/boring-agent": "workspace:*", - "@hachej/boring-ui-kit": "workspace:*", - "@hachej/boring-workspace": "workspace:*", "@fastify/cors": "^11.0.0", "@fastify/helmet": "^13.0.1", "@fastify/rate-limit": "^10.3.0", + "@hachej/boring-agent": "workspace:*", + "@hachej/boring-ui-kit": "workspace:*", + "@hachej/boring-workspace": "workspace:*", "@hookform/resolvers": "^5.2.2", "@react-email/components": "^1.0.12", "@react-email/render": "^2.0.7", diff --git a/packages/core/src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry-smoke.test.ts b/packages/core/src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry-smoke.test.ts new file mode 100644 index 000000000..2e01f642e --- /dev/null +++ b/packages/core/src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry-smoke.test.ts @@ -0,0 +1,357 @@ +import { mkdir, mkdtemp, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import Fastify from 'fastify' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +import type { CoreConfig } from '../../../shared/types.js' + +type TelemetryRow = { + appId: string + eventName: string + distinctId: string + properties?: Record +} + +const dbMock = vi.hoisted(() => { + const rows: TelemetryRow[] = [] + const values = vi.fn((row: TelemetryRow) => { + rows.push(row) + return Promise.resolve() + }) + const insert = vi.fn(() => ({ values 
})) + return { rows, insert, values } +}) + +const smokeLogs = vi.hoisted(() => ({ + entries: [] as Array>, +})) + +vi.mock('@hachej/boring-agent/server', () => ({ + compactPiPackages: (packages: unknown[]) => packages, + registerAgentRoutes: async (app: { post: (path: string, handler: () => Promise) => void }, opts: { telemetry?: { capture: (event: { name: string; distinctId?: string; properties?: Record }) => void | Promise } }) => { + app.post('/__telemetry-smoke/agent-turn', async () => { + opts.telemetry?.capture({ + name: 'agent.chat.started', + distinctId: 'user_123', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_1', + requestId: 'request_1', + modelProvider: 'anthropic', + prompt: 'secret prompt must not be captured', + rawPath: '/tmp/private-path', + headers: { authorization: 'Bearer secret-token' }, + }, + }) + opts.telemetry?.capture({ + name: 'agent.chat.message.submitted', + distinctId: 'user_123', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_1', + requestId: 'request_1', + command: 'cat .env', + stdout: 'secret command output', + }, + }) + opts.telemetry?.capture({ + name: 'agent.tool.completed', + distinctId: 'user_123', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_1', + toolName: 'bash', + status: 'ok', + durationMs: 12, + commandOutput: 'assistant/file content must not be captured', + }, + }) + opts.telemetry?.capture({ + name: 'agent.chat.completed', + distinctId: 'user_123', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_1', + status: 'ok', + durationMs: 34, + assistantOutput: 'secret assistant output must not be captured', + }, + }) + return { ok: true } + }) + + app.post('/__telemetry-smoke/agent-failed-turn', async () => { + opts.telemetry?.capture({ + name: 'agent.chat.failed', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_failed', + status: 'error', + durationMs: 5, + errorCode: 'INTERNAL_ERROR', + stack: 'Error: stack with 
/tmp/private-path and secret-token', + }, + }) + opts.telemetry?.capture({ + name: 'agent.tool.failed', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_failed', + toolName: 'bash', + status: 'error', + durationMs: 6, + errorCode: 'TOOL_EXECUTION_ERROR', + stderr: 'secret stderr', + }, + }) + return { ok: true } + }) + }, +})) + +vi.mock('@hachej/boring-workspace/app/server', () => ({ + collectWorkspaceAgentServerPlugins: () => ({ + agentOptions: { + extraTools: [], + pi: undefined, + systemPromptAppend: undefined, + }, + preservedUiStateKeys: [], + provisioningContributions: [], + routeContributions: [], + }), + hasDirServerPlugin: () => false, + provisionWorkspaceAgentServer: vi.fn(), + readWorkspacePluginPackagePiSnapshot: () => ({ + additionalSkillPaths: [], + extensionFactories: [], + extensionPaths: [], + packages: [], + systemPromptAppend: undefined, + }), + resolveDefaultWorkspacePluginPackagePaths: () => [], + resolveOnePluginEntry: async (entry: unknown) => entry, +})) + +vi.mock('@hachej/boring-workspace/server', () => ({ + createInMemoryBridge: () => ({ + drainCommands: vi.fn(), + getState: vi.fn(), + postCommand: vi.fn(), + setState: vi.fn(), + subscribeCommands: vi.fn(), + }), + createWorkspaceUiTools: () => [], + uiRoutes: async () => {}, +})) + +vi.mock('../../../server/auth/index.js', () => ({ + authHook: async () => {}, + createAuth: () => ({ + handler: vi.fn(), + }), +})) + +vi.mock('../../../server/app/index.js', () => ({ + createCoreApp: async (config: CoreConfig) => { + const app = Fastify({ logger: false }) + app.decorate('config', config) + return app + }, + registerRoutes: async () => {}, +})) + +vi.mock('../../../server/routes/index.js', () => ({ + registerInviteRoutes: async () => {}, + registerMemberRoutes: async () => {}, + registerSettingsRoutes: async () => {}, + registerWorkspaceRoutes: async () => {}, +})) + +vi.mock('../../../server/db/index.js', () => ({ + createDatabase: () => ({ + db: dbMock, + sql: { end: 
vi.fn() }, + }), + PostgresUserStore: class PostgresUserStore {}, + PostgresWorkspaceStore: class PostgresWorkspaceStore {}, +})) + +vi.mock('../../../server/config/index.js', () => ({ + loadConfig: async () => ({ + appId: 'test-app', + auth: { url: 'http://localhost:3000' }, + encryption: { workspaceSettingsKey: 'test-key' }, + stores: 'postgres', + }), +})) + +vi.mock('../../../server/runtime/index.js', () => ({ + WorkspaceRuntimeSandboxHandleStore: class WorkspaceRuntimeSandboxHandleStore {}, +})) + +const { createCoreWorkspaceAgentServer } = await import('../createCoreWorkspaceAgentServer.js') + +function resetTelemetryEnv(): void { + delete process.env.BORING_TELEMETRY_ENABLED +} + +async function flushTelemetry(): Promise { + await Promise.resolve() + await Promise.resolve() +} + +async function createBuiltFrontendRoot(): Promise { + const appRoot = await mkdtemp(join(tmpdir(), 'boring-core-telemetry-smoke-')) + const frontDir = join(appRoot, 'dist', 'front') + await mkdir(frontDir, { recursive: true }) + await writeFile(join(frontDir, 'index.html'), 'telemetry smoke') + return appRoot +} + +function currentRows(): TelemetryRow[] { + return dbMock.rows +} + +function logSmoke(label: string, rows: TelemetryRow[]): Record { + const summary = { + label, + env: { + enabled: process.env.BORING_TELEMETRY_ENABLED ?? '', + sink: process.env.BORING_TELEMETRY_ENABLED === 'true' ? 'db' : 'noop', + }, + capturedCount: rows.length, + eventNames: rows.map((row) => row.eventName), + distinctIdKinds: rows.map((row) => (row.distinctId === 'anonymous' ? 'anonymous' : typeof row.distinctId)), + sanitizedPropertyKeys: rows.map((row) => Object.keys(row.properties ?? 
{}).sort()), + } + smokeLogs.entries.push(summary) + console.info('[telemetry-smoke]', JSON.stringify(summary)) + return summary +} + +function expectNoSensitiveTelemetryText(value: unknown): void { + const serialized = JSON.stringify(value) + expect(serialized).not.toContain('secret prompt') + expect(serialized).not.toContain('secret assistant') + expect(serialized).not.toContain('assistant/file content') + expect(serialized).not.toContain('cat .env') + expect(serialized).not.toContain('secret command output') + expect(serialized).not.toContain('/tmp/private-path') + expect(serialized).not.toContain('secret-token') + expect(serialized).not.toContain('secret stderr') +} + +describe('telemetry v1 env-only DB smoke', () => { + beforeEach(() => { + resetTelemetryEnv() + dbMock.rows.length = 0 + dbMock.insert.mockClear() + dbMock.values.mockClear() + smokeLogs.entries.length = 0 + }) + + afterEach(() => { + resetTelemetryEnv() + vi.clearAllMocks() + }) + + it('captures representative events from core-composed env setup with redacted DB rows/logs', async () => { + process.env.BORING_TELEMETRY_ENABLED = 'true' + + const app = await createCoreWorkspaceAgentServer({ + appRoot: await createBuiltFrontendRoot(), + serveFrontend: true, + }) + app.get('/__telemetry-smoke/server-failure/private-path', async () => { + throw new Error('raw server failure with /tmp/private-path and secret-token') + }) + + try { + const shell = await app.inject({ method: 'GET', url: '/workspace/private-path?token=secret-token' }) + const chat = await app.inject({ method: 'POST', url: '/__telemetry-smoke/agent-turn' }) + const failedTurn = await app.inject({ method: 'POST', url: '/__telemetry-smoke/agent-failed-turn' }) + const serverFailure = await app.inject({ method: 'GET', url: '/__telemetry-smoke/server-failure/private-path?token=secret-token' }) + await flushTelemetry() + + expect(shell.statusCode).toBe(200) + expect(chat.statusCode).toBe(200) + expect(failedTurn.statusCode).toBe(200) + 
expect(serverFailure.statusCode).toBe(500) + + const rows = currentRows() + const smokeLog = logSmoke('enabled-db-core-composed', rows) + + expect(rows.map((row) => row.eventName)).toEqual(expect.arrayContaining([ + 'app.opened', + 'agent.chat.started', + 'agent.chat.message.submitted', + 'agent.tool.completed', + 'agent.chat.completed', + 'agent.chat.failed', + 'agent.tool.failed', + 'server.request.failed', + ])) + expect(rows.every((row) => row.appId === 'test-app')).toBe(true) + expect(rows.find((row) => row.eventName === 'agent.chat.started')?.properties).toEqual({ + workspaceId: 'workspace_1', + sessionId: 'session_1', + requestId: 'request_1', + modelProvider: 'anthropic', + }) + expect(rows.find((row) => row.eventName === 'agent.tool.completed')?.properties).toEqual({ + workspaceId: 'workspace_1', + sessionId: 'session_1', + toolName: 'bash', + status: 'ok', + durationMs: 12, + }) + expect(smokeLog).toMatchObject({ + env: { + enabled: 'true', + sink: 'db', + }, + capturedCount: rows.length, + }) + expectNoSensitiveTelemetryText(rows) + expectNoSensitiveTelemetryText(smokeLogs.entries) + } finally { + await app.close() + } + }) + + it.each([ + ['unset', undefined], + ['false', 'false'], + ])('captures zero events when telemetry is %s', async (_label, enabled) => { + if (enabled) process.env.BORING_TELEMETRY_ENABLED = enabled + + const app = await createCoreWorkspaceAgentServer({ + appRoot: await createBuiltFrontendRoot(), + serveFrontend: true, + }) + + try { + await app.inject({ method: 'GET', url: '/' }) + await app.inject({ method: 'POST', url: '/__telemetry-smoke/agent-turn' }) + await flushTelemetry() + + const rows = currentRows() + const smokeLog = logSmoke(`disabled-${enabled ?? 'unset'}`, rows) + + expect(rows).toHaveLength(0) + expect(smokeLog).toMatchObject({ + env: { + enabled: enabled ?? 
'', + sink: 'noop', + }, + capturedCount: 0, + eventNames: [], + }) + expectNoSensitiveTelemetryText(smokeLogs.entries) + } finally { + await app.close() + } + }) +}) diff --git a/packages/core/src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry.test.ts b/packages/core/src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry.test.ts new file mode 100644 index 000000000..828b0b2ca --- /dev/null +++ b/packages/core/src/app/server/__tests__/createCoreWorkspaceAgentServer.telemetry.test.ts @@ -0,0 +1,291 @@ +import { mkdir, mkdtemp, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import Fastify from 'fastify' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +import { ERROR_CODES } from '../../../shared/errors.js' +import type { TelemetrySink } from '../../../shared/telemetry.js' +import type { CoreConfig } from '../../../shared/types.js' + +const agentMock = vi.hoisted(() => ({ + registerOptions: [] as Array>, +})) + +const coreAppMock = vi.hoisted(() => ({ + debugLogs: [] as unknown[][], +})) + +const dbMock = vi.hoisted(() => { + const rows: Array> = [] + const values = vi.fn((row: Record) => { + rows.push(row) + return Promise.resolve() + }) + const insert = vi.fn(() => ({ values })) + return { rows, insert, values } +}) + +vi.mock('@hachej/boring-agent/server', () => ({ + compactPiPackages: (packages: unknown[]) => packages, + registerAgentRoutes: async (_app: unknown, opts: Record) => { + agentMock.registerOptions.push(opts) + }, +})) + +vi.mock('@hachej/boring-workspace/app/server', () => ({ + collectWorkspaceAgentServerPlugins: () => ({ + agentOptions: { + extraTools: [], + pi: undefined, + systemPromptAppend: undefined, + }, + preservedUiStateKeys: [], + provisioningContributions: [], + routeContributions: [], + }), + hasDirServerPlugin: () => false, + provisionWorkspaceAgentServer: vi.fn(), + readWorkspacePluginPackagePiSnapshot: () => ({ + 
additionalSkillPaths: [], + extensionFactories: [], + extensionPaths: [], + packages: [], + systemPromptAppend: undefined, + }), + resolveDefaultWorkspacePluginPackagePaths: () => [], + resolveOnePluginEntry: async (entry: unknown) => entry, +})) + +vi.mock('@hachej/boring-workspace/server', () => ({ + createInMemoryBridge: () => ({ + drainCommands: vi.fn(), + getState: vi.fn(), + postCommand: vi.fn(), + setState: vi.fn(), + subscribeCommands: vi.fn(), + }), + createWorkspaceUiTools: () => [], + uiRoutes: async () => {}, +})) + +vi.mock('../../../server/auth/index.js', () => ({ + authHook: async () => {}, + createAuth: () => ({ + handler: vi.fn(), + }), +})) + +vi.mock('../../../server/app/index.js', () => ({ + createCoreApp: async (config: CoreConfig) => { + const app = Fastify({ logger: false }) + app.decorate('config', config) + app.log.debug = (...args: unknown[]) => { + coreAppMock.debugLogs.push(args) + } + return app + }, + registerRoutes: async () => {}, +})) + +vi.mock('../../../server/routes/index.js', () => ({ + registerInviteRoutes: async () => {}, + registerMemberRoutes: async () => {}, + registerSettingsRoutes: async () => {}, + registerWorkspaceRoutes: async () => {}, +})) + +vi.mock('../../../server/db/index.js', () => ({ + createDatabase: () => ({ + db: dbMock, + sql: { end: vi.fn() }, + }), + PostgresUserStore: class PostgresUserStore {}, + PostgresWorkspaceStore: class PostgresWorkspaceStore {}, +})) + +vi.mock('../../../server/config/index.js', () => ({ + loadConfig: async () => ({ + appId: 'test-app', + auth: { url: 'http://localhost:3000' }, + encryption: { workspaceSettingsKey: 'test-key' }, + stores: 'postgres', + }), +})) + +vi.mock('../../../server/runtime/index.js', () => ({ + WorkspaceRuntimeSandboxHandleStore: class WorkspaceRuntimeSandboxHandleStore {}, +})) + +const { createCoreWorkspaceAgentServer } = await import('../createCoreWorkspaceAgentServer.js') + +function resetTelemetryEnv(): void { + delete 
process.env.BORING_TELEMETRY_ENABLED +} + +async function flushTelemetry(): Promise { + await Promise.resolve() + await Promise.resolve() +} + +async function createBuiltFrontendRoot(): Promise { + const appRoot = await mkdtemp(join(tmpdir(), 'boring-core-telemetry-')) + const frontDir = join(appRoot, 'dist', 'front') + await mkdir(frontDir, { recursive: true }) + await writeFile(join(frontDir, 'index.html'), 'app') + return appRoot +} + +describe('createCoreWorkspaceAgentServer telemetry wiring', () => { + beforeEach(() => { + resetTelemetryEnv() + agentMock.registerOptions.length = 0 + coreAppMock.debugLogs.length = 0 + dbMock.rows.length = 0 + dbMock.insert.mockClear() + dbMock.values.mockClear() + }) + + afterEach(() => { + resetTelemetryEnv() + vi.clearAllMocks() + }) + + it('uses the core DB telemetry env helper by default and passes the sink to agent routes', async () => { + process.env.BORING_TELEMETRY_ENABLED = 'true' + + const app = await createCoreWorkspaceAgentServer({ serveFrontend: false }) + try { + const telemetry = agentMock.registerOptions[0]?.telemetry as TelemetrySink | undefined + telemetry?.capture({ + name: 'agent.chat.started', + distinctId: 'user_123', + properties: { workspaceId: 'workspace_1', prompt: 'do not send' }, + }) + await flushTelemetry() + + expect(agentMock.registerOptions, 'agent routes should be registered once').toHaveLength(1) + expect(telemetry, 'resolved telemetry sink should be passed to agent routes').toBeDefined() + expect(dbMock.rows).toEqual([ + { + appId: 'test-app', + eventName: 'agent.chat.started', + distinctId: 'user_123', + properties: { workspaceId: 'workspace_1' }, + }, + ]) + expect(JSON.stringify(dbMock.rows)).not.toContain('do not send') + expect(coreAppMock.debugLogs).toContainEqual([ + { telemetry: { source: 'db-env' } }, + 'resolved telemetry sink', + ]) + } finally { + await app.close() + } + }) + + it('lets a custom telemetry sink override DB helper creation', async () => { + 
process.env.BORING_TELEMETRY_ENABLED = 'true' + const customTelemetry: TelemetrySink = { capture: vi.fn() } + + const app = await createCoreWorkspaceAgentServer({ + serveFrontend: false, + telemetry: customTelemetry, + }) + try { + expect(agentMock.registerOptions[0]?.telemetry).toBe(customTelemetry) + expect(dbMock.insert, 'custom sink should bypass DB telemetry helper').not.toHaveBeenCalled() + expect(coreAppMock.debugLogs).toContainEqual([ + { telemetry: { source: 'custom' } }, + 'resolved telemetry sink', + ]) + } finally { + await app.close() + } + }) + + it('passes noop telemetry when env telemetry is disabled', async () => { + const app = await createCoreWorkspaceAgentServer({ serveFrontend: false }) + try { + const telemetry = agentMock.registerOptions[0]?.telemetry as TelemetrySink | undefined + telemetry?.capture({ name: 'app.opened' }) + await flushTelemetry() + + expect(telemetry, 'disabled env still passes a safe noop sink').toBeDefined() + expect(dbMock.rows).toEqual([]) + expect(coreAppMock.debugLogs).toContainEqual([ + { telemetry: { source: 'noop-env' } }, + 'resolved telemetry sink', + ]) + } finally { + await app.close() + } + }) + + it('emits app.opened when the server shell is served without raw URL data', async () => { + const capture = vi.fn() + const app = await createCoreWorkspaceAgentServer({ + appRoot: await createBuiltFrontendRoot(), + serveFrontend: true, + telemetry: { capture }, + }) + try { + const res = await app.inject({ method: 'GET', url: '/workspace/private-path?token=secret' }) + + expect(res.statusCode).toBe(200) + expect(capture).toHaveBeenCalledWith({ + name: 'app.opened', + properties: { requestId: expect.any(String) }, + }) + expect(JSON.stringify(capture.mock.calls)).not.toContain('private-path') + expect(JSON.stringify(capture.mock.calls)).not.toContain('secret') + } finally { + await app.close() + } + }) + + it('emits server.request.failed with stable metadata only', async () => { + const capture = vi.fn() + const app = 
await createCoreWorkspaceAgentServer({ serveFrontend: false, telemetry: { capture } }) + app.get('/boom/private-path', async () => { + throw new Error('raw secret failure with /tmp/private-path') + }) + try { + const res = await app.inject({ method: 'GET', url: '/boom/private-path?cookie=secret' }) + + expect(res.statusCode).toBe(500) + expect(capture).toHaveBeenCalledWith({ + name: 'server.request.failed', + properties: { + requestId: expect.any(String), + status: 500, + errorCode: ERROR_CODES.INTERNAL_ERROR, + }, + }) + expect(JSON.stringify(capture.mock.calls)).not.toContain('private-path') + expect(JSON.stringify(capture.mock.calls)).not.toContain('secret') + } finally { + await app.close() + } + }) + + it('keeps serving the shell when telemetry capture fails', async () => { + const app = await createCoreWorkspaceAgentServer({ + appRoot: await createBuiltFrontendRoot(), + serveFrontend: true, + telemetry: { + capture() { + throw new Error('telemetry sink down') + }, + }, + }) + try { + const res = await app.inject({ method: 'GET', url: '/' }) + + expect(res.statusCode).toBe(200) + expect(res.body).toContain('') + } finally { + await app.close() + } + }) +}) diff --git a/packages/core/src/app/server/createCoreWorkspaceAgentServer.ts b/packages/core/src/app/server/createCoreWorkspaceAgentServer.ts index c11b07fa7..40345b6f2 100644 --- a/packages/core/src/app/server/createCoreWorkspaceAgentServer.ts +++ b/packages/core/src/app/server/createCoreWorkspaceAgentServer.ts @@ -29,6 +29,8 @@ import { import type { FastifyInstance } from 'fastify' import type postgres from 'postgres' import type { CoreConfig } from '../../shared/types.js' +import { ERROR_CODES } from '../../shared/errors.js' +import { safeCapture, type TelemetrySink } from '../../shared/telemetry.js' import { authHook, createAuth, @@ -54,6 +56,7 @@ import { } from '../../server/db/index.js' import { loadConfig, type LoadConfigOptions } from '../../server/config/index.js' import { 
WorkspaceRuntimeSandboxHandleStore } from '../../server/runtime/index.js' +import { createDatabaseTelemetryFromEnv } from '../../server/telemetry/db.js' const MIME_TYPES: Record = { '.css': 'text/css; charset=utf-8', @@ -127,6 +130,8 @@ export interface CreateCoreWorkspaceAgentServerOptions extraTools?: RegisterAgentRoutesOptions['extraTools'] systemPromptAppend?: string serveFrontend?: boolean + /** Optional best-effort telemetry sink. Defaults to core's DB-backed env helper. */ + telemetry?: TelemetrySink } type AgentPiOptions = RegisterAgentRoutesOptions['pi'] @@ -390,9 +395,31 @@ async function registerAuthProxy(app: CoreWorkspaceAgentServer) { }) } +function captureAppOpened(telemetry: TelemetrySink, requestId: string): void { + safeCapture(telemetry, { + name: 'app.opened', + properties: { requestId }, + }) +} + +function registerTelemetryHooks(app: CoreWorkspaceAgentServer, telemetry: TelemetrySink): void { + app.addHook('onResponse', async (request, reply) => { + if (reply.statusCode < 500) return + safeCapture(telemetry, { + name: 'server.request.failed', + properties: { + requestId: request.id, + status: reply.statusCode, + errorCode: ERROR_CODES.INTERNAL_ERROR, + }, + }) + }) +} + async function registerFrontendAuthPages( app: CoreWorkspaceAgentServer, appRoot: string, + telemetry: TelemetrySink, ) { const frontDistDir = path.resolve(appRoot, 'dist/front') const indexPath = path.resolve(frontDistDir, 'index.html') @@ -407,6 +434,7 @@ async function registerFrontendAuthPages( } } const html = await readFile(indexPath, 'utf-8') + captureAppOpened(telemetry, request.id) reply.type('text/html; charset=utf-8') return reply.send(injectCspNonceIntoHtml(html, request.cspNonce)) }) @@ -416,6 +444,7 @@ async function registerFrontendAuthPages( async function registerFrontendFallback( app: CoreWorkspaceAgentServer, appRoot: string, + telemetry: TelemetrySink, ) { const frontDistDir = path.resolve(appRoot, 'dist/front') const indexPath = path.resolve(frontDistDir, 
'index.html') @@ -430,6 +459,7 @@ async function registerFrontendFallback( } const html = await readFile(indexPath, 'utf-8') + captureAppOpened(telemetry, request.id) reply.type('text/html; charset=utf-8') return reply.send(injectCspNonceIntoHtml(html, request.cspNonce)) }) @@ -461,6 +491,7 @@ async function registerFrontendFallback( } const html = await readFile(indexPath, 'utf-8') + captureAppOpened(telemetry, request.id) reply.type('text/html; charset=utf-8') return reply.send(injectCspNonceIntoHtml(html, request.cspNonce)) }) @@ -549,11 +580,20 @@ export async function createCoreWorkspaceAgentServer( const serveFrontend = options.serveFrontend ?? (process.env.NODE_ENV !== 'development' && Boolean(appRoot)) const workspaceRoot = options.workspaceRoot ?? process.cwd() + const telemetrySource = options.telemetry + ? 'custom' + : process.env.BORING_TELEMETRY_ENABLED === 'true' + ? 'db-env' + : 'noop-env' + const telemetry = options.telemetry ?? createDatabaseTelemetryFromEnv(db, { appId: config.appId }, process.env) + app.log.debug({ telemetry: { source: telemetrySource } }, 'resolved telemetry sink') + + registerTelemetryHooks(app, telemetry) await registerCoreRoutes({ app, sql, db, userStore, workspaceStore }) if (serveFrontend && appRoot) { - await registerFrontendAuthPages(app, appRoot) + await registerFrontendAuthPages(app, appRoot, telemetry) } await registerAuthProxy(app) @@ -693,6 +733,7 @@ export async function createCoreWorkspaceAgentServer( getWorkspaceId: resolveWorkspaceId, getWorkspaceRoot: resolveRoot, registerHealthRoute: options.registerHealthRoute ?? 
false, + telemetry, }) await app.register(uiRoutes, { @@ -705,7 +746,7 @@ export async function createCoreWorkspaceAgentServer( } if (serveFrontend && appRoot) { - await registerFrontendFallback(app, appRoot) + await registerFrontendFallback(app, appRoot, telemetry) } return app diff --git a/packages/core/src/server/db/schema.ts b/packages/core/src/server/db/schema.ts index d026ae10d..0fd154a3b 100644 --- a/packages/core/src/server/db/schema.ts +++ b/packages/core/src/server/db/schema.ts @@ -268,3 +268,21 @@ export const idempotencyKeys = pgTable( index('idempotency_keys_created_at_idx').on(table.createdAt), ], ) + +export const telemetryEvents = pgTable( + 'telemetry_events', + { + id: uuid('id') + .default(sql`gen_random_uuid()`) + .primaryKey(), + appId: text('app_id').notNull(), + eventName: text('event_name').notNull(), + distinctId: text('distinct_id').notNull().default('anonymous'), + properties: jsonb('properties').notNull().default({}), + createdAt: timestamp('created_at').defaultNow().notNull(), + }, + (table) => [ + index('telemetry_events_app_created_at_idx').on(table.appId, table.createdAt), + index('telemetry_events_event_name_idx').on(table.eventName), + ], +) diff --git a/packages/core/src/server/telemetry/__tests__/db.test.ts b/packages/core/src/server/telemetry/__tests__/db.test.ts new file mode 100644 index 000000000..4d92d92a1 --- /dev/null +++ b/packages/core/src/server/telemetry/__tests__/db.test.ts @@ -0,0 +1,225 @@ +import { describe, expect, it } from 'vitest' + +import { + createDatabaseTelemetry, + createDatabaseTelemetryFromEnv, + sanitizeTelemetryDistinctId, + sanitizeTelemetryEventName, + sanitizeTelemetryProperties, +} from '../db' +import type { Database } from '../../db/index' + +type InsertRow = Record + +function createFakeDb(options: { fail?: 'sync' | 'async' } = {}): { db: Database; rows: InsertRow[] } { + const rows: InsertRow[] = [] + const db = { + insert() { + if (options.fail === 'sync') throw new Error('db down') + return 
{ + values(row: InsertRow) { + if (options.fail === 'async') return Promise.reject(new Error('db down')) + rows.push(row) + return Promise.resolve() + }, + } + }, + } as unknown as Database + return { db, rows } +} + +async function flushTelemetry(): Promise { + await Promise.resolve() + await Promise.resolve() +} + +describe('createDatabaseTelemetryFromEnv', () => { + it('uses noop telemetry when BORING_TELEMETRY_ENABLED is unset', async () => { + const { db, rows } = createFakeDb() + const telemetry = createDatabaseTelemetryFromEnv(db, { appId: 'core-app' }, {}) + + telemetry.capture({ name: 'app.opened' }) + await flushTelemetry() + + expect(rows).toEqual([]) + }) + + it('uses noop telemetry when BORING_TELEMETRY_ENABLED is false', async () => { + const { db, rows } = createFakeDb() + const telemetry = createDatabaseTelemetryFromEnv(db, { appId: 'core-app' }, { + BORING_TELEMETRY_ENABLED: 'false', + }) + + telemetry.capture({ name: 'app.opened' }) + await flushTelemetry() + + expect(rows).toEqual([]) + }) + + it('stores sanitized events in the core database when enabled', async () => { + const { db, rows } = createFakeDb() + const telemetry = createDatabaseTelemetryFromEnv(db, { appId: 'core-app' }, { + BORING_TELEMETRY_ENABLED: 'true', + }) + + telemetry.capture({ + name: 'agent.chat.started', + distinctId: 'user_123', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_1', + durationMs: 12, + prompt: 'secret prompt must not be stored', + command: 'cat .env', + path: '/tmp/private-path', + }, + }) + await flushTelemetry() + + expect(rows).toEqual([ + { + appId: 'core-app', + eventName: 'agent.chat.started', + distinctId: 'user_123', + properties: { + workspaceId: 'workspace_1', + sessionId: 'session_1', + durationMs: 12, + }, + }, + ]) + const serialized = JSON.stringify(rows) + expect(serialized).not.toContain('secret prompt') + expect(serialized).not.toContain('cat .env') + expect(serialized).not.toContain('private-path') + }) +}) + 
+describe('createDatabaseTelemetry', () => { + it('drops unsafe event names instead of writing rows', async () => { + const { db, rows } = createFakeDb() + const telemetry = createDatabaseTelemetry(db, { appId: 'core-app' }) + + telemetry.capture({ name: 'secret.token./tmp/private' }) + await flushTelemetry() + + expect(rows).toEqual([]) + }) + + it('falls back to anonymous for unsafe distinct ids', async () => { + const { db, rows } = createFakeDb() + const telemetry = createDatabaseTelemetry(db, { appId: 'core-app' }) + + telemetry.capture({ name: 'app.opened', distinctId: 'user@example.com' }) + await flushTelemetry() + + expect(rows[0]).toMatchObject({ distinctId: 'anonymous' }) + }) + + it('swallows sync and async database insert failures', async () => { + const syncTelemetry = createDatabaseTelemetry(createFakeDb({ fail: 'sync' }).db, { appId: 'core-app' }) + const asyncTelemetry = createDatabaseTelemetry(createFakeDb({ fail: 'async' }).db, { appId: 'core-app' }) + + expect(() => syncTelemetry.capture({ name: 'app.opened' })).not.toThrow() + expect(() => asyncTelemetry.capture({ name: 'app.opened' })).not.toThrow() + await flushTelemetry() + }) +}) + +describe('sanitizeTelemetryEventName', () => { + it('keeps dotted event names and drops unsafe values', () => { + expect(sanitizeTelemetryEventName('agent.chat.started')).toBe('agent.chat.started') + expect(sanitizeTelemetryEventName('secret.token./tmp/private')).toBeUndefined() + expect(sanitizeTelemetryEventName('../escape')).toBeUndefined() + }) +}) + +describe('sanitizeTelemetryDistinctId', () => { + it('keeps safe ids and falls back for emails, tokens, and paths', () => { + expect(sanitizeTelemetryDistinctId('user_123')).toBe('user_123') + expect(sanitizeTelemetryDistinctId('user@example.com')).toBe('anonymous') + expect(sanitizeTelemetryDistinctId('Bearer secret-token')).toBe('anonymous') + expect(sanitizeTelemetryDistinctId('/tmp/private-path')).toBe('anonymous') + 
expect(sanitizeTelemetryDistinctId(undefined)).toBe('anonymous') + }) +}) + +describe('sanitizeTelemetryProperties', () => { + it('keeps only allowlisted primitive telemetry properties', () => { + expect( + sanitizeTelemetryProperties({ + workspaceId: 'workspace_1', + sessionId: 'session_1', + requestId: 'request_1', + runtimeMode: 'local', + modelProvider: 'anthropic', + toolName: 'bash', + panelId: 'files', + commandId: 'open', + status: 'ok', + durationMs: 42, + errorCode: 'WORKSPACE_NOT_READY', + packageName: '@hachej/boring-core', + packageVersion: '0.1.0', + invalidDuration: Number.NaN, + prompt: 'secret prompt', + assistantOutput: 'secret answer', + command: 'cat .env', + stdout: 'secret output', + path: '/tmp/private', + stack: 'stack trace', + headers: { authorization: 'Bearer secret' }, + nestedAllowedKey: { workspaceId: 'nested' }, + env: 'SECRET=value', + objectValueOnAllowedKey: { unsafe: true }, + durationMsUnsafe: Number.POSITIVE_INFINITY, + }), + ).toEqual({ + workspaceId: 'workspace_1', + sessionId: 'session_1', + requestId: 'request_1', + runtimeMode: 'local', + modelProvider: 'anthropic', + toolName: 'bash', + panelId: 'files', + commandId: 'open', + status: 'ok', + durationMs: 42, + errorCode: 'WORKSPACE_NOT_READY', + packageName: '@hachej/boring-core', + packageVersion: '0.1.0', + }) + }) + + it('drops suspicious strings even on allowlisted keys', () => { + expect( + sanitizeTelemetryProperties({ + requestId: '/tmp/private-path', + sessionId: 'Bearer secret-token', + workspaceId: 'sk_live_abc123', + toolName: 'ghp_abc123', + modelProvider: 'anthropic', + errorCode: 'SECRET_TOKEN', + packageName: '@hachej/boring-core', + }), + ).toEqual({ + modelProvider: 'anthropic', + packageName: '@hachej/boring-core', + }) + }) + + it('keeps lower-case stable core error codes', () => { + expect(sanitizeTelemetryProperties({ errorCode: 'internal_error' })).toEqual({ + errorCode: 'internal_error', + }) + }) + + it('drops non-finite numbers even on allowlisted 
keys', () => { + expect( + sanitizeTelemetryProperties({ + durationMs: Number.POSITIVE_INFINITY, + requestId: 'request_1', + }), + ).toEqual({ requestId: 'request_1' }) + }) +}) diff --git a/packages/core/src/server/telemetry/db.ts b/packages/core/src/server/telemetry/db.ts new file mode 100644 index 000000000..8ff82ad1f --- /dev/null +++ b/packages/core/src/server/telemetry/db.ts @@ -0,0 +1,132 @@ +import { noopTelemetry, type TelemetryEvent, type TelemetrySink } from '../../shared/telemetry.js' +import { telemetryEvents } from '../db/schema.js' +import type { Database } from '../db/index.js' + +const EVENT_NAME_PATTERN = /^[a-z][a-z0-9_-]*(?:\.[a-z][a-z0-9_-]*){0,8}$/ +const SAFE_ID_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_.:-]{0,127}$/ +const SAFE_SLUG_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_.:-]{0,63}$/ +const SAFE_STATUS_PATTERN = /^[a-z][a-z0-9_-]{0,31}$/ +const SAFE_ERROR_CODE_PATTERN = /^[A-Za-z][A-Za-z0-9_:-]{0,63}$/ +const SAFE_PACKAGE_NAME_PATTERN = /^(?:@[A-Za-z0-9_.-]+\/)?[A-Za-z0-9_.-]{1,96}$/ +const SAFE_PACKAGE_VERSION_PATTERN = /^v?\d+\.\d+\.\d+(?:[-+][A-Za-z0-9.-]+)?$/ +const SUSPICIOUS_STRING_PATTERN = /(?:secret|token|bearer|password|api[_-]?key|private|\.env|sk[_-](?:live|test)|ghp_|github_pat_|glpat-|xox[baprs]-|AKIA|ASIA|ya29\.|eyJ|phc_|npm_)/i + +const ALLOWED_PROPERTY_KEYS = new Set([ + 'workspaceId', + 'sessionId', + 'requestId', + 'runtimeMode', + 'modelProvider', + 'toolName', + 'panelId', + 'commandId', + 'status', + 'durationMs', + 'errorCode', + 'packageName', + 'packageVersion', +]) + +type SafeTelemetryProperty = string | number | boolean | null + +export interface CreateDatabaseTelemetryOptions { + appId: string + enabled?: boolean +} + +export function createDatabaseTelemetryFromEnv( + db: Database, + options: { appId: string }, + env: NodeJS.ProcessEnv = process.env, +): TelemetrySink { + if (env.BORING_TELEMETRY_ENABLED !== 'true') return noopTelemetry + return createDatabaseTelemetry(db, { appId: options.appId, enabled: true }) +} + +export 
function createDatabaseTelemetry( + db: Database, + options: CreateDatabaseTelemetryOptions, +): TelemetrySink { + if (options.enabled === false) return noopTelemetry + + return { + capture(event: TelemetryEvent) { + const eventName = sanitizeTelemetryEventName(event.name) + if (!eventName) return + + const row = { + appId: options.appId, + eventName, + distinctId: sanitizeTelemetryDistinctId(event.distinctId), + properties: sanitizeTelemetryProperties(event.properties), + } + + try { + void Promise.resolve(db.insert(telemetryEvents).values(row)).catch(() => {}) + } catch {} + }, + } +} + +export function sanitizeTelemetryEventName(value: string): string | undefined { + if (value.length > 128) return undefined + if (SUSPICIOUS_STRING_PATTERN.test(value)) return undefined + return EVENT_NAME_PATTERN.test(value) ? value : undefined +} + +export function sanitizeTelemetryDistinctId(value: string | undefined): string { + if (!value) return 'anonymous' + return sanitizeTelemetryString('distinctId', value) ?? 'anonymous' +} + +export function sanitizeTelemetryProperties( + properties: Record<string, unknown> | undefined, +): Record<string, SafeTelemetryProperty> { + const sanitized: Record<string, SafeTelemetryProperty> = {} + if (!properties) return sanitized + + for (const [key, value] of Object.entries(properties)) { + if (!ALLOWED_PROPERTY_KEYS.has(key)) continue + const sanitizedValue = sanitizeTelemetryProperty(key, value) + if (sanitizedValue === undefined) continue + sanitized[key] = sanitizedValue + } + + return sanitized +} + +function sanitizeTelemetryProperty(key: string, value: unknown): SafeTelemetryProperty | undefined { + if (value === null || typeof value === 'boolean') return value + if (typeof value === 'number') return Number.isFinite(value) ? 
value : undefined + if (typeof value !== 'string') return undefined + return sanitizeTelemetryString(key, value) +} + +function sanitizeTelemetryString(key: string, value: string): string | undefined { + if (value.length === 0) return undefined + if (SUSPICIOUS_STRING_PATTERN.test(value)) return undefined + + switch (key) { + case 'workspaceId': + case 'sessionId': + case 'requestId': + case 'distinctId': + return SAFE_ID_PATTERN.test(value) ? value : undefined + case 'runtimeMode': + case 'modelProvider': + case 'toolName': + case 'panelId': + case 'commandId': + return SAFE_SLUG_PATTERN.test(value) ? value : undefined + case 'status': + return SAFE_STATUS_PATTERN.test(value) ? value : undefined + case 'errorCode': + return SAFE_ERROR_CODE_PATTERN.test(value) ? value : undefined + case 'packageName': + return SAFE_PACKAGE_NAME_PATTERN.test(value) ? value : undefined + case 'packageVersion': + return SAFE_PACKAGE_VERSION_PATTERN.test(value) ? value : undefined + default: + return undefined + } +} diff --git a/packages/core/src/shared/__tests__/telemetry.test.ts b/packages/core/src/shared/__tests__/telemetry.test.ts new file mode 100644 index 000000000..fdb56eb5a --- /dev/null +++ b/packages/core/src/shared/__tests__/telemetry.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, expectTypeOf, it, vi } from 'vitest' + +import { + noopTelemetry, + safeCapture, + type TelemetryEvent, + type TelemetrySink, +} from '../telemetry' + +describe('telemetry contract', () => { + const event: TelemetryEvent = { + name: 'telemetry.test', + distinctId: 'user-1', + properties: { status: 'ok' }, + } + + it('noop capture accepts events without side effects', () => { + expect(() => noopTelemetry.capture(event)).not.toThrow() + }) + + it('safeCapture forwards normal captures', () => { + const capture = vi.fn() + const telemetry: TelemetrySink = { capture } + + safeCapture(telemetry, event) + + expect(capture).toHaveBeenCalledWith(event) + }) + + it('safeCapture swallows synchronous 
capture failures', () => { + const telemetry: TelemetrySink = { + capture() { + throw new Error('sync telemetry failure') + }, + } + + expect(() => safeCapture(telemetry, event)).not.toThrow() + }) + + it('safeCapture swallows asynchronous capture rejections', async () => { + const telemetry: TelemetrySink = { + capture: vi.fn().mockRejectedValue(new Error('async telemetry failure')), + } + + expect(() => safeCapture(telemetry, event)).not.toThrow() + await Promise.resolve() + }) + + it('allows sinks to expose an optional flush hook', async () => { + const flush = vi.fn().mockResolvedValue(undefined) + const telemetry: TelemetrySink = { + capture() {}, + flush, + } + + await telemetry.flush?.() + + expect(flush).toHaveBeenCalledOnce() + }) + + it('keeps the TelemetrySink shape structural', () => { + expectTypeOf<TelemetrySink>().toEqualTypeOf<{ + capture: (event: TelemetryEvent) => void | Promise<void> + flush?: () => void | Promise<void> + }>() + }) +}) diff --git a/packages/core/src/shared/index.ts b/packages/core/src/shared/index.ts index ed7e9a500..3def1b461 100644 --- a/packages/core/src/shared/index.ts +++ b/packages/core/src/shared/index.ts @@ -22,3 +22,6 @@ export { ConfigValidationError, } from './errors.js' export type { ErrorCode } from './errors.js' + +export { noopTelemetry, safeCapture } from './telemetry.js' +export type { TelemetryEvent, TelemetrySink } from './telemetry.js' diff --git a/packages/core/src/shared/telemetry.ts b/packages/core/src/shared/telemetry.ts new file mode 100644 index 000000000..d7579ec52 --- /dev/null +++ b/packages/core/src/shared/telemetry.ts @@ -0,0 +1,20 @@ +export interface TelemetrySink { + capture(event: TelemetryEvent): void | Promise<void> + flush?(): void | Promise<void> +} + +export interface TelemetryEvent { + name: string + distinctId?: string + properties?: Record<string, unknown> +} + +export const noopTelemetry: TelemetrySink = { + capture() {}, +} + +export function safeCapture(telemetry: TelemetrySink, event: TelemetryEvent): void { + try { + void 
Promise.resolve(telemetry.capture(event)).catch(() => {}) + } catch {} +} diff --git a/packages/workspace/src/shared/__tests__/telemetry.test.ts b/packages/workspace/src/shared/__tests__/telemetry.test.ts new file mode 100644 index 000000000..27eb47a8d --- /dev/null +++ b/packages/workspace/src/shared/__tests__/telemetry.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, expectTypeOf, it, vi } from "vitest" + +import { + noopTelemetry, + safeCapture, + type TelemetryEvent, + type TelemetrySink, +} from "../telemetry" + +describe("telemetry contract", () => { + const event: TelemetryEvent = { + name: "telemetry.test", + distinctId: "user-1", + properties: { status: "ok" }, + } + + it("noop capture accepts events without side effects", () => { + expect(() => noopTelemetry.capture(event)).not.toThrow() + }) + + it("safeCapture forwards normal captures", () => { + const capture = vi.fn() + const telemetry: TelemetrySink = { capture } + + safeCapture(telemetry, event) + + expect(capture).toHaveBeenCalledWith(event) + }) + + it("safeCapture swallows synchronous capture failures", () => { + const telemetry: TelemetrySink = { + capture() { + throw new Error("sync telemetry failure") + }, + } + + expect(() => safeCapture(telemetry, event)).not.toThrow() + }) + + it("safeCapture swallows asynchronous capture rejections", async () => { + const telemetry: TelemetrySink = { + capture: vi.fn().mockRejectedValue(new Error("async telemetry failure")), + } + + expect(() => safeCapture(telemetry, event)).not.toThrow() + await Promise.resolve() + }) + + it("allows sinks to expose an optional flush hook", async () => { + const flush = vi.fn().mockResolvedValue(undefined) + const telemetry: TelemetrySink = { + capture() {}, + flush, + } + + await telemetry.flush?.() + + expect(flush).toHaveBeenCalledOnce() + }) + + it("keeps the TelemetrySink shape structural", () => { + expectTypeOf<TelemetrySink>().toEqualTypeOf<{ + capture: (event: TelemetryEvent) => void | Promise<void> + flush?: () => void | Promise<void> 
+ }>() + }) +}) diff --git a/packages/workspace/src/shared/index.ts b/packages/workspace/src/shared/index.ts index 32a9cabf9..e30efd703 100644 --- a/packages/workspace/src/shared/index.ts +++ b/packages/workspace/src/shared/index.ts @@ -22,3 +22,5 @@ export type { export { WORKSPACE_OPEN_PATH_SURFACE_KIND } from "./types/surface" export { definePanel } from "./types/panel" export type { AgentTool, JSONSchema, ToolExecContext, ToolResult } from "./types/agent-tool" +export type { TelemetryEvent, TelemetrySink } from "./telemetry" +export { noopTelemetry, safeCapture } from "./telemetry" diff --git a/packages/workspace/src/shared/telemetry.ts b/packages/workspace/src/shared/telemetry.ts new file mode 100644 index 000000000..d7579ec52 --- /dev/null +++ b/packages/workspace/src/shared/telemetry.ts @@ -0,0 +1,20 @@ +export interface TelemetrySink { + capture(event: TelemetryEvent): void | Promise<void> + flush?(): void | Promise<void> +} + +export interface TelemetryEvent { + name: string + distinctId?: string + properties?: Record<string, unknown> +} + +export const noopTelemetry: TelemetrySink = { + capture() {}, +} + +export function safeCapture(telemetry: TelemetrySink, event: TelemetryEvent): void { + try { + void Promise.resolve(telemetry.capture(event)).catch(() => {}) + } catch {} +}