Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions docs/operate/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,24 @@ The crash path is covered too. `installFatalHandlers(processName)` in `src/logge

`runPipeline` (`src/core/pipeline.ts`) emits structured timing events whose `pipeline.stage` shape is pinned by the `.strict()` Zod schema in `src/core/log-fields.ts` (parallel to the ship schema below; the co-located test rejects field drift). Four event keys:

| `event` | Meaning |
| -------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `pipeline.started` | Request entered `runPipeline`. Carries the child-logger bindings (deliveryId, entity). |
| `pipeline.stage` | One stage finished; carries `stage` + `delta_ms`. Stages: trackingComment.create, token.resolve, github.fetch, prompt.build, repo.clone, executor.invoke, trackingComment.finalize, workspace.cleanup. |
| `pipeline.completed` | Success terminal line; carries `pipeline_wall_clock_ms` alongside the existing cost/turn fields. |
| `pipeline.failed` | Failure terminal line; carries `pipeline_wall_clock_ms` + the redacted `err`. |
| `event` | Meaning |
| -------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `pipeline.started` | Request entered `runPipeline`. Carries the child-logger bindings (deliveryId, entity). |
| `pipeline.stage` | One stage finished; carries `stage` + `delta_ms`. Stages: trackingComment.create, token.resolve, github.fetch, prompt.build, repo.clone, executor.invoke, trackingComment.finalize, workspace.cleanup. |
| `pipeline.completed` | Success terminal line; carries `pipeline_wall_clock_ms` alongside `costUsd` / `numTurns` and the token counters `inputTokens` / `outputTokens` / `cacheReadInputTokens` / `cacheCreationInputTokens` (issue #192). Pinned by `PipelineCompletedLogSchema`. |
| `pipeline.failed` | Failure terminal line; carries `pipeline_wall_clock_ms` + the redacted `err`. |

### Token usage and the cache hit-ratio

The executor's `Claude Agent SDK execution completed` line (`src/core/executor.ts`) and the `pipeline.completed` line carry the SDK token counters; the executor line additionally carries a `modelUsage` array (one `{ model, inputTokens, outputTokens, cacheReadInputTokens, cacheCreationInputTokens, costUsd }` entry per model). The same four scalar counters are persisted to the `executions` table (`input_tokens`, `output_tokens`, `cache_read_input_tokens`, `cache_creation_input_tokens`) plus a `model_usage` JSONB column (migration `016_executions_tokens.sql`), so per-installation / per-workflow / per-day aggregates can join them.

Cost alone is opaque: a 500 KB-prompt / 2-turn run and a 5 KB-prompt / 50-turn run can bill the same `costUsd`. Tokens disambiguate them, an oversized prompt is an `inputTokens` problem; a runaway tool-loop is an `outputTokens` + `numTurns` problem. The load-bearing metric for prompt-cache stability (#134) is the hit-ratio:

```
cache_read_input_tokens / (input_tokens + cache_read_input_tokens + cache_creation_input_tokens)
```

(The ratio is undefined when the denominator is zero, e.g. a dry-run that never called the model; guard against that in the query.) A high ratio on the second+ run of the same prompt shape confirms `PROMPT_CACHE_LAYOUT=cacheable` is working; a sudden drop in the per-installation cache-read share is the signature of a prompt-cache stability regression. Alert on `sum(cache_read_input_tokens) / sum(input_tokens + cache_read_input_tokens + cache_creation_input_tokens)` falling below its established baseline.

## GitHub API rate-limit fields

Expand Down
47 changes: 45 additions & 2 deletions src/core/executor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { query, type SDKResultMessage } from "@anthropic-ai/claude-agent-sdk";
import { type ModelUsage, query, type SDKResultMessage } from "@anthropic-ai/claude-agent-sdk";

import { config } from "../config";
import type { BotContext, ExecutionResult, McpServerConfig } from "../types";
import type { BotContext, ExecutionResult, McpServerConfig, ModelUsageEntry } from "../types";
import { redactSecrets } from "../utils/sanitize";

function isResultMessage(msg: unknown): msg is SDKResultMessage {
Expand Down Expand Up @@ -451,8 +451,11 @@ export async function executeAgent({
durationMs,
costUsd: result?.total_cost_usd,
numTurns: result?.num_turns,
inputTokens: result?.usage?.input_tokens,
outputTokens: result?.usage?.output_tokens,
cacheReadInputTokens: result?.usage?.cache_read_input_tokens,
cacheCreationInputTokens: result?.usage?.cache_creation_input_tokens,
modelUsage: compactModelUsage(result?.modelUsage),
promptCacheLayout: useCacheableLayout ? "cacheable" : "legacy",
},
"Claude Agent SDK execution completed",
Expand All @@ -461,6 +464,30 @@ export async function executeAgent({
return buildExecutionResult(result, durationMs);
}

/**
* Flatten the SDK's `modelUsage` map (keyed by model) into the compact
* per-model array stored on ExecutionResult and persisted as JSONB. Returns
* undefined when the SDK reported no per-model breakdown, including an empty
* map, so `modelUsage` is omitted from the log / WS / JSONB rather than
* persisted as `[]`. Sorted by model name for deterministic order, since
* object key order is insertion-defined (issue #192).
*/
function compactModelUsage(
modelUsage: Record<string, ModelUsage> | undefined,
): ModelUsageEntry[] | undefined {
if (modelUsage === undefined || Object.keys(modelUsage).length === 0) return undefined;
return Object.entries(modelUsage)
.map(([model, u]) => ({
model,
inputTokens: u.inputTokens,
outputTokens: u.outputTokens,
cacheReadInputTokens: u.cacheReadInputTokens,
cacheCreationInputTokens: u.cacheCreationInputTokens,
costUsd: u.costUSD,
}))
.sort((a, b) => a.model.localeCompare(b.model));
}

/** Build ExecutionResult from SDK output (exactOptionalPropertyTypes-safe). */
function buildExecutionResult(
result: SDKResultMessage | undefined,
Expand All @@ -477,6 +504,22 @@ function buildExecutionResult(
if (result?.num_turns !== undefined) {
executionResult.numTurns = result.num_turns;
}
if (result?.usage?.input_tokens !== undefined) {
executionResult.inputTokens = result.usage.input_tokens;
}
if (result?.usage?.output_tokens !== undefined) {
executionResult.outputTokens = result.usage.output_tokens;
}
if (result?.usage?.cache_read_input_tokens !== undefined) {
executionResult.cacheReadInputTokens = result.usage.cache_read_input_tokens;
}
if (result?.usage?.cache_creation_input_tokens !== undefined) {
executionResult.cacheCreationInputTokens = result.usage.cache_creation_input_tokens;
}
const modelUsage = compactModelUsage(result?.modelUsage);
if (modelUsage !== undefined) {
executionResult.modelUsage = modelUsage;
}
// SDK returned a non-success terminal result (e.g. error_max_turns,
// error_max_budget_usd) without throwing. Surface the subtype + any
// structured error strings the SDK emitted; the executor catch path
Expand Down
25 changes: 25 additions & 0 deletions src/core/log-fields.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ export const PipelineStageLogSchema = z

export type PipelineStageLog = z.infer<typeof PipelineStageLogSchema>;

/**
* Shape of the terminal `pipeline.completed` line. The metric fields are
* `.optional()` because the SDK can omit them (e.g. a dry-run that never calls
* the model); pino drops the `undefined` keys. The token counters surface
* prompt size and the cache hit-ratio `cache_read / (input + cache_read +
* cache_creation)` (issue #192). `.strict()` so an emitter that adds an
* unpinned field, or mistypes one, trips the co-located test.
*/
export const PipelineCompletedLogSchema = z
.object({
event: z.literal(CORE_PIPELINE_LOG_EVENTS.completed),
success: z.boolean(),
durationMs: z.number().int().nonnegative().optional(),
costUsd: z.number().nonnegative().optional(),
numTurns: z.number().int().nonnegative().optional(),
inputTokens: z.number().int().nonnegative().optional(),
outputTokens: z.number().int().nonnegative().optional(),
cacheReadInputTokens: z.number().int().nonnegative().optional(),
cacheCreationInputTokens: z.number().int().nonnegative().optional(),
pipeline_wall_clock_ms: z.number().int().nonnegative(),
})
.strict();

export type PipelineCompletedLog = z.infer<typeof PipelineCompletedLogSchema>;

/**
* Emit a `pipeline.stage` event measuring `Date.now() - startedAt`. The child
* logger's bindings (deliveryId, owner, repo, entityNumber) are prepended by
Expand Down
4 changes: 4 additions & 0 deletions src/core/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,10 @@ export async function runPipeline(
durationMs: result.durationMs,
costUsd: result.costUsd,
numTurns: result.numTurns,
inputTokens: result.inputTokens,
outputTokens: result.outputTokens,
cacheReadInputTokens: result.cacheReadInputTokens,
cacheCreationInputTokens: result.cacheCreationInputTokens,
pipeline_wall_clock_ms: Date.now() - pipelineStartedAt,
},
"Request processing completed",
Expand Down
10 changes: 10 additions & 0 deletions src/daemon/job-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,16 @@ export async function executeJob(
durationMs: Date.now() - job.startedAt,
costUsd: result.costUsd,
numTurns: result.numTurns,
// SDK token usage for executions-row persistence (issue #192).
...(result.inputTokens !== undefined ? { inputTokens: result.inputTokens } : {}),
...(result.outputTokens !== undefined ? { outputTokens: result.outputTokens } : {}),
...(result.cacheReadInputTokens !== undefined
? { cacheReadInputTokens: result.cacheReadInputTokens }
: {}),
...(result.cacheCreationInputTokens !== undefined
? { cacheCreationInputTokens: result.cacheCreationInputTokens }
: {}),
...(result.modelUsage !== undefined ? { modelUsage: [...result.modelUsage] } : {}),
...(result.success ? {} : { errorMessage: "Pipeline completed with failure" }),
...(learnings.length > 0 ? { learnings } : {}),
...(deletions.length > 0 ? { deletions } : {}),
Expand Down
24 changes: 24 additions & 0 deletions src/db/migrations/016_executions_tokens.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Migration 016: persist Claude Agent SDK token usage on executions (issue #192).
--
-- The executor reads input_tokens / output_tokens / cache_* counters and the
-- per-model modelUsage breakdown from SDKResultMessage.usage, but only
-- cost_usd / duration_ms / num_turns were persisted. Without the token counts
-- an operator cannot tell a 500 KB-prompt / 2-turn run from a 5 KB-prompt /
-- 50-turn run (identical cost_usd), nor compute the prompt-cache hit-ratio
-- cache_read / (input + cache_read + cache_creation)
-- that is the load-bearing signal for prompt-cache stability (#134).
--
-- All columns are nullable and additive: pre-existing rows stay NULL and the
-- existing dispatch_stats aggregates are unaffected.

-- BIGINT (not INTEGER): SDKResultMessage.usage is CUMULATIVE for the whole
-- session, and cache_read_input_tokens in particular accumulates as
-- cached-prompt-size x turns. A long run with a raised turn/budget cap can
-- exceed INTEGER's 2.1B ceiling, which would make the markExecutionCompleted
-- UPDATE throw `integer out of range`. BIGINT (~9.2e18) removes that ceiling.
ALTER TABLE executions
ADD COLUMN input_tokens BIGINT NULL,
ADD COLUMN output_tokens BIGINT NULL,
ADD COLUMN cache_read_input_tokens BIGINT NULL,
ADD COLUMN cache_creation_input_tokens BIGINT NULL,
ADD COLUMN model_usage JSONB NULL;
26 changes: 25 additions & 1 deletion src/orchestrator/connection-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
WS_CLOSE_CODES,
WS_ERROR_CODES,
} from "../shared/ws-messages";
import type { ModelUsageEntry } from "../types";
import { decrementActiveCount, incrementActiveCount } from "./concurrency";
import {
decrementDaemonActiveJobs,
Expand Down Expand Up @@ -1402,14 +1403,37 @@ async function finalizeExecution(
costUsd?: number | undefined;
durationMs?: number | undefined;
numTurns?: number | undefined;
inputTokens?: number | undefined;
outputTokens?: number | undefined;
cacheReadInputTokens?: number | undefined;
cacheCreationInputTokens?: number | undefined;
modelUsage?: readonly ModelUsageEntry[] | undefined;
errorMessage?: string | undefined;
},
): Promise<void> {
if (payload.success) {
const result: { costUsd?: number; durationMs?: number; numTurns?: number } = {};
const result: {
costUsd?: number;
durationMs?: number;
numTurns?: number;
inputTokens?: number;
outputTokens?: number;
cacheReadInputTokens?: number;
cacheCreationInputTokens?: number;
modelUsage?: readonly ModelUsageEntry[];
} = {};
if (payload.costUsd !== undefined) result.costUsd = payload.costUsd;
if (payload.durationMs !== undefined) result.durationMs = payload.durationMs;
if (payload.numTurns !== undefined) result.numTurns = payload.numTurns;
if (payload.inputTokens !== undefined) result.inputTokens = payload.inputTokens;
if (payload.outputTokens !== undefined) result.outputTokens = payload.outputTokens;
if (payload.cacheReadInputTokens !== undefined) {
result.cacheReadInputTokens = payload.cacheReadInputTokens;
}
if (payload.cacheCreationInputTokens !== undefined) {
result.cacheCreationInputTokens = payload.cacheCreationInputTokens;
}
if (payload.modelUsage !== undefined) result.modelUsage = payload.modelUsage;
await markExecutionCompleted(deliveryId, result);
} else {
await markExecutionFailed(deliveryId, payload.errorMessage ?? "Execution failed on daemon");
Expand Down
20 changes: 18 additions & 2 deletions src/orchestrator/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { config } from "../config";
import { getDb } from "../db";
import { logger } from "../logger";
import type { SerializableBotContext } from "../shared/daemon-types";
import type { ModelUsageEntry } from "../types";
import { decrementDaemonActiveJobs } from "./daemon-registry";

/**
Expand Down Expand Up @@ -158,17 +159,32 @@ export async function markExecutionRunning(deliveryId: string): Promise<void> {
*/
export async function markExecutionCompleted(
deliveryId: string,
result: { costUsd?: number; durationMs?: number; numTurns?: number },
result: {
costUsd?: number;
durationMs?: number;
numTurns?: number;
inputTokens?: number;
outputTokens?: number;
cacheReadInputTokens?: number;
cacheCreationInputTokens?: number;
modelUsage?: readonly ModelUsageEntry[];
},
): Promise<void> {
const db = getDb();
if (db === null) return;

// Bun.sql serialises the modelUsage array to JSONB automatically.
await db`
UPDATE executions
SET status = 'completed', completed_at = now(),
cost_usd = ${result.costUsd ?? null},
duration_ms = ${result.durationMs ?? null},
num_turns = ${result.numTurns ?? null}
num_turns = ${result.numTurns ?? null},
input_tokens = ${result.inputTokens ?? null},
output_tokens = ${result.outputTokens ?? null},
cache_read_input_tokens = ${result.cacheReadInputTokens ?? null},
cache_creation_input_tokens = ${result.cacheCreationInputTokens ?? null},
model_usage = ${result.modelUsage ?? null}
WHERE delivery_id = ${deliveryId} AND status = 'running'
`;
}
Expand Down
17 changes: 17 additions & 0 deletions src/shared/ws-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,23 @@ const jobResultSchema = z.object({
costUsd: z.number().nonnegative().optional(),
durationMs: z.number().int().nonnegative().optional(),
numTurns: z.number().int().nonnegative().optional(),
// SDK token usage, persisted to the executions row (issue #192).
inputTokens: z.number().int().nonnegative().optional(),
outputTokens: z.number().int().nonnegative().optional(),
cacheReadInputTokens: z.number().int().nonnegative().optional(),
cacheCreationInputTokens: z.number().int().nonnegative().optional(),
modelUsage: z
.array(
z.object({
model: z.string().min(1),
inputTokens: z.number().int().nonnegative(),
outputTokens: z.number().int().nonnegative(),
cacheReadInputTokens: z.number().int().nonnegative(),
cacheCreationInputTokens: z.number().int().nonnegative(),
costUsd: z.number().nonnegative(),
}),
)
.optional(),
errorMessage: z.string().optional(),
dryRun: z.boolean().optional(),
learnings: z.array(z.object({ category: z.string(), content: z.string() })).optional(),
Expand Down
24 changes: 24 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ export interface BotContext {
log: Logger;
}

/**
* Per-model token + cost breakdown, derived from SDKResultMessage.modelUsage
* (issue #192). One entry per model a single execution touched; useful when a
* path runs more than one model (e.g. Haiku subagents + an Opus main agent).
*/
export interface ModelUsageEntry {
model: string;
inputTokens: number;
outputTokens: number;
cacheReadInputTokens: number;
cacheCreationInputTokens: number;
costUsd: number;
}

/**
* Result from a Claude Agent SDK execution.
*/
Expand All @@ -95,6 +109,16 @@ export interface ExecutionResult {
durationMs?: number;
/** Number of agent turns used */
numTurns?: number;
/** Prompt (input) tokens billed for this execution (SDKResultMessage.usage). */
inputTokens?: number;
/** Completion (output) tokens billed for this execution. */
outputTokens?: number;
/** Tokens served from the prompt cache (the cache-savings numerator). */
cacheReadInputTokens?: number;
/** Tokens written to populate the prompt cache (the 2x-price surcharge). */
cacheCreationInputTokens?: number;
/** Per-model token + cost breakdown; logged in full and persisted as JSONB. */
modelUsage?: readonly ModelUsageEntry[];
/** When true, indicates this was a dry-run (no Claude execution) */
dryRun?: boolean;
/** Daemon actions collected from execution (learnings and deletions from .daemon-actions.json) */
Expand Down
Loading
Loading