From 958e96680a886b24a1ab55f530d7de95a959f6c7 Mon Sep 17 00:00:00 2001 From: sasha Date: Fri, 12 Jun 2026 01:45:06 +0300 Subject: [PATCH] feat: per-session usage/cost aggregation helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consumers migrating from bespoke chat UIs showed per-session usage (tokens, cache, cost, context %) by reading Pi session JSONL files from disk. That path died with the containerized agent-server: sessions now live inside the container, so every consumer renders zeros. The data was already on the wire — AssistantMessage.usage arrives via both getSessionMessages and message_end SSE events — but agent-client had no notion of usage at all. Add a pure aggregateSessionUsage(messages, {contextWindow, costRates}) fold over the transcript: token totals, cost (recalculated from consumer-supplied per-million rates when LiteLLM-routed models report zero cost), cache-hit rate, and context-window utilization anchored on the last clean assistant turn (aborted/errored turns still count toward spend but not context; compaction marks context unknown until the next assistant turn re-measures it). Closes #2. Co-Authored-By: Claude Fable 5 --- src/core/__tests__/usage.test.ts | 247 +++++++++++++++++++++++++++++++ src/core/usage.ts | 232 +++++++++++++++++++++++++++++ src/index.ts | 9 ++ 3 files changed, 488 insertions(+) create mode 100644 src/core/__tests__/usage.test.ts create mode 100644 src/core/usage.ts diff --git a/src/core/__tests__/usage.test.ts b/src/core/__tests__/usage.test.ts new file mode 100644 index 0000000..13e7da2 --- /dev/null +++ b/src/core/__tests__/usage.test.ts @@ -0,0 +1,247 @@ +import { describe, expect, it } from "vitest"; +import type { AgentMessage } from "../types"; +import { + aggregateSessionUsage, + emptySessionUsageMetrics, + type UsageCostRates, +} from "../usage"; + +type AssistantOverrides = { + usage?: Partial["usage"]>>; + cost?: Partial["usage"]["cost"]>; + stopReason?: Extract["stopReason"]; + content?: Extract["content"]; + provider?: string; + model?: string; +}; + +function assistant(overrides: AssistantOverrides = {}): AgentMessage { + return { + role: "assistant", + content: overrides.content ?? [], + api: "openai-completions", + provider: overrides.provider ?? "litellm", + model: overrides.model ?? "openai/gpt-5.5", + stopReason: overrides.stopReason ?? "stop", + timestamp: 1, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + ...overrides.usage, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + ...overrides.cost, + }, + }, + }; +} + +function user(text: string): AgentMessage { + return { role: "user", content: text, timestamp: 1 }; +} + +function toolResult(text: string): AgentMessage { + return { + role: "toolResult", + toolCallId: "t1", + toolName: "read", + content: [{ type: "text", text }], + isError: false, + timestamp: 1, + }; +} + +const gpt55Rates: UsageCostRates = { + input: 1.25, + output: 10, + cacheRead: 0.125, + cacheWrite: 0, +}; + +describe("emptySessionUsageMetrics", () => { + it("returns zeroed metrics with a full context window", () => { + const metrics = emptySessionUsageMetrics(128_000); + + expect(metrics.tokens).toEqual({ + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }); + expect(metrics.cost.total).toBe(0); + expect(metrics.context).toEqual({ + tokens: 0, + contextWindow: 128_000, + percent: 0, + remainingTokens: 128_000, + remainingPercent: 100, + }); + expect(metrics.cacheHitPercent).toBe(0); + expect(metrics.modelRef).toBeNull(); + }); +}); + +describe("aggregateSessionUsage", () => { + it("returns empty metrics for an empty history", () => { + const metrics = aggregateSessionUsage([], { contextWindow: 128_000 }); + + expect(metrics.tokens.total).toBe(0); + expect(metrics.context.contextWindow).toBe(128_000); + expect(metrics.cacheHitPercent).toBe(0); + }); + + it("aggregates tokens, cost and message counts from assistant messages", () => { + const messages: AgentMessage[] = [ + user("Build a dashboard"), + assistant({ + content: [{ type: "toolCall", id: "t1", name: "read", arguments: {} }], + usage: { input: 100, output: 40, cacheRead: 900, cacheWrite: 20, totalTokens: 1060 }, + cost: { input: 0.0001, output: 0.0004, cacheRead: 0.00009, cacheWrite: 0, total: 0.00059 }, + }), + toolResult("file contents"), + ]; + + const metrics = aggregateSessionUsage(messages, { contextWindow: 128_000 }); + + expect(metrics.tokens).toEqual({ + input: 100, + output: 40, + cacheRead: 900, + cacheWrite: 20, + total: 1060, + }); + expect(metrics.cost.total).toBeCloseTo(0.00059); + expect(metrics.cacheHitPercent).toBeCloseTo((900 / 1020) * 100); + expect(metrics.userMessages).toBe(1); + expect(metrics.assistantMessages).toBe(1); + expect(metrics.toolCalls).toBe(1); + expect(metrics.toolResults).toBe(1); + expect(metrics.context.tokens).toBeGreaterThan(1060); + expect(metrics.context.remainingPercent).toBeLessThan(100); + }); + + it("recalculates cost from rates when the wire cost is zero", () => { + const messages: AgentMessage[] = [ + assistant({ + usage: { input: 1_000_000, output: 100_000, cacheRead: 500_000, totalTokens: 1_600_000 }, + }), + ]; + + const metrics = aggregateSessionUsage(messages, { + contextWindow: 2_000_000, + costRates: gpt55Rates, + }); + + expect(metrics.cost.input).toBeCloseTo(1.25); + expect(metrics.cost.output).toBeCloseTo(1); + expect(metrics.cost.cacheRead).toBeCloseTo(0.0625); + expect(metrics.cost.total).toBeCloseTo(2.3125); + }); + + it("prefers the wire cost over rates when it is non-zero", () => { + const messages: AgentMessage[] = [ + assistant({ + usage: { input: 1_000_000, totalTokens: 1_000_000 }, + cost: { input: 0.5, total: 0.5 }, + }), + ]; + + const metrics = aggregateSessionUsage(messages, { costRates: gpt55Rates }); + + expect(metrics.cost.total).toBeCloseTo(0.5); + }); + + it("derives context from the last assistant usage plus trailing message estimates", () => { + const messages: AgentMessage[] = [ + user("root"), + assistant({ usage: { input: 10, output: 5, totalTokens: 15 } }), + assistant({ usage: { input: 20, output: 10, totalTokens: 30 } }), + ]; + + const metrics = aggregateSessionUsage(messages, { contextWindow: 100 }); + + expect(metrics.context.tokens).toBe(30); + expect(metrics.context.percent).toBe(30); + expect(metrics.context.remainingTokens).toBe(70); + expect(metrics.context.remainingPercent).toBe(70); + }); + + it("estimates trailing messages that follow the last assistant usage", () => { + const messages: AgentMessage[] = [ + assistant({ usage: { input: 10, output: 5, totalTokens: 15 } }), + user("x".repeat(400)), + ]; + + const metrics = aggregateSessionUsage(messages, { contextWindow: 1_000 }); + + expect(metrics.context.tokens).toBe(15 + 100); + }); + + it("ignores aborted and errored assistant messages for the context anchor", () => { + const messages: AgentMessage[] = [ + assistant({ usage: { input: 10, output: 5, totalTokens: 15 } }), + assistant({ usage: { input: 90, output: 5, totalTokens: 95 }, stopReason: "aborted" }), + ]; + + const metrics = aggregateSessionUsage(messages, { contextWindow: 100 }); + + // Spend still counts both turns; context anchors on the last clean turn. + expect(metrics.tokens.total).toBe(110); + expect(metrics.context.tokens).toBe(15); + }); + + it("marks context as unknown after compaction until the next assistant usage", () => { + const messages: AgentMessage[] = [ + user("old"), + assistant({ usage: { input: 80, output: 10, totalTokens: 90 } }), + { role: "compactionSummary", summary: "compacted", tokensBefore: 90, timestamp: 2 }, + user("after"), + ]; + + const metrics = aggregateSessionUsage(messages, { contextWindow: 100 }); + + expect(metrics.context.tokens).toBeNull(); + expect(metrics.context.percent).toBeNull(); + }); + + it("restores context once an assistant turn lands after compaction", () => { + const messages: AgentMessage[] = [ + assistant({ usage: { input: 80, output: 10, totalTokens: 90 } }), + { role: "compactionSummary", summary: "compacted", tokensBefore: 90, timestamp: 2 }, + assistant({ usage: { input: 20, output: 5, totalTokens: 25 } }), + ]; + + const metrics = aggregateSessionUsage(messages, { contextWindow: 100 }); + + expect(metrics.context.tokens).toBe(25); + }); + + it("reports an unknown context when no window is provided", () => { + const metrics = aggregateSessionUsage([assistant({ usage: { input: 10, totalTokens: 10 } })]); + + expect(metrics.context).toEqual({ + tokens: null, + contextWindow: 0, + percent: null, + remainingTokens: null, + remainingPercent: null, + }); + }); + + it("takes the model ref from the latest assistant message", () => { + const messages: AgentMessage[] = [ + assistant({ provider: "litellm", model: "openai/gpt-5.5" }), + assistant({ provider: "codex-proxy", model: "openai/gpt-5.5" }), + ]; + + expect(aggregateSessionUsage(messages).modelRef).toBe("codex-proxy/openai/gpt-5.5"); + }); +}); diff --git a/src/core/usage.ts b/src/core/usage.ts new file mode 100644 index 0000000..2787161 --- /dev/null +++ b/src/core/usage.ts @@ -0,0 +1,232 @@ +/** + * Per-session usage/cost aggregation. + * + * A pure fold over the transcript returned by `getSessionMessages` (or kept + * up to date from `message_end` wire events): token totals, monetary cost, + * cache-hit rate and context-window utilization. + * + * Cost normally comes straight off the wire (`AssistantMessage.usage.cost`), + * but custom LiteLLM-routed models often report zero cost. Consumers can pass + * per-million-token `costRates` to recalculate in that case; the wire cost + * wins whenever it is non-zero. + */ +import type { AgentMessage } from "./types"; + +type AssistantMessage = Extract; +type WireUsage = AssistantMessage["usage"]; + +export type UsageTokens = { + input: number; + output: number; + cacheRead: number; + cacheWrite: number; + total: number; +}; + +export type UsageCost = { + input: number; + output: number; + cacheRead: number; + cacheWrite: number; + total: number; +}; + +/** USD per million tokens, used to recalculate cost when the wire reports zero. */ +export type UsageCostRates = Omit; + +export type ContextUsage = { + /** Estimated tokens currently in the context window; null when unknown. */ + tokens: number | null; + contextWindow: number; + percent: number | null; + remainingTokens: number | null; + remainingPercent: number | null; +}; + +export type SessionUsageMetrics = { + /** `provider/model` of the latest assistant turn; null before the first one. */ + modelRef: string | null; + tokens: UsageTokens; + cost: UsageCost; + context: ContextUsage; + cacheHitPercent: number; + userMessages: number; + assistantMessages: number; + toolCalls: number; + toolResults: number; +}; + +export type AggregateSessionUsageOptions = { + contextWindow?: number; + costRates?: UsageCostRates; +}; + +const emptyTokens: UsageTokens = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }; +const emptyCost: UsageCost = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }; + +export function emptySessionUsageMetrics(contextWindow = 0): SessionUsageMetrics { + return { + modelRef: null, + tokens: { ...emptyTokens }, + cost: { ...emptyCost }, + context: { + tokens: 0, + contextWindow, + percent: 0, + remainingTokens: contextWindow, + remainingPercent: 100, + }, + cacheHitPercent: 0, + userMessages: 0, + assistantMessages: 0, + toolCalls: 0, + toolResults: 0, + }; +} + +function hasCost(cost: UsageCost): boolean { + return ( + cost.input > 0 || cost.output > 0 || cost.cacheRead > 0 || cost.cacheWrite > 0 || cost.total > 0 + ); +} + +function costFromRates(tokens: UsageTokens, rates: UsageCostRates | undefined): UsageCost { + if (!rates) return { ...emptyCost }; + + const input = (tokens.input / 1_000_000) * rates.input; + const output = (tokens.output / 1_000_000) * rates.output; + const cacheRead = (tokens.cacheRead / 1_000_000) * rates.cacheRead; + const cacheWrite = (tokens.cacheWrite / 1_000_000) * rates.cacheWrite; + return { input, output, cacheRead, cacheWrite, total: input + output + cacheRead + cacheWrite }; +} + +function contextTokens(usage: WireUsage): number { + return usage.totalTokens || usage.input + usage.output + usage.cacheRead + usage.cacheWrite; +} + +/** A finished turn whose usage reflects the real context size. */ +function isCleanAssistantTurn(message: AssistantMessage): boolean { + return message.stopReason !== "aborted" && message.stopReason !== "error"; +} + +function contentCharCount(content: unknown): number { + if (typeof content === "string") return content.length; + if (!Array.isArray(content)) return 0; + + return content.reduce((total, block) => { + if (!block || typeof block !== "object") return total; + const item = block as Record; + if (typeof item.text === "string") return total + item.text.length; + if (typeof item.thinking === "string") return total + item.thinking.length; + if (item.type === "image") return total + 4_800; + if (typeof item.arguments === "string") return total + item.arguments.length; + if (item.arguments && typeof item.arguments === "object") { + return total + JSON.stringify(item.arguments).length; + } + return total; + }, 0); +} + +function estimateTokens(message: AgentMessage): number { + const content = "content" in message ? message.content : undefined; + return Math.ceil(contentCharCount(content) / 4); +} + +function lastCleanAssistantIndex(messages: AgentMessage[]): number { + for (let index = messages.length - 1; index >= 0; index -= 1) { + const message = messages[index]; + if (message?.role === "assistant" && isCleanAssistantTurn(message)) return index; + } + return -1; +} + +function lastCompactionIndex(messages: AgentMessage[]): number { + for (let index = messages.length - 1; index >= 0; index -= 1) { + if (messages[index]?.role === "compactionSummary") return index; + } + return -1; +} + +function unknownContext(contextWindow: number): ContextUsage { + return { + tokens: null, + contextWindow, + percent: null, + remainingTokens: null, + remainingPercent: null, + }; +} + +function contextUsage(messages: AgentMessage[], contextWindow: number): ContextUsage { + if (contextWindow <= 0) return unknownContext(0); + + const anchorIndex = lastCleanAssistantIndex(messages); + // After a compaction the next assistant turn re-measures the window; until + // then any previous usage overstates it, so report the context as unknown. + if (lastCompactionIndex(messages) > anchorIndex) return unknownContext(contextWindow); + + const anchor = anchorIndex >= 0 ? (messages[anchorIndex] as AssistantMessage) : undefined; + const trailing = messages.slice(anchorIndex + 1); + const tokens = + (anchor ? contextTokens(anchor.usage) : 0) + + trailing.reduce((total, message) => total + estimateTokens(message), 0); + const percent = (tokens / contextWindow) * 100; + + return { + tokens, + contextWindow, + percent, + remainingTokens: Math.max(0, contextWindow - tokens), + remainingPercent: Math.max(0, 100 - percent), + }; +} + +export function aggregateSessionUsage( + messages: AgentMessage[], + options: AggregateSessionUsageOptions = {}, +): SessionUsageMetrics { + const contextWindow = options.contextWindow ?? 0; + const tokens = { ...emptyTokens }; + const cost = { ...emptyCost }; + let modelRef: string | null = null; + let userMessages = 0; + let assistantMessages = 0; + let toolCalls = 0; + let toolResults = 0; + + for (const message of messages) { + if (message.role === "user") userMessages += 1; + if (message.role === "toolResult") toolResults += 1; + if (message.role !== "assistant") continue; + + assistantMessages += 1; + modelRef = `${message.provider}/${message.model}`; + toolCalls += message.content.filter((block) => block.type === "toolCall").length; + + const { usage } = message; + tokens.input += usage.input; + tokens.output += usage.output; + tokens.cacheRead += usage.cacheRead; + tokens.cacheWrite += usage.cacheWrite; + cost.input += usage.cost.input; + cost.output += usage.cost.output; + cost.cacheRead += usage.cost.cacheRead; + cost.cacheWrite += usage.cost.cacheWrite; + cost.total += usage.cost.total; + } + + tokens.total = tokens.input + tokens.output + tokens.cacheRead + tokens.cacheWrite; + const promptTokens = tokens.input + tokens.cacheRead + tokens.cacheWrite; + + return { + modelRef, + tokens, + cost: hasCost(cost) ? cost : costFromRates(tokens, options.costRates), + context: contextUsage(messages, contextWindow), + cacheHitPercent: promptTokens > 0 ? (tokens.cacheRead / promptTokens) * 100 : 0, + userMessages, + assistantMessages, + toolCalls, + toolResults, + }; +} diff --git a/src/index.ts b/src/index.ts index b2d65cc..9917e89 100644 --- a/src/index.ts +++ b/src/index.ts @@ -52,6 +52,15 @@ export type { SessionState, SessionStatus, } from './core/types'; +export { aggregateSessionUsage, emptySessionUsageMetrics } from './core/usage'; +export type { + SessionUsageMetrics, + UsageTokens, + UsageCost, + UsageCostRates, + ContextUsage, + AggregateSessionUsageOptions, +} from './core/usage'; export type { components as AgentServerSchema, paths as AgentServerPaths } from './core/agent-server.generated'; // React layer --------------------------------------------------------------