TraceFlow SDK v2

🚀 Production-ready, stateless SDK for distributed tracing

Trace your distributed systems with confidence using HTTP or Kafka transport.

✨ Features

  • 📦 Stateless - No Redis, no databases, pure event streaming
  • 🔀 Transport Agnostic - Use HTTP REST API or Kafka, same API
  • 🧵 Context-Aware - Automatic context propagation using AsyncLocalStorage
  • 🔄 Retry Logic - Built-in exponential backoff and configurable circuit breaker
  • 🛡️ Production-Ready - Tracing failures never crash your app (silentErrors is on by default)
  • 🎯 Type-Safe - Full TypeScript support
  • 📊 Ordering Guarantees - Kafka partitioning by trace_id
  • 🌊 Async-First - Works seamlessly across async boundaries
  • ☸️ Kubernetes-Ready - Graceful shutdown and auto-cleanup
  • 📝 Event-Based - Append-only event model
  • 🔌 Framework Middleware - Built-in Express and Fastify middleware for automatic request tracing
  • 🏥 Health Check - Built-in connectivity verification

πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Your App      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ TraceFlow β”‚  β”‚
β”‚  β”‚    SDK    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
    β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
    β”‚         β”‚
β”Œβ”€β”€β”€β–Όβ”€β”€β”€β” β”Œβ”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”
β”‚  HTTP β”‚ β”‚  Kafka  β”‚
β”‚   API β”‚ β”‚  Topic  β”‚
β””β”€β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
    β”‚        β”‚
β”Œβ”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”
β”‚  TraceFlow     β”‚
β”‚   Service      β”‚
β”‚                β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚  ScyllaDB  β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚   Redis    β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

📦 Installation

npm install @dev.smartpricing/traceflow-sdk

Optional: For Kafka transport

npm install @confluentinc/kafka-javascript

🚀 Quick Start

HTTP Transport

import { TraceFlowSDK } from '@dev.smartpricing/traceflow-sdk';

const sdk = new TraceFlowSDK({
  transport: 'http',
  source: 'my-service',
  endpoint: 'http://localhost:3009',
  apiKey: 'your-api-key', // Optional
});

// Start a trace
const trace = await sdk.startTrace({
  trace_type: 'api_request',
  title: 'Process User Request',
});

// Start a step
const step = await trace.startStep({
  name: 'Validate Input',
  input: { userId: 123 },
});

// Log
await step.log('Validation successful');

// Finish step
await step.finish({ output: { valid: true } });

// Finish trace
await trace.finish({ result: { success: true } });

// Cleanup
await sdk.shutdown();

Kafka Transport

import { TraceFlowSDK } from '@dev.smartpricing/traceflow-sdk';

const sdk = new TraceFlowSDK({
  transport: 'kafka',
  source: 'my-service',
  kafka: {
    brokers: ['localhost:9092'],
    clientId: 'my-service',
    topic: 'traceflow-events',
  },
});

// Same API as HTTP!
const trace = await sdk.startTrace({
  trace_type: 'background_job',
  title: 'Process Data',
});

await trace.finish();
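With the Kafka transport, the ordering guarantee listed under Features comes from partitioning by trace_id: all events of one trace share a key and therefore a partition, and Kafka preserves order within a partition. The sketch below illustrates only that key-to-partition idea, using an FNV-1a hash chosen for brevity. It is not the SDK's code, and real Kafka clients ship their own partitioners with different hash functions.

```typescript
// Illustrative only: equal keys always map to the same partition.
// Real Kafka clients use their own partitioner and hash function.
function fnv1a32(input: string): number {
  const bytes = new TextEncoder().encode(input);
  let hash = 0x811c9dc5; // FNV-1a 32-bit offset basis
  for (let i = 0; i < bytes.length; i++) {
    hash ^= bytes[i];
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by the FNV prime, keep unsigned
  }
  return hash >>> 0;
}

// Pick a partition for a trace: same trace_id -> same partition -> ordered events.
function partitionFor(traceId: string, numPartitions: number): number {
  if (!Number.isInteger(numPartitions) || numPartitions <= 0) {
    throw new RangeError('numPartitions must be a positive integer');
  }
  return fnv1a32(traceId) % numPartitions;
}

// Every event emitted for one trace is keyed by the same trace_id.
const demoTraceId = 'trace-abc-123';
const eventTypes = ['trace_started', 'step_started', 'step_finished', 'trace_finished'];
const partitions = eventTypes.map(() => partitionFor(demoTraceId, 12));
```

Because the partition depends only on the key, events from different traces may interleave across partitions, but the events of any single trace are read back in the order they were produced.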

🎯 Key Concepts

1. Automatic Context Management

Use runWithTrace to manage the trace lifecycle automatically:

await sdk.runWithTrace(
  {
    trace_type: 'data_sync',
    title: 'Sync Users',
  },
  async () => {
    // Trace context is automatically available
    const step = await sdk.startStep({ name: 'Fetch Data' });
    
    // Do work...
    await step.finish();
    
    // Trace auto-completes on return
    return { synced: 100 };
  }
);
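runWithTrace builds on the AsyncLocalStorage-based context propagation listed under Features. The sketch below shows the underlying Node.js mechanism with a hypothetical TraceContext type and helper names; it is not the SDK's internal code, only an illustration of how a value set in run() stays visible across await and timer boundaries while concurrent runs stay isolated.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical context shape; the SDK's real context object may differ.
interface TraceContext {
  traceId: string;
}

const storage = new AsyncLocalStorage<TraceContext>();

// Runs fn with ctx as the "current trace" for everything fn awaits.
function runWithContext<T>(ctx: TraceContext, fn: () => Promise<T>): Promise<T> {
  return storage.run(ctx, fn);
}

// Analogous to getCurrentTrace(): undefined when called outside runWithContext.
function currentTraceId(): string | undefined {
  return storage.getStore()?.traceId;
}

// Simulates code deep in the call stack, after an async hop.
async function deepInTheCallStack(): Promise<string | undefined> {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return currentTraceId();
}

const seenInside = runWithContext({ traceId: 'trace-1' }, deepInTheCallStack);
const seenOutside = currentTraceId(); // no active context at top level
```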

2. Nested Operations

Steps and traces work across async boundaries:

async function processOrder(orderId: string) {
  const step = await sdk.startStep({
    name: 'Process Order',
    input: { orderId },
  });

  try {
    await validateOrder(orderId); // Can start substeps
    await chargePayment(orderId); // Can start substeps
    await shipOrder(orderId);     // Can start substeps
    
    await step.finish({ output: { status: 'shipped' } });
  } catch (error) {
    await step.fail(error);
    throw error;
  }
}
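The finish-on-success, fail-and-rethrow-on-error pattern above can be factored into a small helper so every step is closed exactly once. This is a hypothetical helper, not part of the SDK; StepLike only mirrors the finish and fail methods documented under StepHandle Methods.

```typescript
// Hypothetical minimal interface mirroring the documented StepHandle methods.
interface StepLike {
  finish(options?: { output?: unknown }): Promise<void>;
  fail(error: string | Error): Promise<void>;
}

// Runs work inside an already-started step: finishes the step with the
// result on success, or marks it failed and rethrows on error.
async function withStep<T>(step: StepLike, work: () => Promise<T>): Promise<T> {
  let result: T;
  try {
    result = await work();
  } catch (error) {
    await step.fail(error instanceof Error ? error : String(error));
    throw error;
  }
  // Outside the try, so an error inside finish() is not reported as a step failure.
  await step.finish({ output: result });
  return result;
}

// Usage with the SDK (not executed here):
// const step = await sdk.startStep({ name: 'Process Order', input: { orderId } });
// await withStep(step, () => shipOrder(orderId));
```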

3. Cross-Service Tracing

Share trace context across services:

// Service A: Start trace
const trace = await sdk.startTrace({
  trace_type: 'user_registration',
});

// Call Service B with trace ID
await fetch('https://service-b/api/endpoint', {
  headers: {
    'X-Trace-Id': trace.trace_id,
  },
});

// Service B: Retrieve same trace
const traceId = req.headers['x-trace-id'];

const trace = await sdk.getTrace(traceId);
await trace.startStep({ name: 'Send Email' });
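Across an HTTP hop, propagation amounts to writing the trace ID into a header and reading it back on the receiving side. A minimal sketch, assuming the X-Trace-Id header used above; the helper names are hypothetical, and falling back to Node's crypto.randomUUID() when no header arrives is one possible policy, not something the SDK prescribes.

```typescript
import { randomUUID } from 'node:crypto';

// Node's http module lowercases incoming header names.
const TRACE_HEADER = 'x-trace-id';

type IncomingHeaders = Record<string, string | string[] | undefined>;

// Reuse the caller's trace ID when present, otherwise start a fresh one.
function extractOrCreateTraceId(headers: IncomingHeaders): string {
  const raw = headers[TRACE_HEADER];
  const value = Array.isArray(raw) ? raw[0] : raw;
  return value && value.trim() !== '' ? value.trim() : randomUUID();
}

// Build outgoing fetch() headers that carry the trace ID downstream.
function withTraceHeader(traceId: string, headers: Record<string, string> = {}): Record<string, string> {
  return { ...headers, 'X-Trace-Id': traceId };
}
```

On the receiving side, the extracted ID is what you would pass to sdk.getTrace(); on the calling side, withTraceHeader(trace.trace_id) produces the headers object for fetch().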

4. Hybrid Pattern: Context + Manual Access

Mix automatic context with manual trace access:

import { v4 as uuidv4 } from 'uuid';

// Middleware: Start trace with custom ID
app.use(async (req, res, next) => {
  const traceId = req.headers['x-trace-id'] || uuidv4();
  
  await sdk.startTrace({ 
    trace_id: traceId,
    title: `${req.method} ${req.path}` 
  });
  
  req.traceId = traceId;
  next();
});

// Controller: Retrieve trace by ID
app.get('/users/:id', async (req, res) => {
  const trace = await sdk.getTrace(req.traceId);
  
  const step = await trace.startStep({ name: 'Fetch User' });
  const user = await getUserFromDB(req.params.id);
  await step.finish({ output: user });
  
  await trace.finish({ result: user });
  res.json(user);
});

// Service Layer: Deep in your code
class UserService {
  async getUser(userId: string, traceId: string) {
    // Retrieve same trace
    const trace = await sdk.getTrace(traceId);
    await trace.log('Querying database...');
    // ...
  }
}

// Or use context (if inside runWithTrace)
async function anyFunction() {
  const trace = sdk.getCurrentTrace();
  if (trace) {
    await trace.log('Has access from context!');
  }
}

5. Long-Running Processes

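Keep a long-running trace from timing out by sending periodic heartbeats (HTTP transport only). Clear the interval in a finally block so it stops even if the work throws:

const trace = await sdk.startTrace({
  trace_id: 'batch-job-123',
  title: 'Long Batch Job',
});

// Send a heartbeat every minute
const heartbeatInterval = setInterval(() => {
  void sdk.heartbeat('batch-job-123');
}, 60000);

try {
  // Do long work...
  for (let i = 0; i < 100; i++) {
    await processBatch(i);
  }
  await trace.finish();
} catch (error) {
  await trace.fail(error as Error);
  throw error;
} finally {
  clearInterval(heartbeatInterval);
}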

6. Error Handling

With silentErrors enabled (the default), the SDK never throws because of tracing failures:

const sdk = new TraceFlowSDK({
  transport: 'http',
  source: 'my-service',
  endpoint: 'http://localhost:3009',
  silentErrors: true, // Default: true - never throws
});

// Even if tracing fails, your app continues
const trace = await sdk.startTrace({ title: 'My Trace' });
// ✅ Always returns a valid handle, never throws
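Conceptually, silentErrors means each tracing call is wrapped so that a failure is reported and swallowed instead of propagated. A hypothetical sketch of such a wrapper; the name and the fallback-value approach are assumptions for illustration, not the SDK's internals.

```typescript
type ErrorReporter = (message: string, error: unknown) => void;

// Runs a tracing operation; on failure, reports the error and returns the
// fallback instead of throwing, so the calling application keeps running.
async function swallowTracingErrors<T>(
  operation: () => Promise<T>,
  fallback: T,
  report: ErrorReporter = (message, error) => console.warn(message, error),
): Promise<T> {
  try {
    // `return await` (not bare `return`) so rejections land in this catch.
    return await operation();
  } catch (error) {
    report('[tracing] operation failed, continuing', error);
    return fallback;
  }
}
```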

🔧 Configuration

Full Configuration Options

interface TraceFlowSDKConfig {
  // Transport
  transport: 'http' | 'kafka';
  source: string;
  
  // HTTP options
  endpoint?: string;
  apiKey?: string;
  username?: string;
  password?: string;
  timeout?: number; // Default: 5000ms
  
  // Kafka options
  kafka?: {
    brokers: string[];
    clientId?: string;
    topic?: string; // Default: 'traceflow-events'
    sasl?: {
      mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
      username: string;
      password: string;
    };
    ssl?: boolean | object;
  };
  
  // Retry & reliability
  maxRetries?: number; // Default: 3
  retryDelay?: number; // Default: 1000ms
  enableCircuitBreaker?: boolean; // Default: true
  circuitBreakerThreshold?: number; // Default: 5 (failures before opening)
  circuitBreakerTimeout?: number; // Default: 60000ms (time before half-open)

  // Behavior
  autoFlushOnExit?: boolean; // Default: true
  flushTimeoutMs?: number; // Default: 5000ms
  silentErrors?: boolean; // Default: true

  // Logging
  enableLogging?: boolean; // Default: true
  logLevel?: 'debug' | 'info' | 'warn' | 'error'; // Default: 'info'
  logger?: { debug, info, warn, error }; // Custom logger
}
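maxRetries and retryDelay together define a retry schedule. The README calls the backoff exponential but does not state the multiplier, so the sketch below assumes the common doubling scheme (retryDelay × 2^attempt); with the defaults (3 retries, 1000 ms) that gives waits of 1000, 2000 and 4000 ms. The helper names are illustrative, not SDK exports.

```typescript
// Delay before retry number `attempt` (0-based), assuming a doubling backoff.
function backoffDelayMs(attempt: number, retryDelay: number): number {
  return retryDelay * 2 ** attempt;
}

// All waits for a given configuration.
function retrySchedule(maxRetries = 3, retryDelay = 1000): number[] {
  return Array.from({ length: maxRetries }, (_, attempt) => backoffDelayMs(attempt, retryDelay));
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// One initial attempt plus up to maxRetries retries, waiting between them.
async function sendWithRetry<T>(send: () => Promise<T>, maxRetries = 3, retryDelay = 1000): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await send();
    } catch (error) {
      if (attempt >= maxRetries) throw error; // retries exhausted
      await sleep(backoffDelayMs(attempt, retryDelay));
    }
  }
}
```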

📚 API Reference

SDK Methods

startTrace(options): Promise<TraceHandle>

Start a new trace. If trace_id is provided, it's idempotent.

const trace = await sdk.startTrace({
  trace_id?: string;        // Optional: Custom trace ID (idempotent if provided)
  trace_type?: string;      // Type of trace
  title?: string;           // Human-readable title
  description?: string;     // Description
  owner?: string;           // Owner/service
  tags?: string[];          // Tags for filtering
  metadata?: object;        // Custom metadata
  params?: any;             // Input parameters
  idempotency_key?: string; // For idempotency
  trace_timeout_ms?: number; // Custom timeout for this trace (milliseconds)
  step_timeout_ms?: number;  // Custom timeout for steps in this trace (milliseconds)
});

Custom Timeouts:

  • If not specified, the service uses its default timeout settings
  • trace_timeout_ms: Maximum time for the entire trace to complete
  • step_timeout_ms: Maximum time for each step to complete
  • Use for processes with known execution time characteristics
  • Examples: Quick APIs (5s), Batch jobs (10m), ML training (2h)
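Both timeout options are in milliseconds, which makes the example budgets easy to get wrong by a factor of 1000. A small sketch that names the units; the constant and helper names are hypothetical, and only the trace_timeout_ms field comes from the options above.

```typescript
const SECOND_MS = 1_000;
const MINUTE_MS = 60 * SECOND_MS;
const HOUR_MS = 60 * MINUTE_MS;

// Example per-workload budgets from the list above.
const TRACE_TIMEOUTS_MS = {
  quickApi: 5 * SECOND_MS, // 5 s
  batchJob: 10 * MINUTE_MS, // 10 min
  mlTraining: 2 * HOUR_MS, // 2 h
} as const;

type WorkloadKind = keyof typeof TRACE_TIMEOUTS_MS;

// Hypothetical helper: the timeout field to spread into startTrace options.
function traceTimeoutFor(kind: WorkloadKind): { trace_timeout_ms: number } {
  return { trace_timeout_ms: TRACE_TIMEOUTS_MS[kind] };
}

// Usage (not executed here):
// await sdk.startTrace({ trace_type: 'batch_job', title: 'Nightly export', ...traceTimeoutFor('batchJob') });
```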

getTrace(traceId): Promise<TraceHandle>

Get an existing trace by ID. This makes an HTTP call to fetch the trace's state from the service.

Note: Only works with HTTP transport.

// Retrieve existing trace
const trace = await sdk.getTrace('existing-trace-id');

// Can now use it to add steps, logs, etc.
await trace.startStep({ name: 'Continue Process' });
await trace.finish();

getCurrentTrace(): TraceHandle | null

Get current trace from context (no HTTP call).

// Inside runWithTrace or after startTrace
const trace = sdk.getCurrentTrace();

if (trace) {
  await trace.log('Processing...');
}

heartbeat(traceId?): Promise<void>

Send heartbeat to update last_activity_at (prevents timeout).

Note: Only works with HTTP transport.

// Explicit trace ID
await sdk.heartbeat('trace-123');

// Or use current context
await sdk.heartbeat();

runWithTrace(options, fn): Promise<T>

Run function with automatic trace management.

const result = await sdk.runWithTrace(
  { trace_type: 'my_process', title: 'My Process' },
  async () => {
    // Your code here
    return { success: true };
  }
);

startStep(options): Promise<StepHandle>

Start a step (requires active trace context).

const step = await sdk.startStep({
  step_id?: string;     // Optional: Custom step ID
  name?: string;        // Step name
  step_type?: string;   // Type of step
  input?: any;          // Input data
  metadata?: object;    // Custom metadata
});

log(message, options): Promise<void>

Log a message (uses current context).

await sdk.log('Processing user data', {
  level: LogLevel.INFO, // DEBUG, INFO, WARN, ERROR, FATAL
  event_type?: string;
  details?: any;
});

flush(): Promise<void>

Flush all pending events.

await sdk.flush();

healthCheck(): Promise<{ ok, latencyMs, error? }>

Check connectivity to the TraceFlow backend (HTTP transport only).

shutdown(): Promise<void>

Gracefully shut down the SDK.

await sdk.shutdown();

TraceHandle Methods

await trace.finish({ result?: any, metadata?: object });
await trace.fail(error: string | Error);
await trace.cancel();
await trace.startStep(options);
await trace.log(message, options);

StepHandle Methods

await step.finish({ output?: any, metadata?: object });
await step.fail(error: string | Error);
await step.log(message, options);

🌐 Microservice Integration

See examples/microservice-example.ts for a complete example with Express.

Express Middleware (Built-in)

import express from 'express';
import { TraceFlowSDK, createExpressMiddleware } from '@dev.smartpricing/traceflow-sdk';

const app = express();

const sdk = new TraceFlowSDK({
  transport: 'http',
  source: 'my-api',
  endpoint: process.env.TRACEFLOW_ENDPOINT,
});

// One-liner: auto-traces all requests, extracts/propagates X-Trace-Id
app.use(createExpressMiddleware(sdk, {
  ignorePaths: ['/health', '/metrics'],
}));

// Access trace in route handlers via req.traceflowTrace
app.get('/users/:id', async (req, res) => {
  const step = await req.traceflowTrace.startStep({ name: 'Get User' });
  const user = await getUserFromDB(req.params.id);
  await step.finish({ output: user });
  res.json(user);
});
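The README does not document how ignorePaths entries are matched. The hypothetical helper below assumes exact matches on the request path with the query string stripped, to show how a middleware could decide whether to start a trace; the real middleware may match prefixes or patterns instead.

```typescript
// Hypothetical: decide whether a request should be traced.
// Assumes exact path matching after removing any query string.
function shouldTrace(url: string, ignorePaths: readonly string[]): boolean {
  const queryStart = url.indexOf('?');
  const path = queryStart === -1 ? url : url.slice(0, queryStart);
  return !ignorePaths.includes(path);
}
```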

Fastify Plugin (Built-in)

import Fastify from 'fastify';
import { TraceFlowSDK, traceflowFastifyPlugin } from '@dev.smartpricing/traceflow-sdk';

const fastify = Fastify();
const sdk = new TraceFlowSDK({ /* config */ });
fastify.register(traceflowFastifyPlugin, { sdk, ignorePaths: ['/health'] });

Health Check

const result = await sdk.healthCheck();
// { ok: true, latencyMs: 12 }
// { ok: false, latencyMs: 5002, error: 'ECONNREFUSED' }
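To illustrate how a result of this shape can be produced, here is a hypothetical wrapper that times any async probe with performance.now() and converts failures into { ok: false, error } instead of rejecting. The probe itself is up to you; this is not the SDK's implementation.

```typescript
import { performance } from 'node:perf_hooks';

interface HealthResult {
  ok: boolean;
  latencyMs: number;
  error?: string;
}

// Times an async probe; never rejects, so it is safe to call from a readiness check.
async function timedHealthCheck(probe: () => Promise<void>): Promise<HealthResult> {
  const started = performance.now();
  try {
    await probe();
    return { ok: true, latencyMs: Math.round(performance.now() - started) };
  } catch (error) {
    return {
      ok: false,
      latencyMs: Math.round(performance.now() - started),
      error: error instanceof Error ? error.message : String(error),
    };
  }
}

// Usage (not executed here; `url` is whatever endpoint you want to probe):
// await timedHealthCheck(async () => {
//   const res = await fetch(url);
//   if (!res.ok) throw new Error(`HTTP ${res.status}`);
// });
```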

🐳 Kubernetes Deployment

The SDK handles graceful shutdown automatically:

const sdk = new TraceFlowSDK({
  transport: 'kafka',
  source: process.env.SERVICE_NAME!,
  kafka: { brokers: process.env.KAFKA_BROKERS!.split(',') },
  autoFlushOnExit: true, // Auto-flush on SIGTERM
  flushTimeoutMs: 5000,  // Wait up to 5s for flush
});

// SDK automatically handles:
// - SIGTERM (Kubernetes shutdown)
// - SIGINT (Ctrl+C)
// - Uncaught exceptions
// - Unhandled rejections
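autoFlushOnExit and flushTimeoutMs combine into a bounded flush: wait for pending events, but never longer than the timeout, so a stuck transport cannot hold up pod termination. A minimal sketch of that idea around a generic flush callback; the helper is hypothetical, and the SDK already performs its own version of this when autoFlushOnExit is true.

```typescript
// Resolves 'flushed' if flush() completes within timeoutMs, 'timeout' otherwise.
// A flush that rejects propagates its error.
async function flushWithTimeout(
  flush: () => Promise<void>,
  timeoutMs = 5000,
): Promise<'flushed' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  try {
    return await Promise.race([flush().then(() => 'flushed' as const), timedOut]);
  } finally {
    clearTimeout(timer); // don't keep the process alive after a fast flush
  }
}

// Wiring it to SIGTERM could look like this (hypothetical, not executed here):
// process.once('SIGTERM', () => {
//   void flushWithTimeout(() => sdk.flush(), 5000).finally(() => process.exit(0));
// });
```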

🔒 Production Best Practices

1. Always Use Silent Errors in Production

const sdk = new TraceFlowSDK({
  // ... config
  silentErrors: true, // Never fail your app due to tracing
});

2. Enable Circuit Breaker

const sdk = new TraceFlowSDK({
  // ... config
  enableCircuitBreaker: true, // Auto-disable on repeated failures
  maxRetries: 3,
  retryDelay: 1000,
});
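The breaker is tuned by circuitBreakerThreshold (failures before opening, default 5) and circuitBreakerTimeout (time before half-open, default 60000 ms). The class below is a minimal sketch of the standard closed / open / half-open state machine driven by those two parameters. It is an illustration, not the SDK's implementation; details such as resetting the failure count on success are assumptions.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private readonly threshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(threshold = 5, resetTimeoutMs = 60_000, now: () => number = Date.now) {
    this.threshold = threshold; // failures before opening
    this.resetTimeoutMs = resetTimeoutMs; // time before half-open
    this.now = now; // injectable clock for testing
  }

  // Whether a send may be attempted right now.
  canRequest(): boolean {
    if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open'; // allow trial requests through
    }
    return this.state !== 'open';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  recordFailure(): void {
    this.failures++;
    // A failed trial reopens immediately; otherwise open once the threshold is hit.
    if (this.state === 'half-open' || this.failures >= this.threshold) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }

  get currentState(): BreakerState {
    return this.state;
  }
}
```

While the breaker is open, tracing calls can be skipped outright instead of waiting on a backend that is known to be down, which is what keeps a TraceFlow outage from adding latency to every request.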

3. Use Environment Variables

const sdk = new TraceFlowSDK({
  transport: process.env.TRACE_TRANSPORT as 'http' | 'kafka',
  source: process.env.SERVICE_NAME!,
  endpoint: process.env.TRACEFLOW_ENDPOINT,
  kafka: process.env.KAFKA_BROKERS ? {
    brokers: process.env.KAFKA_BROKERS.split(','),
  } : undefined,
});
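The setup above passes environment values straight through. A slightly stricter variant, sketched below with hypothetical helper names, validates TRACE_TRANSPORT, requires SERVICE_NAME, and trims broker entries, so a typo fails fast at startup. Defaulting to 'http' when TRACE_TRANSPORT is unset is an assumption; the returned object uses only fields from the documented TraceFlowSDKConfig.

```typescript
type Transport = 'http' | 'kafka';

// Subset of the documented TraceFlowSDKConfig fields used here.
interface EnvTraceConfig {
  transport: Transport;
  source: string;
  endpoint?: string;
  kafka?: { brokers: string[] };
}

function isTransport(value: string): value is Transport {
  return value === 'http' || value === 'kafka';
}

function configFromEnv(env: Record<string, string | undefined>): EnvTraceConfig {
  const transport = env.TRACE_TRANSPORT ?? 'http'; // assumed default
  if (!isTransport(transport)) {
    throw new Error(`TRACE_TRANSPORT must be "http" or "kafka", got "${transport}"`);
  }
  const source = env.SERVICE_NAME;
  if (!source) throw new Error('SERVICE_NAME is required');

  // "b1:9092, b2:9092," -> ["b1:9092", "b2:9092"]
  const brokers = (env.KAFKA_BROKERS ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  if (transport === 'kafka' && brokers.length === 0) {
    throw new Error('KAFKA_BROKERS is required for the kafka transport');
  }

  return {
    transport,
    source,
    endpoint: env.TRACEFLOW_ENDPOINT,
    kafka: brokers.length > 0 ? { brokers } : undefined,
  };
}

// Usage (not executed here): new TraceFlowSDK(configFromEnv(process.env));
```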

4. Singleton Pattern

// tracing.ts
import { TraceFlowSDK } from '@dev.smartpricing/traceflow-sdk';

export const traceflow = new TraceFlowSDK({ /* config */ });

// everywhere else
import { traceflow } from './tracing';

📊 Event Model

The SDK emits events, not state updates:

{
  event_id: string;
  event_type: 
    | 'trace_started'
    | 'trace_finished'
    | 'trace_failed'
    | 'trace_cancelled'
    | 'step_started'
    | 'step_finished'
    | 'step_failed'
    | 'log_emitted';
  trace_id: string;
  step_id?: string;
  timestamp: string;
  source: string;
  payload: Record<string, any>;
}
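Because events are append-only, the current state of a trace is derived by reading its events in order rather than updated in place. A hypothetical sketch of typing and building such events in TypeScript, plus a simple fold that derives a trace's status. Using crypto.randomUUID() for event_id, ISO-8601 timestamps, and a "last terminal event wins" rule are all assumptions; the README does not specify these details.

```typescript
import { randomUUID } from 'node:crypto';

type TraceEventType =
  | 'trace_started' | 'trace_finished' | 'trace_failed' | 'trace_cancelled'
  | 'step_started' | 'step_finished' | 'step_failed' | 'log_emitted';

interface TraceEvent {
  event_id: string;
  event_type: TraceEventType;
  trace_id: string;
  step_id?: string;
  timestamp: string;
  source: string;
  payload: Record<string, unknown>;
}

function makeEvent(
  event_type: TraceEventType,
  trace_id: string,
  source: string,
  payload: Record<string, unknown> = {},
  step_id?: string,
): TraceEvent {
  return {
    event_id: randomUUID(),
    event_type,
    trace_id,
    ...(step_id !== undefined ? { step_id } : {}), // omit step_id for trace-level events
    timestamp: new Date().toISOString(),
    source,
    payload,
  };
}

type TraceStatus = 'running' | 'finished' | 'failed' | 'cancelled';

// Derive status by folding over one trace's events in order.
function traceStatus(events: readonly TraceEvent[]): TraceStatus {
  let status: TraceStatus = 'running';
  for (const e of events) {
    if (e.event_type === 'trace_finished') status = 'finished';
    else if (e.event_type === 'trace_failed') status = 'failed';
    else if (e.event_type === 'trace_cancelled') status = 'cancelled';
  }
  return status;
}
```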

🆚 Migration from v1

Breaking Changes

v1 → v2
TraceFlowClient → TraceFlowSDK
trace() → startTrace()
step() → startStep()
TraceManager → TraceHandle
Step class → StepHandle
Direct Kafka → Transport abstraction
Redis state → Stateless
Status messages → Event messages

Migration Example

// v1
const client = new TraceFlowClient({
  brokers: ['localhost:9092'],
  redisUrl: 'redis://localhost:6379',
}, 'my-service');

await client.connect();
const trace = await client.trace({ title: 'My Trace' });
const step = await trace.step({ name: 'My Step' });
await step.finish();
await trace.finish();

// v2
const sdk = new TraceFlowSDK({
  transport: 'kafka',
  source: 'my-service',
  kafka: { brokers: ['localhost:9092'] },
});

const trace = await sdk.startTrace({ title: 'My Trace' });
const step = await trace.startStep({ name: 'My Step' });
await step.finish();
await trace.finish();
await sdk.shutdown();

📖 Examples

🤝 Contributing

Contributions are welcome! Please open an issue or PR.

📄 License

ISC © Smartpricing

🔗 Links


Built with ❤️ by Smartpricing