
feat: Invocation metrics collection with JSONL persistence #383

@jafreck


Summary

Add an InvocationMetricsCollector to the framework that accumulates per-invocation metrics in memory, persists them as append-only JSONL, supports resume-aware offset tracking, and computes aggregate summaries.

Motivation

The framework has TokenTracker for budget management but no general observability layer for invocation-level metrics. Consumers need post-run analytics: "Which agents were slowest? What was peak parallelism? How many retries were infrastructure vs. code errors? What was the cost breakdown by phase?"

AAMF implements an InvocationMetricsCollector with:

  • Append-only JSONL persistence (survives crashes, resume-aware)
  • Per-invocation records: agent, phase, task, duration, tokens, cost, exit code, retry count, model tier, output parsed, cached tokens, premium requests
  • Aggregate computation: totals, breakdowns by agent/phase, retry rates, parallelism-over-time buckets, escalation counts
  • Resume-aware offset tracking: on resume, skip already-written records based on a count stored in checkpoint

This is valuable for any multi-agent system and doesn't exist in the framework today.

Proposed API

InvocationMetric Record

```ts
interface InvocationMetric {
  /** Unique invocation ID. */
  invocationId: string;
  /** Agent name. */
  agent: string;
  /** Phase/stage number. */
  phase: number;
  /** Task/session ID within the phase. */
  taskId?: string;
  /** Wall-clock duration in milliseconds. */
  durationMs: number;
  /** Total tokens consumed. */
  tokens: number;
  /** Input tokens (when available). */
  inputTokens?: number;
  /** Output tokens (when available). */
  outputTokens?: number;
  /** Cached/prompt-cache input tokens. */
  cachedTokens?: number;
  /** Estimated cost in USD. */
  cost: number;
  /** Process exit code. */
  exitCode: number;
  /** Was the invocation successful? */
  success: boolean;
  /** Which retry attempt this was (1 = first try). */
  attempt: number;
  /** Was the output successfully parsed against the schema? */
  outputParsed: boolean;
  /** Model used for this invocation. */
  model?: string;
  /** Model routing tier (if model routing is in use). */
  tier?: string;
  /** ISO timestamp of invocation start. */
  timestamp: string;
  /** Infrastructure error classification, if the invocation failed with an infra error. */
  errorClass?: string;
}
```

InvocationMetricsCollector

```ts
class InvocationMetricsCollector {
  constructor(metricsDir: string, logger: LoggerLike);

  /** Record a single invocation metric. Appends to JSONL immediately. */
  record(metric: InvocationMetric): Promise<void>;

  /** Get all recorded metrics (from memory). */
  getAll(): InvocationMetric[];

  /** Get the count of recorded metrics (for checkpoint offset tracking). */
  getCount(): number;

  /**
   * Load metrics from persisted JSONL, optionally skipping the first N
   * records (for resume — those are already in-memory from the previous run's
   * checkpoint offset).
   */
  loadFromDisk(skipCount?: number): Promise<void>;

  /** Compute aggregate summary from all recorded metrics. */
  computeAggregate(): MetricsAggregate;
}
```
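A minimal standalone sketch of how the collector could behave (assumptions for brevity: synchronous `fs` writes instead of the async write queue, the `LoggerLike` parameter omitted, and a trimmed `Metric` shape in place of the full `InvocationMetric`):

```ts
import { appendFileSync, readFileSync, mkdirSync, existsSync } from "node:fs";
import { join } from "node:path";

// Trimmed stand-in for InvocationMetric (illustration only).
interface Metric {
  invocationId: string;
  agent: string;
  tokens: number;
  cost: number;
}

class CollectorSketch {
  private metrics: Metric[] = [];
  private readonly file: string;

  constructor(metricsDir: string) {
    mkdirSync(metricsDir, { recursive: true });
    this.file = join(metricsDir, "invocations.jsonl");
  }

  /** In-memory append plus an immediate append-only JSONL write. */
  record(metric: Metric): void {
    this.metrics.push(metric);
    appendFileSync(this.file, JSON.stringify(metric) + "\n");
  }

  getAll(): Metric[] {
    return [...this.metrics];
  }

  getCount(): number {
    return this.metrics.length;
  }

  /** Reload from disk, skipping the first `skipCount` records (resume case). */
  loadFromDisk(skipCount = 0): void {
    if (!existsSync(this.file)) return;
    const lines = readFileSync(this.file, "utf8").split("\n").filter(Boolean);
    for (const line of lines.slice(skipCount)) {
      try {
        this.metrics.push(JSON.parse(line) as Metric);
      } catch {
        // Malformed line (e.g. partial write on crash): discard with a warning
        // in the real implementation; silently skipped in this sketch.
      }
    }
  }
}
```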

```ts
interface MetricsAggregate {
  totalInvocations: number;
  invocationsByAgent: Record<string, number>;
  invocationsByPhase: Record<number, number>;

  totalRetries: number;
  retriesByAgent: Record<string, number>;
  retriesByPhase: Record<number, number>;
  infraRetries: number;

  totalTokens: number;
  totalCost: number;
  totalCachedTokens: number;
  tokensByAgent: Record<string, number>;
  costByAgent: Record<string, number>;

  totalDurationMs: number;
  avgDurationByAgent: Record<string, number>;

  peakParallelInvocations: number;
  parallelismOverTime: Array<{ bucketStart: string; concurrent: number }>;

  escalationCount: number;
  successRate: number;
}
```
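For illustration, a sketch of how a few of these aggregate fields might be reduced from the in-memory array (field names follow the proposed interfaces; the reduction itself is an assumption, not the framework's implementation):

```ts
// Trimmed metric shape covering just the fields this sketch aggregates.
interface MetricRow {
  agent: string;
  tokens: number;
  cost: number;
  attempt: number;   // 1 = first try; >1 counts as a retry
  success: boolean;
}

function computeAggregateSketch(metrics: MetricRow[]) {
  const tokensByAgent: Record<string, number> = {};
  const retriesByAgent: Record<string, number> = {};
  let totalRetries = 0;
  let successes = 0;
  for (const m of metrics) {
    tokensByAgent[m.agent] = (tokensByAgent[m.agent] ?? 0) + m.tokens;
    if (m.attempt > 1) {
      totalRetries++;
      retriesByAgent[m.agent] = (retriesByAgent[m.agent] ?? 0) + 1;
    }
    if (m.success) successes++;
  }
  return {
    totalInvocations: metrics.length,
    totalTokens: metrics.reduce((s, m) => s + m.tokens, 0),
    totalCost: metrics.reduce((s, m) => s + m.cost, 0),
    tokensByAgent,
    totalRetries,
    retriesByAgent,
    successRate: metrics.length ? successes / metrics.length : 0,
  };
}
```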

Integration Points

  • CheckpointState: Add metricsCount?: number to checkpoint state so resume knows how many JSONL records to skip
  • FleetEventBus: Emit a metric-recorded event after each record() call for real-time observability
  • ReportGenerator: computeAggregate() output is designed to feed directly into report generation
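The event-bus hook could look roughly like this (`EventBus` and `CollectorStub` are stand-ins for the framework's FleetEventBus and collector, not their real APIs):

```ts
type Listener = (metric: unknown) => void;

// Minimal pub/sub stand-in for FleetEventBus.
class EventBus {
  private listeners: Record<string, Listener[]> = {};
  on(event: string, fn: Listener): void {
    (this.listeners[event] ??= []).push(fn);
  }
  emit(event: string, payload: unknown): void {
    for (const fn of this.listeners[event] ?? []) fn(payload);
  }
}

class CollectorStub {
  private metrics: unknown[] = [];
  constructor(private bus: EventBus) {}
  record(metric: unknown): void {
    this.metrics.push(metric);
    // Real-time observability: notify subscribers after each record() call.
    this.bus.emit("metric-recorded", metric);
  }
  getCount(): number {
    return this.metrics.length;
  }
}
```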

Implementation Notes

  • JSONL format (one JSON object per line) is chosen for crash safety — partial writes corrupt at most the last line, which can be discarded on load
  • record() does both in-memory append and async file append; the file write is fire-and-forget with a write queue to prevent interleaving
  • Parallelism-over-time is computed by bucketing invocation start/end timestamps into fixed-width time windows and counting overlaps
  • loadFromDisk parses the JSONL file line by line, skipping malformed lines with a warning
  • The metrics directory is separate from the checkpoint directory — metrics are append-only and never rewritten, while checkpoints are atomically replaced
