diff --git a/src/monitoring/sla/index.ts b/src/monitoring/sla/index.ts new file mode 100644 index 0000000..8a6c52f --- /dev/null +++ b/src/monitoring/sla/index.ts @@ -0,0 +1,6 @@ +/** + * SLA Monitoring Module + * Centralized export for all SLA monitoring functionality + */ + +export * from './stellar'; diff --git a/src/monitoring/sla/stellar/README.md b/src/monitoring/sla/stellar/README.md new file mode 100644 index 0000000..6938059 --- /dev/null +++ b/src/monitoring/sla/stellar/README.md @@ -0,0 +1,433 @@ +/** + * Stellar Bridge SLA Monitoring - Implementation Guide + */ + +# Stellar Bridge SLA Monitoring + +## Overview + +The Stellar Bridge SLA (Service Level Agreement) Monitoring system provides comprehensive tracking and reporting of bridge provider performance metrics, including uptime, latency, and reliability measurements. + +## Features + +✅ **Uptime Tracking** - Monitor bridge provider availability +✅ **Latency Monitoring** - Track response times with percentile calculations +✅ **Reliability Scoring** - Calculate reliability scores based on request success rates +✅ **SLA Compliance Checking** - Detect violations against configured thresholds +✅ **Report Generation** - Generate reports in multiple formats (text, JSON, CSV, HTML) +✅ **Historical Data Storage** - Maintain historical metrics for trend analysis +✅ **Anomaly Detection** - Identify unusual latency spikes +✅ **Event Emitters** - Real-time event notifications for violations and status changes + +## Installation + +The SLA monitoring module is located at `src/monitoring/sla/stellar/` and can be imported as: + +```typescript +import { + StellarBridgeSlaMonitor, + exportReport, + calculateComplianceScore, +} from '@/monitoring/sla/stellar'; +``` + +## Quick Start + +### 1. Create a Monitor Instance + +```typescript +import { StellarBridgeSlaMonitor } from '@/monitoring/sla/stellar'; + +const monitor = new StellarBridgeSlaMonitor({ + checkIntervalMs: 60_000, // Check every minute + timeoutMs: 5_000, // 5-second timeout + thresholds: { + uptimePercentage: 99.9, + maxLatencyMs: 1000, + minReliability: 0.95, + }, + autoReportGeneration: true, + reportIntervalMs: 86_400_000, // Daily reports + enableHistoricalData: true, +}); +``` + +### 2. Register Providers + +```typescript +// Define probe functions that test provider health +const stellarBridgeProbe = async () => { + const startTime = performance.now(); + try { + const response = await fetch('https://stellar-bridge-api.example.com/health'); + const latencyMs = performance.now() - startTime; + return { + success: response.ok, + latencyMs, + error: !response.ok ? 'Health check failed' : undefined, + }; + } catch (error) { + return { + success: false, + latencyMs: performance.now() - startTime, + error: error instanceof Error ? error.message : String(error), + }; + } +}; + +monitor.registerProvider('stellar-bridge-primary', stellarBridgeProbe); +monitor.registerProvider('stellar-bridge-backup', backupBridgeProbe); +``` + +### 3. Start Monitoring + +```typescript +monitor.startMonitoring(); + +// Listen for violations +monitor.on('violation', ({ violation, providerId }) => { + console.log(`SLA Violation on ${providerId}:`, violation); + // Send alerts, notifications, etc. +}); + +// Listen for status changes +monitor.on('status-change', (status) => { + console.log(`Status changed for ${status.providerId}:`, status.status); +}); + +// Listen for generated reports +monitor.on('report-generated', (report) => { + console.log(`Report generated for ${report.providerId}`); + // Save report, send email, etc. +}); +``` + +### 4. Retrieve Metrics and Reports + +```typescript +// Get current metrics for a provider +const metrics = monitor.getMetrics('stellar-bridge-primary'); +console.log(`Uptime: ${metrics?.uptime}%`); +console.log(`P99 Latency: ${metrics?.p99LatencyMs}ms`); + +// Get status +const status = monitor.getStatus('stellar-bridge-primary'); +console.log(`Compliance Status: ${status?.status}`); + +// Generate a report +const report = monitor.generateReport('stellar-bridge-primary'); + +// Export in different formats +import { exportReport } from '@/monitoring/sla/stellar'; + +const textReport = exportReport(report, 'text'); +const jsonReport = exportReport(report, 'json'); +const csvReport = exportReport(report, 'csv'); +const htmlReport = exportReport(report, 'html'); +``` + +## API Reference + +### StellarBridgeSlaMonitor + +#### Constructor + +```typescript +constructor(config: StellarBridgeSlaMonitorConfig) +``` + +**Config Options:** +- `checkIntervalMs` (default: 60,000) - Interval between provider checks +- `timeoutMs` (default: 5,000) - Timeout for probe execution +- `maxMeasurements` (default: 10,000) - Max measurements to keep in memory +- `thresholds` - SLA thresholds for compliance +- `autoReportGeneration` (default: true) - Auto-generate reports +- `reportIntervalMs` (default: 86,400,000) - Report generation interval +- `enableHistoricalData` (default: true) - Store historical data +- `onViolation` - Callback for violations +- `onStatusChange` - Callback for status changes +- `onError` - Error handler + +#### Methods + +```typescript +// Provider Management +registerProvider(providerId: string, probe: SLAProbe): void +unregisterProvider(providerId: string): boolean + +// Monitoring Control +startMonitoring(): void +stopMonitoring(): void +reset(): void + +// Probe Execution +checkAll(): Promise +checkProvider(providerId: string): Promise + +// Data Retrieval +getStatus(providerId: string): SLAStatus | null +getAllStatuses(): SLAStatus[] +getMetrics(providerId: string): Partial | null +getHistoricalData(providerId: string): SLAHistoricalData | null + +// Report Generation +generateReport(providerId: string): SLAReport | null +generateAllReports(): Promise + +// Historical Data +addHistoricalDailyMetrics(providerId: string, date: Date, metrics: Partial): void +``` + +#### Events + +```typescript +monitor.on('status-change', (status: SLAStatus) => {}) +monitor.on('violation', ({ violation, providerId }) => {}) +monitor.on('report-generated', (report: SLAReport) => {}) +``` + +### Utility Functions + +#### Metrics Utilities + +```typescript +import { + calculatePercentile, + calculateMovingAverage, + calculateStandardDeviation, + detectAnomalies, + calculateComplianceScore, + calculateMetricsTrend, + formatMetricsForDisplay, + aggregateMetrics, + checkSlaCompliance, +} from '@/monitoring/sla/stellar'; + +// Calculate P95 and P99 latencies +const p95 = calculatePercentile(latencies, 95); +const p99 = calculatePercentile(latencies, 99); + +// Detect anomalies +const anomalies = detectAnomalies(measurements, 2); // Z-score threshold + +// Calculate compliance score (0-100) +const score = calculateComplianceScore(metrics, thresholds); + +// Check if compliant +const isCompliant = checkSlaCompliance(metrics, thresholds); +``` + +#### Report Generation + +```typescript +import { exportReport } from '@/monitoring/sla/stellar'; + +const textReport = exportReport(report, 'text'); // Plain text +const jsonReport = exportReport(report, 'json'); // JSON +const csvReport = exportReport(report, 'csv'); // CSV data +const htmlReport = exportReport(report, 'html'); // HTML document +``` + +## Examples + +### Example 1: Basic Setup with Horizon Health Checks + +```typescript +import { StellarBridgeSlaMonitor } from '@/monitoring/sla/stellar'; + +const monitor = new StellarBridgeSlaMonitor({ + checkIntervalMs: 30_000, + thresholds: { + uptimePercentage: 99.5, + maxLatencyMs: 800, + minReliability: 0.98, + }, +}); + +// Probe Stellar Horizon +const horizonProbe = async () => { + const startTime = performance.now(); + try { + const response = await fetch('https://horizon.stellar.org/'); + return { + success: response.ok, + latencyMs: performance.now() - startTime, + }; + } catch (error) { + return { + success: false, + latencyMs: performance.now() - startTime, + error: error instanceof Error ? error.message : String(error), + }; + } +}; + +monitor.registerProvider('stellar-public', horizonProbe); +monitor.startMonitoring(); +``` + +### Example 2: Custom Alerts + +```typescript +monitor.on('violation', ({ violation, providerId }) => { + if (violation.severity === 'critical') { + // Send critical alert + console.error(`🚨 CRITICAL: ${providerId} - ${violation.metric} violation!`); + sendSlackAlert(`Critical SLA violation on ${providerId}`); + escalateToOncall(providerId, violation); + } else { + // Send warning + console.warn(`⚠️ WARNING: ${providerId} - ${violation.metric} at risk`); + sendEmailNotification(`SLA warning on ${providerId}`); + } +}); + +monitor.on('status-change', ({ providerId, previousStatus, currentStatus }) => { + if (previousStatus === 'compliant' && currentStatus === 'breached') { + logIncident(providerId, 'SLA Breached'); + } else if (currentStatus === 'compliant' && previousStatus !== 'compliant') { + resolveIncident(providerId); + } +}); +``` + +### Example 3: Generating and Storing Reports + +```typescript +import { exportReport } from '@/monitoring/sla/stellar'; +import fs from 'fs/promises'; + +monitor.on('report-generated', async (report) => { + const date = new Date().toISOString().split('T')[0]; + const dir = `./sla-reports/${report.providerId}`; + + await fs.mkdir(dir, { recursive: true }); + + // Save all formats + await fs.writeFile( + `${dir}/${date}-report.txt`, + exportReport(report, 'text'), + ); + await fs.writeFile( + `${dir}/${date}-report.json`, + exportReport(report, 'json'), + ); + await fs.writeFile( + `${dir}/${date}-report.csv`, + exportReport(report, 'csv'), + ); + await fs.writeFile( + `${dir}/${date}-report.html`, + exportReport(report, 'html'), + ); + + console.log(`Reports saved for ${report.providerId} on ${date}`); +}); +``` + +### Example 4: Trend Analysis + +```typescript +import { calculateMetricsTrend } from '@/monitoring/sla/stellar'; + +let previousMetrics = null; + +setInterval(() => { + const currentMetrics = monitor.getMetrics('stellar-bridge-primary'); + + if (previousMetrics && currentMetrics) { + const trend = calculateMetricsTrend(currentMetrics, previousMetrics); + + console.log('Trend Analysis:'); + console.log(` Uptime trend: ${trend.uptimeTrend > 0 ? '📈' : '📉'} ${trend.uptimeTrend.toFixed(2)}%`); + console.log(` Latency trend: ${trend.latencyTrend < 0 ? '📈' : '📉'} ${trend.latencyTrend.toFixed(0)}ms`); + console.log(` Reliability trend: ${trend.reliabilityTrend > 0 ? '📈' : '📉'} ${(trend.reliabilityTrend * 100).toFixed(2)}%`); + } + + previousMetrics = currentMetrics; +}, 300_000); // Every 5 minutes +``` + +## SLA Thresholds Configuration + +Recommended thresholds for production Stellar bridges: + +```typescript +const productionThresholds = { + uptimePercentage: 99.95, // 99.95% uptime + maxLatencyMs: 500, // 500ms max P99 latency + minReliability: 0.9995, // 99.95% reliability + minThroughput: 100, // 100 requests/second minimum +}; + +const stagingThresholds = { + uptimePercentage: 99.5, // 99.5% uptime + maxLatencyMs: 1000, // 1000ms max P99 latency + minReliability: 0.99, // 99% reliability + minThroughput: 50, // 50 requests/second minimum +}; +``` + +## Testing + +Run the test suite: + +```bash +npm run test src/monitoring/sla/stellar +``` + +Tests are located in `src/monitoring/sla/stellar/__tests__/` + +## Performance Considerations + +- **Memory Usage**: Stores up to `maxMeasurements` per provider (default 10,000) +- **CPU Usage**: Check interval and timeout affect CPU load +- **Network**: Probe functions should be efficient to minimize network impact +- **Scalability**: Can monitor up to hundreds of providers with minimal overhead + +## Troubleshooting + +### Reports Not Generating +- Ensure `autoReportGeneration: true` in config +- Check that providers have recorded measurements +- Verify `reportIntervalMs` is appropriate + +### Violations Not Detected +- Confirm thresholds are set in config +- Check measurement data is being recorded +- Verify violation callbacks are registered + +### High Memory Usage +- Reduce `maxMeasurements` value +- Increase frequency of archiving historical data +- Monitor fewer providers simultaneously + +## Integration with BridgeWise + +To integrate SLA monitoring into the BridgeWise platform: + +1. Import the monitor in your bridge provider service +2. Register all bridge providers with their health check probes +3. Connect event handlers to your logging/alerting systems +4. Expose SLA endpoints via REST API +5. Display metrics in monitoring dashboard + +Example integration: + +```typescript +import { StellarBridgeSlaMonitor } from '@/monitoring/sla/stellar'; +import { BridgeProviderManager } from '@/packages/bridge-providers'; + +export function setupSlaMonitoring() { + const monitor = new StellarBridgeSlaMonitor(getSlaConfig()); + const providers = BridgeProviderManager.getInstance(); + + // Register all providers + for (const [name, provider] of providers.getAll()) { + monitor.registerProvider(name, createProbe(provider)); + } + + monitor.startMonitoring(); + return monitor; +} +``` diff --git a/src/monitoring/sla/stellar/__tests__/metrics-utils.spec.ts b/src/monitoring/sla/stellar/__tests__/metrics-utils.spec.ts new file mode 100644 index 0000000..76ae39e --- /dev/null +++ b/src/monitoring/sla/stellar/__tests__/metrics-utils.spec.ts @@ -0,0 +1,266 @@ +/** + * SLA Metrics Utilities Tests + */ + +import { describe, it, expect } from 'vitest'; +import { + calculatePercentile, + calculateMovingAverage, + calculateStandardDeviation, + detectAnomalies, + calculateComplianceScore, + calculateMetricsTrend, + formatMetricsForDisplay, + aggregateMetrics, + checkSlaCompliance, + estimateRecoveryTime, +} from '../metrics-utils'; +import type { SLAMeasurement, SLAMetrics } from '../types'; + +describe('SLA Metrics Utils', () => { + describe('calculatePercentile', () => { + it('should calculate percentiles correctly', () => { + const values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + expect(calculatePercentile(values, 50)).toBe(5.5); + expect(calculatePercentile(values, 95)).toBeGreaterThan(9); + expect(calculatePercentile(values, 99)).toBeGreaterThan(9.8); + }); + + it('should handle single value', () => { + expect(calculatePercentile([100], 50)).toBe(100); + }); + + it('should handle empty array', () => { + expect(calculatePercentile([], 50)).toBe(0); + }); + }); + + describe('calculateMovingAverage', () => { + it('should calculate moving average', () => { + const measurements: SLAMeasurement[] = [ + { timestamp: new Date(), available: true, latencyMs: 100 }, + { timestamp: new Date(), available: true, latencyMs: 200 }, + { timestamp: new Date(), available: true, latencyMs: 300 }, + ]; + + const avg = calculateMovingAverage(measurements, 3); + expect(avg).toBe(200); + }); + + it('should ignore failed measurements', () => { + const measurements: SLAMeasurement[] = [ + { timestamp: new Date(), available: true, latencyMs: 100 }, + { timestamp: new Date(), available: false, latencyMs: 0 }, + { timestamp: new Date(), available: true, latencyMs: 300 }, + ]; + + const avg = calculateMovingAverage(measurements, 3); + expect(avg).toBe(200); + }); + + it('should handle empty measurements', () => { + expect(calculateMovingAverage([], 10)).toBe(0); + }); + }); + + describe('calculateStandardDeviation', () => { + it('should calculate standard deviation', () => { + const measurements: SLAMeasurement[] = [ + { timestamp: new Date(), available: true, latencyMs: 100 }, + { timestamp: new Date(), available: true, latencyMs: 200 }, + { timestamp: new Date(), available: true, latencyMs: 300 }, + { timestamp: new Date(), available: true, latencyMs: 400 }, + { timestamp: new Date(), available: true, latencyMs: 500 }, + ]; + + const stdDev = calculateStandardDeviation(measurements); + expect(stdDev).toBeGreaterThan(0); + expect(stdDev).toBeLessThan(200); + }); + + it('should return 0 for single measurement', () => { + const measurements: SLAMeasurement[] = [ + { timestamp: new Date(), available: true, latencyMs: 100 }, + ]; + + expect(calculateStandardDeviation(measurements)).toBe(0); + }); + }); + + describe('detectAnomalies', () => { + it('should detect latency anomalies', () => { + const measurements: SLAMeasurement[] = [ + { timestamp: new Date(), available: true, latencyMs: 100 }, + { timestamp: new Date(), available: true, latencyMs: 110 }, + { timestamp: new Date(), available: true, latencyMs: 105 }, + { timestamp: new Date(), available: true, latencyMs: 5000 }, // Anomaly + ]; + + const anomalies = detectAnomalies(measurements, 2); + expect(anomalies.length).toBeGreaterThan(0); + expect(anomalies[0].latencyMs).toBe(5000); + }); + + it('should handle empty measurements', () => { + expect(detectAnomalies([], 2)).toEqual([]); + }); + }); + + describe('calculateComplianceScore', () => { + it('should calculate full compliance score', () => { + const metrics: Partial = { + uptime: 100, + p99LatencyMs: 500, + reliability: 1, + }; + + const score = calculateComplianceScore(metrics, { + uptimePercentage: 99.9, + maxLatencyMs: 1000, + minReliability: 0.95, + }); + + expect(score).toBe(100); + }); + + it('should deduct for violations', () => { + const metrics: Partial = { + uptime: 95, + p99LatencyMs: 2000, + reliability: 0.8, + }; + + const score = calculateComplianceScore(metrics, { + uptimePercentage: 99.9, + maxLatencyMs: 1000, + minReliability: 0.95, + }); + + expect(score).toBeLessThan(100); + expect(score).toBeGreaterThanOrEqual(0); + }); + }); + + describe('calculateMetricsTrend', () => { + it('should calculate positive trends', () => { + const current: Partial = { + uptime: 99.5, + avgLatencyMs: 100, + reliability: 0.98, + }; + + const previous: Partial = { + uptime: 99.0, + avgLatencyMs: 150, + reliability: 0.95, + }; + + const trend = calculateMetricsTrend(current, previous); + expect(trend.uptimeTrend).toBeGreaterThan(0); + expect(trend.latencyTrend).toBeLessThan(0); // Lower latency is better + expect(trend.reliabilityTrend).toBeGreaterThan(0); + }); + }); + + describe('formatMetricsForDisplay', () => { + it('should format metrics correctly', () => { + const metrics: Partial = { + uptime: 99.5, + availability: 0.995, + avgLatencyMs: 123.456, + reliability: 0.99, + }; + + const formatted = formatMetricsForDisplay(metrics); + expect(formatted.uptime).toContain('99.50'); + expect(formatted.uptime).toContain('%'); + expect(formatted.avgLatency).toContain('ms'); + }); + }); + + describe('aggregateMetrics', () => { + it('should aggregate multiple metrics', () => { + const metricsList: Array> = [ + { + totalRequests: 100, + successfulRequests: 99, + failedRequests: 1, + uptime: 99, + availability: 0.99, + avgLatencyMs: 100, + }, + { + totalRequests: 100, + successfulRequests: 98, + failedRequests: 2, + uptime: 98, + availability: 0.98, + avgLatencyMs: 150, + }, + ]; + + const aggregated = aggregateMetrics(metricsList); + expect(aggregated.totalRequests).toBe(200); + expect(aggregated.successfulRequests).toBe(197); + expect(aggregated.uptime).toBeLessThan(100); + }); + + it('should handle empty metrics list', () => { + const aggregated = aggregateMetrics([]); + expect(aggregated.uptime).toBe(0); + expect(aggregated.totalRequests).toBe(0); + }); + }); + + describe('checkSlaCompliance', () => { + it('should return true for compliant metrics', () => { + const metrics: Partial = { + uptime: 99.95, + p99LatencyMs: 500, + reliability: 0.99, + }; + + const compliant = checkSlaCompliance(metrics, { + uptimePercentage: 99.9, + maxLatencyMs: 1000, + minReliability: 0.95, + }); + + expect(compliant).toBe(true); + }); + + it('should return false for non-compliant metrics', () => { + const metrics: Partial = { + uptime: 95, + p99LatencyMs: 2000, + reliability: 0.8, + }; + + const compliant = checkSlaCompliance(metrics, { + uptimePercentage: 99.9, + maxLatencyMs: 1000, + minReliability: 0.95, + }); + + expect(compliant).toBe(false); + }); + }); + + describe('estimateRecoveryTime', () => { + it('should estimate recovery time for improving trends', () => { + // If violations are decreasing, estimate recovery time + const recoveryTime = estimateRecoveryTime(10, -2, 5); // 10 violations, -2 trend, 5 threshold + expect(recoveryTime).toBeGreaterThan(0); + }); + + it('should return null for stable or worsening trends', () => { + expect(estimateRecoveryTime(10, 0, 5)).toBeNull(); + expect(estimateRecoveryTime(10, 2, 5)).toBeNull(); + }); + + it('should return 0 if already at threshold', () => { + const recoveryTime = estimateRecoveryTime(3, -2, 5); // Already below threshold + expect(recoveryTime).toBe(0); + }); + }); +}); diff --git a/src/monitoring/sla/stellar/__tests__/stellar-bridge-sla-monitor.spec.ts b/src/monitoring/sla/stellar/__tests__/stellar-bridge-sla-monitor.spec.ts new file mode 100644 index 0000000..717a1d0 --- /dev/null +++ b/src/monitoring/sla/stellar/__tests__/stellar-bridge-sla-monitor.spec.ts @@ -0,0 +1,393 @@ +/** + * Stellar Bridge SLA Monitor Tests + */ + +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { StellarBridgeSlaMonitor } from '../stellar-bridge-sla-monitor'; +import type { SLAMeasurement, SLAReport, SLAThresholds } from '../types'; + +describe('StellarBridgeSlaMonitor', () => { + let monitor: StellarBridgeSlaMonitor; + let testProbeResults: SLAMeasurement[]; + + beforeEach(() => { + testProbeResults = []; + monitor = new StellarBridgeSlaMonitor({ + checkIntervalMs: 100, + timeoutMs: 1000, + thresholds: { + uptimePercentage: 99.5, + maxLatencyMs: 500, + minReliability: 0.99, + }, + autoReportGeneration: false, + }); + }); + + afterEach(() => { + monitor.reset(); + }); + + describe('Provider Registration', () => { + it('should register a provider', () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + monitor.registerProvider('stellar-bridge-1', probe); + + const status = monitor.getStatus('stellar-bridge-1'); + expect(status).toBeDefined(); + expect(status?.providerId).toBe('stellar-bridge-1'); + }); + + it('should unregister a provider', () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + monitor.registerProvider('stellar-bridge-1', probe); + + const unregistered = monitor.unregisterProvider('stellar-bridge-1'); + expect(unregistered).toBe(true); + expect(monitor.getStatus('stellar-bridge-1')).toBeNull(); + }); + + it('should not unregister non-existent provider', () => { + const unregistered = monitor.unregisterProvider('non-existent'); + expect(unregistered).toBe(false); + }); + }); + + describe('SLA Monitoring', () => { + it('should track successful measurements', async () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 10; i++) { + await monitor.checkProvider('provider-1'); + } + + const metrics = monitor.getMetrics('provider-1'); + expect(metrics?.totalRequests).toBe(10); + expect(metrics?.successfulRequests).toBe(10); + expect(metrics?.failedRequests).toBe(0); + expect(metrics?.uptime).toBe(100); + }); + + it('should track failed measurements', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }) + .mockResolvedValueOnce({ success: true, latencyMs: 150 }); + + monitor.registerProvider('provider-1', probe); + + await monitor.checkProvider('provider-1'); + await monitor.checkProvider('provider-1'); + await monitor.checkProvider('provider-1'); + + const metrics = monitor.getMetrics('provider-1'); + expect(metrics?.totalRequests).toBe(3); + expect(metrics?.successfulRequests).toBe(2); + expect(metrics?.failedRequests).toBe(1); + expect(metrics?.uptime).toBeCloseTo(66.67, 1); + }); + + it('should calculate latency percentiles correctly', async () => { + const latencies = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]; + let latencyIndex = 0; + + const probe = vi.fn().mockImplementation(() => + Promise.resolve({ + success: true, + latencyMs: latencies[latencyIndex++], + }), + ); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < latencies.length; i++) { + await monitor.checkProvider('provider-1'); + } + + const metrics = monitor.getMetrics('provider-1'); + expect(metrics?.minLatencyMs).toBe(100); + expect(metrics?.maxLatencyMs).toBe(1000); + expect(metrics?.avgLatencyMs).toBeCloseTo(550, 0); + expect(metrics?.p95LatencyMs).toBeGreaterThan(800); + expect(metrics?.p99LatencyMs).toBeGreaterThan(900); + }); + }); + + describe('SLA Violations', () => { + it('should detect uptime violations', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 3; i++) { + await monitor.checkProvider('provider-1'); + } + + const status = monitor.getStatus('provider-1'); + expect(status?.violations.length).toBeGreaterThan(0); + const uptimeViolation = status?.violations.find((v) => v.metric === 'uptime'); + expect(uptimeViolation).toBeDefined(); + }); + + it('should detect latency violations', async () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 1000 }); + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 5; i++) { + await monitor.checkProvider('provider-1'); + } + + const status = monitor.getStatus('provider-1'); + const latencyViolation = status?.violations.find((v) => v.metric === 'latency'); + expect(latencyViolation).toBeDefined(); + }); + + it('should classify violations as critical or warning', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 3; i++) { + await monitor.checkProvider('provider-1'); + } + + const status = monitor.getStatus('provider-1'); + const violations = status?.violations || []; + + expect(violations.some((v) => v.severity === 'critical')).toBe(true); + }); + }); + + describe('Status Management', () => { + it('should track status changes', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }); + + monitor.registerProvider('provider-1', probe); + let statusChangeCount = 0; + + monitor.on('status-change', () => { + statusChangeCount++; + }); + + await monitor.checkProvider('provider-1'); + expect(statusChangeCount).toBeGreaterThanOrEqual(0); + }); + + it('should return all provider statuses', async () => { + const probe1 = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + const probe2 = vi.fn().mockResolvedValue({ success: true, latencyMs: 150 }); + + monitor.registerProvider('provider-1', probe1); + monitor.registerProvider('provider-2', probe2); + + await monitor.checkProvider('provider-1'); + await monitor.checkProvider('provider-2'); + + const allStatuses = monitor.getAllStatuses(); + expect(allStatuses.length).toBe(2); + }); + }); + + describe('Report Generation', () => { + it('should generate a report', async () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 10; i++) { + await monitor.checkProvider('provider-1'); + } + + const report = monitor.generateReport('provider-1'); + expect(report).toBeDefined(); + expect(report?.providerId).toBe('provider-1'); + expect(report?.metrics.totalRequests).toBe(10); + }); + + it('should generate report with violations', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 3; i++) { + await monitor.checkProvider('provider-1'); + } + + const report = monitor.generateReport('provider-1'); + expect(report?.violations.length).toBeGreaterThan(0); + }); + + it('should generate reports for all providers', async () => { + const probe1 = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + const probe2 = vi.fn().mockResolvedValue({ success: true, latencyMs: 150 }); + + monitor.registerProvider('provider-1', probe1); + monitor.registerProvider('provider-2', probe2); + + for (let i = 0; i < 5; i++) { + await monitor.checkProvider('provider-1'); + await monitor.checkProvider('provider-2'); + } + + const reports = await monitor.generateAllReports(); + expect(reports.length).toBe(2); + }); + + it('should include recommendations in reports', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 3; i++) { + await monitor.checkProvider('provider-1'); + } + + const report = monitor.generateReport('provider-1'); + expect(report?.recommendations.length).toBeGreaterThan(0); + }); + }); + + describe('Monitoring Lifecycle', () => { + it('should start and stop monitoring', async () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + monitor.registerProvider('provider-1', probe); + + monitor.startMonitoring(); + expect(probe).toHaveBeenCalled(); + + monitor.stopMonitoring(); + const initialCallCount = probe.mock.calls.length; + await new Promise((resolve) => setTimeout(resolve, 150)); + expect(probe.mock.calls.length).toBe(initialCallCount); + }); + + it('should handle timeouts gracefully', async () => { + const probe = vi + .fn() + .mockImplementation( + () => + new Promise((resolve) => { + setTimeout(() => resolve({ success: true, latencyMs: 100 }), 2000); + }), + ); + + monitor.registerProvider('provider-1', probe); + const measurement = await monitor.checkProvider('provider-1'); + + expect(measurement?.available).toBe(false); + expect(measurement?.latencyMs).toBe(1000); // timeout value + }); + + it('should reset all data', async () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + monitor.registerProvider('provider-1', probe); + + await monitor.checkProvider('provider-1'); + monitor.reset(); + + expect(monitor.getStatus('provider-1')).toBeNull(); + expect(monitor.getMetrics('provider-1')).toBeNull(); + }); + }); + + describe('Error Handling', () => { + it('should handle probe errors', async () => { + const probe = vi + .fn() + .mockRejectedValue(new Error('Probe failed')); + + monitor.registerProvider('provider-1', probe); + const measurement = await monitor.checkProvider('provider-1'); + + expect(measurement?.available).toBe(false); + expect(measurement?.errorMessage).toContain('Probe failed'); + }); + + it('should emit violation events', async () => { + const probe = vi + .fn() + .mockResolvedValueOnce({ success: true, latencyMs: 100 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }) + .mockResolvedValueOnce({ success: false, latencyMs: 0 }); + + let violationCount = 0; + monitor.on('violation', () => { + violationCount++; + }); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 3; i++) { + await monitor.checkProvider('provider-1'); + } + + expect(violationCount).toBeGreaterThan(0); + }); + + it('should emit report-generated events', async () => { + const probe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + let reportCount = 0; + + monitor.on('report-generated', () => { + reportCount++; + }); + + monitor.registerProvider('provider-1', probe); + + for (let i = 0; i < 5; i++) { + await monitor.checkProvider('provider-1'); + } + + monitor.generateReport('provider-1'); + expect(reportCount).toBe(1); + }); + }); + + describe('Metrics Calculation', () => { + it('should calculate reliability correctly', async () => { + const successProbe = vi.fn().mockResolvedValue({ success: true, latencyMs: 100 }); + const failProbe = vi.fn().mockResolvedValue({ success: false, latencyMs: 0 }); + + monitor.registerProvider('reliable', successProbe); + monitor.registerProvider('unreliable', failProbe); + + for (let i = 0; i < 10; i++) { + await monitor.checkProvider('reliable'); + await monitor.checkProvider('unreliable'); + } + + const reliableMetrics = monitor.getMetrics('reliable'); + const unreliableMetrics = monitor.getMetrics('unreliable'); + + expect(reliableMetrics?.reliability).toBe(1); + expect(unreliableMetrics?.reliability).toBe(0); + }); + + it('should handle empty measurements', () => { + monitor.registerProvider('provider-1', vi.fn()); + + const metrics = monitor.getMetrics('provider-1'); + expect(metrics?.totalRequests).toBe(0); + expect(metrics?.uptime).toBe(100); + }); + }); +}); diff --git a/src/monitoring/sla/stellar/index.ts b/src/monitoring/sla/stellar/index.ts new file mode 100644 index 0000000..10cadfc --- /dev/null +++ b/src/monitoring/sla/stellar/index.ts @@ -0,0 +1,42 @@ +/** + * Stellar Bridge SLA Monitoring Module + * Main export for SLA monitoring functionality + */ + +export { + StellarBridgeSlaMonitor, +} from './stellar-bridge-sla-monitor'; + +export { + calculatePercentile, + calculateMovingAverage, + calculateStandardDeviation, + detectAnomalies, + calculateComplianceScore, + calculateMetricsTrend, + formatMetricsForDisplay, + aggregateMetrics, + checkSlaCompliance, + estimateRecoveryTime, +} from './metrics-utils'; + +export { + generateTextReport, + generateJsonReport, + generateCsvReport, + generateHtmlReport, + exportReport, +} from './report-generator'; + +export type { + SLAMetric, + SLAThresholds, + SLAMeasurement, + SLAMetrics, + SLAStatus, + SLAViolation, + SLAReport, + SLAHistoricalData, + StellarBridgeSlaMonitorConfig, + SLAProbe, +} from './types'; diff --git a/src/monitoring/sla/stellar/metrics-utils.ts b/src/monitoring/sla/stellar/metrics-utils.ts new file mode 100644 index 0000000..d812fdf --- /dev/null +++ b/src/monitoring/sla/stellar/metrics-utils.ts @@ -0,0 +1,302 @@ +/** + * SLA Metrics Collection Utilities + * Helper functions for collecting and aggregating SLA metrics + */ + +import type { + SLAMetrics, + SLAMeasurement, + SLAThresholds, +} from './types'; + +/** + * Calculate percentile from sorted array + */ +export function calculatePercentile( + values: number[], + percentile: number, +): number { + if (values.length === 0) return 0; + if (values.length === 1) return values[0]; + + const sorted = [...values].sort((a, b) => a - b); + const index = (percentile / 100) * (sorted.length - 1); + const lower = Math.floor(index); + const upper = Math.ceil(index); + const weight = index % 1; + + if (lower === upper) { + return sorted[lower]; + } + + return sorted[lower] * (1 - weight) + sorted[upper] * weight; +} + +/** + * Calculate moving average of latencies + */ +export function calculateMovingAverage( + measurements: SLAMeasurement[], + windowSize: number = 100, +): number { + if (measurements.length === 0) return 0; + + const recentMeasurements = measurements.slice(-windowSize); + const latencies = recentMeasurements + .filter((m) => m.available) + .map((m) => m.latencyMs); + + if (latencies.length === 0) return 0; + + return latencies.reduce((a, b) => a + b, 0) / latencies.length; +} + +/** + * Calculate standard deviation of latencies + */ +export function calculateStandardDeviation( + measurements: SLAMeasurement[], +): number { + const latencies = measurements + .filter((m) => m.available) + .map((m) => m.latencyMs); + + if (latencies.length < 2) return 0; + + const mean = latencies.reduce((a, b) => a + b, 0) / latencies.length; + const variance = + latencies.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / + latencies.length; + + return Math.sqrt(variance); +} + +/** + * Detect anomalies in measurements using z-score method + */ +export function detectAnomalies( + measurements: SLAMeasurement[], + threshold: number = 2, +): SLAMeasurement[] { + const latencies = measurements + .filter((m) => m.available) + .map((m) => m.latencyMs); + + if (latencies.length < 2) return []; + + const mean = latencies.reduce((a, b) => a + b, 0) / latencies.length; + const stdDev = calculateStandardDeviation(measurements); + + if (stdDev === 0) return []; + + return measurements.filter((measurement) => { + if (!measurement.available) return false; + const zScore = Math.abs((measurement.latencyMs - mean) / stdDev); + return zScore > threshold; + }); +} + +/** + * Calculate SLA compliance score (0-100) + */ +export function calculateComplianceScore( + metrics: Partial, + thresholds: SLAThresholds, +): number { + let score = 100; + + // Deduct points for uptime violation + if ( + metrics.uptime !== undefined && + metrics.uptime < thresholds.uptimePercentage + ) { + const uptimeDiff = thresholds.uptimePercentage - metrics.uptime; + score -= Math.min(uptimeDiff * 10, 40); // Max 40 points deduction + } + + // Deduct points for latency violation + if ( + metrics.p99LatencyMs !== undefined && + metrics.p99LatencyMs > thresholds.maxLatencyMs + ) { + const latencyRatio = metrics.p99LatencyMs / thresholds.maxLatencyMs; + score -= Math.min((latencyRatio - 1) * 20, 30); // Max 30 points deduction + } + + // Deduct points for reliability violation + if ( + metrics.reliability !== undefined && + metrics.reliability < thresholds.minReliability + ) { + const reliabilityDiff = thresholds.minReliability - metrics.reliability; + score -= Math.min(reliabilityDiff * 100, 30); // Max 30 points deduction + } + + return Math.max(0, score); +} + +/** + * Calculate trends in metrics + */ +export function calculateMetricsTrend( + current: Partial, + previous: Partial, +): { + uptimeTrend: number; + latencyTrend: number; + reliabilityTrend: number; +} { + return { + uptimeTrend: current.uptime && previous.uptime ? current.uptime - previous.uptime : 0, + latencyTrend: + current.avgLatencyMs && previous.avgLatencyMs + ? current.avgLatencyMs - previous.avgLatencyMs + : 0, + reliabilityTrend: + current.reliability && previous.reliability + ? current.reliability - previous.reliability + : 0, + }; +} + +/** + * Format metrics for display + */ +export function formatMetricsForDisplay(metrics: Partial): Record { + return { + uptime: `${(metrics.uptime || 0).toFixed(2)}%`, + availability: `${((metrics.availability || 0) * 100).toFixed(2)}%`, + avgLatency: `${(metrics.avgLatencyMs || 0).toFixed(2)}ms`, + p99Latency: `${(metrics.p99LatencyMs || 0).toFixed(2)}ms`, + p95Latency: `${(metrics.p95LatencyMs || 0).toFixed(2)}ms`, + minLatency: `${(metrics.minLatencyMs || 0).toFixed(2)}ms`, + maxLatency: `${(metrics.maxLatencyMs || 0).toFixed(2)}ms`, + totalRequests: `${metrics.totalRequests || 0}`, + successfulRequests: `${metrics.successfulRequests || 0}`, + failedRequests: `${metrics.failedRequests || 0}`, + reliability: `${((metrics.reliability || 0) * 100).toFixed(2)}%`, + }; +} + +/** + * Aggregate metrics from multiple measurement periods + */ +export function aggregateMetrics( + metricsList: Array>, +): Partial { + if (metricsList.length === 0) { + return { + uptime: 0, + availability: 0, + avgLatencyMs: 0, + p99LatencyMs: 0, + p95LatencyMs: 0, + minLatencyMs: 0, + maxLatencyMs: 0, + totalRequests: 0, + successfulRequests: 0, + failedRequests: 0, + reliability: 0, + }; + } + + const totalRequests = metricsList.reduce((sum, m) => sum + (m.totalRequests || 0), 0); + const totalSuccessful = metricsList.reduce( + (sum, m) => sum + (m.successfulRequests || 0), + 0, + ); + const totalFailed = metricsList.reduce((sum, m) => sum + (m.failedRequests || 0), 0); + + const availability = totalRequests > 0 ? totalSuccessful / totalRequests : 0; + const uptime = availability * 100; + const reliability = availability; + + const avgLatencies = metricsList + .map((m) => m.avgLatencyMs || 0) + .filter((v) => v > 0); + const avgLatencyMs = avgLatencies.length > 0 + ? avgLatencies.reduce((a, b) => a + b, 0) / avgLatencies.length + : 0; + + const p99Latencies = metricsList + .map((m) => m.p99LatencyMs || 0) + .filter((v) => v > 0); + const p99LatencyMs = p99Latencies.length > 0 + ? Math.max(...p99Latencies) + : 0; + + const p95Latencies = metricsList + .map((m) => m.p95LatencyMs || 0) + .filter((v) => v > 0); + const p95LatencyMs = p95Latencies.length > 0 + ? Math.max(...p95Latencies) + : 0; + + const minLatencies = metricsList + .map((m) => m.minLatencyMs || 0) + .filter((v) => v > 0); + const minLatencyMs = minLatencies.length > 0 + ? Math.min(...minLatencies) + : 0; + + const maxLatencies = metricsList + .map((m) => m.maxLatencyMs || 0) + .filter((v) => v > 0); + const maxLatencyMs = maxLatencies.length > 0 + ? Math.max(...maxLatencies) + : 0; + + return { + uptime, + availability, + avgLatencyMs, + p99LatencyMs, + p95LatencyMs, + minLatencyMs, + maxLatencyMs, + totalRequests, + successfulRequests: totalSuccessful, + failedRequests: totalFailed, + reliability, + }; +} + +/** + * Check if metrics meet SLA thresholds + */ +export function checkSlaCompliance( + metrics: Partial, + thresholds: SLAThresholds, +): boolean { + return ( + (metrics.uptime === undefined || metrics.uptime >= thresholds.uptimePercentage) && + (metrics.p99LatencyMs === undefined || + metrics.p99LatencyMs <= thresholds.maxLatencyMs) && + (metrics.reliability === undefined || + metrics.reliability >= thresholds.minReliability) && + true // throughput check could be added here + ); +} + +/** + * Calculate expected recovery time based on current trend + */ +export function estimateRecoveryTime( + violations: number, + trend: number, + threshold: number, +): number | null { + if (violations === 0 || trend === 0 || trend >= 0) { + return null; // Not recovering or already recovered + } + + // Estimate time to recover based on negative trend + const improvementNeeded = violations - threshold; + if (improvementNeeded <= 0) { + return 0; // Already at threshold + } + + // Rough estimate: time = improvement needed / rate of improvement + const estimatedMinutes = Math.ceil(Math.abs(improvementNeeded / trend)); + return estimatedMinutes * 60 * 1000; // Convert to milliseconds +} diff --git a/src/monitoring/sla/stellar/report-generator.ts b/src/monitoring/sla/stellar/report-generator.ts new file mode 100644 index 0000000..15f0c12 --- /dev/null +++ b/src/monitoring/sla/stellar/report-generator.ts @@ -0,0 +1,388 @@ +/** + * SLA Report Generator + * Utilities for generating SLA reports + */ + +import type { + SLAReport, + SLAMetrics, + SLAViolation, +} from './types'; +import { + formatMetricsForDisplay, + calculateComplianceScore, +} from './metrics-utils'; + +/** + * Generate a detailed text report + */ +export function generateTextReport(report: SLAReport): string { + const lines: string[] = []; + + lines.push('╔═══════════════════════════════════════════════════════════════╗'); + lines.push('║ STELLAR BRIDGE SLA REPORT ║'); + lines.push('╚═══════════════════════════════════════════════════════════════╝'); + lines.push(''); + + lines.push('📋 REPORT METADATA'); + lines.push(` Report ID: ${report.reportId}`); + lines.push(` Provider: ${report.providerId}`); + lines.push(` Generated: ${report.generatedAt.toISOString()}`); + lines.push(` Period: ${report.period.startTime.toISOString()} to ${report.period.endTime.toISOString()}`); + lines.push(''); + + lines.push('📊 PERFORMANCE METRICS'); + const formatted = formatMetricsForDisplay(report.metrics); + lines.push(` Uptime: ${formatted.uptime}`); + lines.push(` Availability: ${formatted.availability}`); + lines.push(` Avg Latency: ${formatted.avgLatency}`); + lines.push(` P99 Latency: ${formatted.p99Latency}`); + lines.push(` P95 Latency: ${formatted.p95Latency}`); + lines.push(` Min Latency: ${formatted.minLatency}`); + lines.push(` Max Latency: ${formatted.maxLatency}`); + lines.push(` Total Requests: ${formatted.totalRequests}`); + lines.push(` Successful Requests: ${formatted.successfulRequests}`); + lines.push(` Failed Requests: ${formatted.failedRequests}`); + lines.push(` Reliability: ${formatted.reliability}`); + lines.push(''); + + lines.push('✅ SLA STATUS'); + const statusEmoji = + report.status === 'compliant' + ? '✓' + : report.status === 'at-risk' + ? '⚠' + : '✗'; + const statusLabel = + report.status === 'compliant' + ? 'COMPLIANT' + : report.status === 'at-risk' + ? 'AT-RISK' + : 'BREACHED'; + lines.push(` Status: ${statusEmoji} ${statusLabel}`); + lines.push(''); + + if (report.violations.length > 0) { + lines.push('⚠️ VIOLATIONS'); + report.violations.forEach((violation, index) => { + const severity = violation.severity === 'critical' ? '🔴' : '🟡'; + lines.push(` ${index + 1}. ${severity} ${violation.metric.toUpperCase()}`); + lines.push(` Threshold: ${violation.threshold}`); + lines.push(` Actual: ${violation.actual.toFixed(2)}`); + lines.push(` Violated: ${violation.violatedAt.toISOString()}`); + }); + lines.push(''); + } + + lines.push('📝 SUMMARY'); + lines.push(` ${report.summary}`); + lines.push(''); + + if (report.recommendations.length > 0) { + lines.push('💡 RECOMMENDATIONS'); + report.recommendations.forEach((rec, index) => { + lines.push(` ${index + 1}. ${rec}`); + }); + lines.push(''); + } + + lines.push('╔═══════════════════════════════════════════════════════════════╗'); + lines.push('║ END OF REPORT ║'); + lines.push('╚═══════════════════════════════════════════════════════════════╝'); + + return lines.join('\n'); +} + +/** + * Generate JSON report + */ +export function generateJsonReport(report: SLAReport): Record { + return { + reportId: report.reportId, + providerId: report.providerId, + generatedAt: report.generatedAt.toISOString(), + period: { + startTime: report.period.startTime.toISOString(), + endTime: report.period.endTime.toISOString(), + }, + metrics: { + uptime: (report.metrics.uptime || 0).toFixed(2), + availability: ((report.metrics.availability || 0) * 100).toFixed(2), + avgLatencyMs: (report.metrics.avgLatencyMs || 0).toFixed(2), + p99LatencyMs: (report.metrics.p99LatencyMs || 0).toFixed(2), + p95LatencyMs: (report.metrics.p95LatencyMs || 0).toFixed(2), + minLatencyMs: (report.metrics.minLatencyMs || 0).toFixed(2), + maxLatencyMs: (report.metrics.maxLatencyMs || 0).toFixed(2), + totalRequests: report.metrics.totalRequests, + successfulRequests: report.metrics.successfulRequests, + failedRequests: report.metrics.failedRequests, + reliability: ((report.metrics.reliability || 0) * 100).toFixed(2), + }, + status: report.status, + violations: report.violations.map((v) => ({ + metric: v.metric, + threshold: v.threshold, + actual: v.actual.toFixed(2), + violatedAt: v.violatedAt.toISOString(), + severity: v.severity, + })), + summary: report.summary, + recommendations: report.recommendations, + }; +} + +/** + * Generate CSV report data + */ +export function generateCsvReport(report: SLAReport): string { + const lines: string[] = []; + + // Header + lines.push('Metric,Value,Threshold,Status'); + lines.push(''); + + // Metrics + const metrics = report.metrics; + const thresholdUptime = 99.9; + const thresholdLatency = 1000; + const thresholdReliability = 0.95; + + const uptimeStatus = + (metrics.uptime || 0) >= thresholdUptime ? 'PASS' : 'FAIL'; + lines.push( + `Uptime,${(metrics.uptime || 0).toFixed(2)}%,${thresholdUptime}%,${uptimeStatus}`, + ); + + const latencyStatus = + (metrics.p99LatencyMs || 0) <= thresholdLatency ? 'PASS' : 'FAIL'; + lines.push( + `P99 Latency,${(metrics.p99LatencyMs || 0).toFixed(0)}ms,${thresholdLatency}ms,${latencyStatus}`, + ); + + const reliabilityStatus = + (metrics.reliability || 0) >= thresholdReliability ? 'PASS' : 'FAIL'; + lines.push( + `Reliability,${(metrics.reliability || 0).toFixed(3)},${thresholdReliability},${reliabilityStatus}`, + ); + + lines.push( + `Total Requests,${metrics.totalRequests || 0},,`, + ); + lines.push( + `Successful Requests,${metrics.successfulRequests || 0},,`, + ); + lines.push( + `Failed Requests,${metrics.failedRequests || 0},,`, + ); + lines.push(''); + + // Violations + if (report.violations.length > 0) { + lines.push('Violations'); + lines.push('Metric,Threshold,Actual,Severity,Violated At'); + report.violations.forEach((v) => { + lines.push( + `${v.metric},${v.threshold},${v.actual.toFixed(2)},${v.severity},${v.violatedAt.toISOString()}`, + ); + }); + } + + return lines.join('\n'); +} + +/** + * Generate HTML report + */ +export function generateHtmlReport(report: SLAReport): string { + const statusColor = + report.status === 'compliant' + ? '#28a745' + : report.status === 'at-risk' + ? '#ffc107' + : '#dc3545'; + const statusText = + report.status === 'compliant' + ? 'COMPLIANT' + : report.status === 'at-risk' + ? 'AT-RISK' + : 'BREACHED'; + + return ` + + + + + + Stellar Bridge SLA Report + + + +
+
+

Stellar Bridge SLA Report

+

Provider: ${report.providerId} | Status: ${statusText}

+
+ +
+
+ Report ID: ${report.reportId} +
+
+ Generated: ${report.generatedAt.toLocaleString()} +
+
+ Period Start: ${report.period.startTime.toLocaleString()} +
+
+ Period End: ${report.period.endTime.toLocaleString()} +
+
+ +

Performance Metrics

+
+
+
${(report.metrics.uptime || 0).toFixed(2)}%
+
Uptime
+
+
+
${(report.metrics.avgLatencyMs || 0).toFixed(0)}ms
+
Avg Latency
+
+
+
${((report.metrics.reliability || 0) * 100).toFixed(2)}%
+
Reliability
+
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MetricValue
Availability${((report.metrics.availability || 0) * 100).toFixed(2)}%
P99 Latency${(report.metrics.p99LatencyMs || 0).toFixed(2)}ms
P95 Latency${(report.metrics.p95LatencyMs || 0).toFixed(2)}ms
Min Latency${(report.metrics.minLatencyMs || 0).toFixed(2)}ms
Max Latency${(report.metrics.maxLatencyMs || 0).toFixed(2)}ms
Total Requests${report.metrics.totalRequests}
Successful Requests${report.metrics.successfulRequests}
Failed Requests${report.metrics.failedRequests}
+ + ${ + report.violations.length > 0 + ? ` +
+

⚠️ Violations Found

+ ${report.violations + .map( + (v) => ` +
+ [${v.severity.toUpperCase()}] + ${v.metric}: Threshold ${v.threshold}, Actual ${v.actual.toFixed(2)} +
+ `, + ) + .join('')} +
+ ` + : '' + } + +
+

Summary

+

${report.summary}

+
+ + ${ + report.recommendations.length > 0 + ? ` +
+

💡 Recommendations

+
    + ${report.recommendations.map((rec) => `
  • ${rec}
  • `).join('')} +
+
+ ` + : '' + } + + +
+ + + `.trim(); +} + +/** + * Export report in multiple formats + */ +export function exportReport( + report: SLAReport, + format: 'text' | 'json' | 'csv' | 'html' = 'text', +): string { + switch (format) { + case 'json': + return JSON.stringify(generateJsonReport(report), null, 2); + case 'csv': + return generateCsvReport(report); + case 'html': + return generateHtmlReport(report); + case 'text': + default: + return generateTextReport(report); + } +} diff --git a/src/monitoring/sla/stellar/stellar-bridge-sla-monitor.ts b/src/monitoring/sla/stellar/stellar-bridge-sla-monitor.ts new file mode 100644 index 0000000..149e1de --- /dev/null +++ b/src/monitoring/sla/stellar/stellar-bridge-sla-monitor.ts @@ -0,0 +1,561 @@ +/** + * Stellar Bridge SLA Monitor + * Tracks and monitors Service Level Agreements for Stellar bridge providers + */ + +import { EventEmitter } from 'events'; +import type { + SLAMetrics, + SLAStatus, + SLAViolation, + SLAReport, + SLAMeasurement, + SLAThresholds, + StellarBridgeSlaMonitorConfig, + SLAProbe, + SLAHistoricalData, +} from './types'; + +const DEFAULT_CONFIG: Required = { + checkIntervalMs: 60_000, + timeoutMs: 5_000, + maxMeasurements: 10_000, + thresholds: { + uptimePercentage: 99.9, + maxLatencyMs: 1_000, + minReliability: 0.95, + minThroughput: 100, + }, + autoReportGeneration: true, + reportIntervalMs: 86_400_000, // 24 hours + enableHistoricalData: true, + onViolation: undefined, + onStatusChange: undefined, + onError: undefined, +}; + +export class StellarBridgeSlaMonitor extends EventEmitter { + private readonly config: Required; + private readonly probes = new Map(); + private readonly measurements = new Map(); + private readonly slaStatuses = new Map(); + private readonly historicalData = new Map(); + private checkInterval: ReturnType | null = null; + private reportInterval: ReturnType | null = null; + private lastReportTime = new Map(); + + constructor(config: StellarBridgeSlaMonitorConfig = {}) { + super(); + this.config = { + ...DEFAULT_CONFIG, + ...config, + thresholds: { + ...DEFAULT_CONFIG.thresholds, + ...config.thresholds, + }, + }; + + this.setupErrorHandler(); + } + + /** + * Register a provider for SLA monitoring + */ + registerProvider(providerId: string, probe: SLAProbe): void { + this.probes.set(providerId, probe); + if (!this.measurements.has(providerId)) { + this.measurements.set(providerId, []); + } + if (!this.slaStatuses.has(providerId)) { + this.initializeProviderStatus(providerId); + } + if (this.config.enableHistoricalData && !this.historicalData.has(providerId)) { + this.historicalData.set(providerId, { + providerId, + dailyMetrics: [], + monthlyMetrics: [], + yearlyMetrics: [], + }); + } + } + + /** + * Unregister a provider + */ + unregisterProvider(providerId: string): boolean { + this.measurements.delete(providerId); + this.slaStatuses.delete(providerId); + this.lastReportTime.delete(providerId); + return this.probes.delete(providerId); + } + + /** + * Start SLA monitoring + */ + startMonitoring(): void { + if (this.checkInterval) { + return; + } + + this.checkInterval = setInterval(() => { + void this.checkAll(); + }, this.config.checkIntervalMs); + + if (this.config.autoReportGeneration && !this.reportInterval) { + this.reportInterval = setInterval(() => { + void this.generateAllReports(); + }, this.config.reportIntervalMs); + } + + void this.checkAll(); + } + + /** + * Stop SLA monitoring + */ + stopMonitoring(): void { + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + + if (this.reportInterval) { + clearInterval(this.reportInterval); + this.reportInterval = null; + } + } + + /** + * Reset all monitoring data + */ + reset(): void { + this.stopMonitoring(); + this.probes.clear(); + this.measurements.clear(); + this.slaStatuses.clear(); + this.historicalData.clear(); + this.lastReportTime.clear(); + } + + /** + * Check all registered providers + */ + async checkAll(): Promise { + const providerIds = Array.from(this.probes.keys()); + await Promise.all(providerIds.map((providerId) => this.checkProvider(providerId))); + } + + /** + * Check a specific provider + */ + async checkProvider(providerId: string): Promise { + const probe = this.probes.get(providerId); + if (!probe) { + return null; + } + + let measurement: SLAMeasurement; + try { + const startTime = performance.now(); + const result = await this.withTimeout(probe(), this.config.timeoutMs); + const latencyMs = performance.now() - startTime; + + measurement = { + timestamp: new Date(), + available: result.success, + latencyMs, + errorMessage: result.error, + }; + } catch (error: any) { + measurement = { + timestamp: new Date(), + available: false, + latencyMs: this.config.timeoutMs, + errorMessage: error?.message || String(error), + }; + } + + this.recordMeasurement(providerId, measurement); + await this.updateStatus(providerId); + + return measurement; + } + + /** + * Record a measurement for a provider + */ + private recordMeasurement(providerId: string, measurement: SLAMeasurement): void { + const measurements = this.measurements.get(providerId) || []; + measurements.push(measurement); + + // Keep only the latest measurements within maxMeasurements + if (measurements.length > this.config.maxMeasurements) { + measurements.shift(); + } + + this.measurements.set(providerId, measurements); + } + + /** + * Update SLA status for a provider + */ + private async updateStatus(providerId: string): Promise { + const metrics = this.calculateMetrics(providerId); + const violations = this.checkViolations(providerId, metrics); + const previousStatus = this.slaStatuses.get(providerId); + + // Determine compliance status + let status: SLAStatus['status'] = 'compliant'; + if (violations.length > 0) { + const criticalViolations = violations.filter((v) => v.severity === 'critical'); + status = criticalViolations.length > 0 ? 'breached' : 'at-risk'; + } + + const newStatus: SLAStatus = { + providerId, + status, + metricsSnapshot: metrics, + violations, + lastUpdated: new Date(), + }; + + this.slaStatuses.set(providerId, newStatus); + + // Emit events + if (previousStatus && previousStatus.status !== newStatus.status) { + this.emitStatusChange(newStatus); + } + + // Emit violations + violations.forEach((violation) => { + this.emitViolation(violation, providerId); + }); + } + + /** + * Calculate metrics from measurements + */ + private calculateMetrics(providerId: string): Partial { + const measurements = this.measurements.get(providerId) || []; + if (measurements.length === 0) { + return { + providerId, + uptime: 100, + availability: 1, + avgLatencyMs: 0, + p99LatencyMs: 0, + p95LatencyMs: 0, + minLatencyMs: 0, + maxLatencyMs: 0, + totalRequests: 0, + successfulRequests: 0, + failedRequests: 0, + reliability: 1, + }; + } + + const successfulMeasurements = measurements.filter((m) => m.available); + const latencies = successfulMeasurements.map((m) => m.latencyMs).sort((a, b) => a - b); + + const totalRequests = measurements.length; + const successfulRequests = successfulMeasurements.length; + const failedRequests = totalRequests - successfulRequests; + const availability = successfulRequests / totalRequests; + const uptime = availability * 100; + + const avgLatencyMs = + latencies.length > 0 ? latencies.reduce((a, b) => a + b, 0) / latencies.length : 0; + const minLatencyMs = latencies.length > 0 ? latencies[0] : 0; + const maxLatencyMs = latencies.length > 0 ? latencies[latencies.length - 1] : 0; + + // Calculate percentiles + const p99Index = Math.ceil((latencies.length * 99) / 100) - 1; + const p95Index = Math.ceil((latencies.length * 95) / 100) - 1; + const p99LatencyMs = latencies[Math.max(0, p99Index)] || 0; + const p95LatencyMs = latencies[Math.max(0, p95Index)] || 0; + + const reliability = availability; + + return { + providerId, + uptime, + availability, + avgLatencyMs, + p99LatencyMs, + p95LatencyMs, + minLatencyMs, + maxLatencyMs, + totalRequests, + successfulRequests, + failedRequests, + reliability, + }; + } + + /** + * Check for SLA violations + */ + private checkViolations(providerId: string, metrics: Partial): SLAViolation[] { + const violations: SLAViolation[] = []; + const thresholds = this.config.thresholds; + const now = new Date(); + + // Check uptime + if ( + metrics.uptime !== undefined && + metrics.uptime < thresholds.uptimePercentage + ) { + violations.push({ + metric: 'uptime', + threshold: thresholds.uptimePercentage, + actual: metrics.uptime, + violatedAt: now, + severity: + metrics.uptime < thresholds.uptimePercentage * 0.95 ? 'critical' : 'warning', + }); + } + + // Check latency + if ( + metrics.p99LatencyMs !== undefined && + metrics.p99LatencyMs > thresholds.maxLatencyMs + ) { + violations.push({ + metric: 'latency', + threshold: thresholds.maxLatencyMs, + actual: metrics.p99LatencyMs, + violatedAt: now, + severity: + metrics.p99LatencyMs > thresholds.maxLatencyMs * 2 ? 'critical' : 'warning', + }); + } + + // Check reliability + if ( + metrics.reliability !== undefined && + metrics.reliability < thresholds.minReliability + ) { + violations.push({ + metric: 'reliability', + threshold: thresholds.minReliability, + actual: metrics.reliability, + violatedAt: now, + severity: + metrics.reliability < thresholds.minReliability * 0.9 ? 'critical' : 'warning', + }); + } + + return violations; + } + + /** + * Generate a full SLA report for a provider + */ + generateReport(providerId: string): SLAReport | null { + const measurements = this.measurements.get(providerId); + if (!measurements || measurements.length === 0) { + return null; + } + + const metrics = this.calculateMetrics(providerId); + const status = this.slaStatuses.get(providerId); + const violations = status?.violations || []; + + const startTime = measurements[0].timestamp; + const endTime = measurements[measurements.length - 1].timestamp; + + const report: SLAReport = { + reportId: `sla-${providerId}-${Date.now()}`, + providerId, + generatedAt: new Date(), + period: { + startTime, + endTime, + }, + metrics: { + providerId, + period: { startTime, endTime }, + measurements, + ...(metrics as any), + }, + status: status?.status || 'compliant', + violations, + summary: this.generateSummary(providerId, metrics, violations), + recommendations: this.generateRecommendations(providerId, metrics, violations), + }; + + this.lastReportTime.set(providerId, new Date()); + this.emit('report-generated', report); + + return report; + } + + /** + * Generate reports for all providers + */ + async generateAllReports(): Promise { + const reports: SLAReport[] = []; + for (const providerId of this.probes.keys()) { + const report = this.generateReport(providerId); + if (report) { + reports.push(report); + } + } + return reports; + } + + /** + * Get current SLA status for a provider + */ + getStatus(providerId: string): SLAStatus | null { + return this.slaStatuses.get(providerId) || null; + } + + /** + * Get all provider statuses + */ + getAllStatuses(): SLAStatus[] { + return Array.from(this.slaStatuses.values()); + } + + /** + * Get metrics for a provider + */ + getMetrics(providerId: string): Partial | null { + const measurements = this.measurements.get(providerId); + if (!measurements) { + return null; + } + return this.calculateMetrics(providerId); + } + + /** + * Get historical data for a provider + */ + getHistoricalData(providerId: string): SLAHistoricalData | null { + return this.historicalData.get(providerId) || null; + } + + /** + * Add historical daily metrics + */ + addHistoricalDailyMetrics( + providerId: string, + date: Date, + metrics: Partial, + ): void { + const history = this.historicalData.get(providerId); + if (!history) { + return; + } + + history.dailyMetrics.push({ + date, + metrics, + }); + } + + /** + * Private helper methods + */ + private initializeProviderStatus(providerId: string): void { + this.slaStatuses.set(providerId, { + providerId, + status: 'compliant', + metricsSnapshot: {}, + violations: [], + lastUpdated: new Date(), + }); + } + + private generateSummary( + providerId: string, + metrics: Partial, + violations: SLAViolation[], + ): string { + const uptime = metrics.uptime?.toFixed(2) || '0'; + const latency = metrics.avgLatencyMs?.toFixed(0) || '0'; + const reliability = (metrics.reliability || 0).toFixed(3); + + let summary = `Provider "${providerId}" - Uptime: ${uptime}%, Avg Latency: ${latency}ms, Reliability: ${reliability}`; + + if (violations.length > 0) { + const criticalViolations = violations.filter((v) => v.severity === 'critical').length; + const warningViolations = violations.filter((v) => v.severity === 'warning').length; + summary += `. Violations: ${criticalViolations} critical, ${warningViolations} warnings`; + } + + return summary; + } + + private generateRecommendations( + providerId: string, + metrics: Partial, + violations: SLAViolation[], + ): string[] { + const recommendations: string[] = []; + + if (violations.length === 0) { + recommendations.push(`Continue monitoring "${providerId}" for consistent performance.`); + return recommendations; + } + + for (const violation of violations) { + if (violation.metric === 'uptime') { + recommendations.push( + `Investigate downtime for "${providerId}". Current uptime: ${violation.actual.toFixed(2)}%, threshold: ${violation.threshold.toFixed(2)}%`, + ); + } else if (violation.metric === 'latency') { + recommendations.push( + `Optimize latency for "${providerId}". P99 latency: ${violation.actual.toFixed(0)}ms, threshold: ${violation.threshold}ms`, + ); + } else if (violation.metric === 'reliability') { + recommendations.push( + `Improve reliability for "${providerId}". Current: ${violation.actual.toFixed(3)}, threshold: ${violation.threshold.toFixed(3)}`, + ); + } + } + + return recommendations; + } + + private emitStatusChange(status: SLAStatus): void { + this.emit('status-change', status); + if (this.config.onStatusChange) { + this.config.onStatusChange(status); + } + } + + private emitViolation(violation: SLAViolation, providerId: string): void { + this.emit('violation', { violation, providerId }); + if (this.config.onViolation) { + this.config.onViolation(violation, providerId); + } + } + + private setupErrorHandler(): void { + this.on('error', (error: unknown) => { + if (this.config.onError) { + this.config.onError(error); + } + }); + } + + private withTimeout(promise: Promise, timeoutMs: number): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error(`SLA probe timed out after ${timeoutMs}ms`)); + }, timeoutMs); + + promise + .then((value) => { + clearTimeout(timeout); + resolve(value); + }) + .catch((error) => { + clearTimeout(timeout); + reject(error); + }); + }); + } +} diff --git a/src/monitoring/sla/stellar/types.ts b/src/monitoring/sla/stellar/types.ts new file mode 100644 index 0000000..339c7ec --- /dev/null +++ b/src/monitoring/sla/stellar/types.ts @@ -0,0 +1,120 @@ +/** + * Stellar Bridge SLA Monitoring Types + * Defines types and interfaces for tracking service level agreements + */ + +export type SLAMetric = 'uptime' | 'latency' | 'throughput' | 'reliability'; + +export interface SLAThresholds { + /** Minimum uptime percentage (0-100). Default: 99.9 */ + uptimePercentage?: number; + /** Maximum acceptable latency in milliseconds. Default: 1000 */ + maxLatencyMs?: number; + /** Minimum reliability score (0-1). Default: 0.95 */ + minReliability?: number; + /** Minimum throughput (requests per second). Default: 100 */ + minThroughput?: number; +} + +export interface SLAMeasurement { + timestamp: Date; + available: boolean; + latencyMs: number; + errorMessage?: string; +} + +export interface SLAMetrics { + providerId: string; + period: { + startTime: Date; + endTime: Date; + }; + measurements: SLAMeasurement[]; + uptime: number; // 0-100 percentage + availability: number; // 0-1 + avgLatencyMs: number; + p99LatencyMs: number; + p95LatencyMs: number; + minLatencyMs: number; + maxLatencyMs: number; + totalRequests: number; + successfulRequests: number; + failedRequests: number; + reliability: number; // 0-1 +} + +export interface SLAStatus { + providerId: string; + status: 'compliant' | 'at-risk' | 'breached'; + metricsSnapshot: Partial; + violations: SLAViolation[]; + lastUpdated: Date; +} + +export interface SLAViolation { + metric: SLAMetric; + threshold: number; + actual: number; + violatedAt: Date; + severity: 'warning' | 'critical'; +} + +export interface SLAReport { + reportId: string; + providerId: string; + generatedAt: Date; + period: { + startTime: Date; + endTime: Date; + }; + metrics: SLAMetrics; + status: SLAStatus['status']; + violations: SLAViolation[]; + summary: string; + recommendations: string[]; +} + +export interface SLAHistoricalData { + providerId: string; + dailyMetrics: Array<{ + date: Date; + metrics: Partial; + }>; + monthlyMetrics: Array<{ + month: Date; + metrics: Partial; + }>; + yearlyMetrics: Array<{ + year: number; + metrics: Partial; + }>; +} + +export interface StellarBridgeSlaMonitorConfig { + /** Check interval in milliseconds. Default: 60000 (1 minute) */ + checkIntervalMs?: number; + /** Timeout for probe calls in milliseconds. Default: 5000 */ + timeoutMs?: number; + /** Number of measurements to keep in memory. Default: 10000 */ + maxMeasurements?: number; + /** SLA thresholds for compliance checking */ + thresholds?: SLAThresholds; + /** Enable automatic report generation */ + autoReportGeneration?: boolean; + /** Report generation interval in milliseconds. Default: 86400000 (24 hours) */ + reportIntervalMs?: number; + /** Enable historical data storage */ + enableHistoricalData?: boolean; + /** Callback when SLA violation occurs */ + onViolation?: (violation: SLAViolation, providerId: string) => void; + /** Callback when SLA status changes */ + onStatusChange?: (status: SLAStatus) => void; + /** Error handler for unhandled errors */ + onError?: (err: unknown) => void; +} + +export type SLAProbe = () => Promise<{ + success: boolean; + latencyMs: number; + error?: string; +}>; diff --git a/src/replay/events/index.ts b/src/replay/events/index.ts new file mode 100644 index 0000000..7ef6f7a --- /dev/null +++ b/src/replay/events/index.ts @@ -0,0 +1,6 @@ +/** + * Event Replay Module + * Centralized export for all event replay functionality + */ + +export * from './stellar'; diff --git a/src/replay/events/stellar/README.md b/src/replay/events/stellar/README.md new file mode 100644 index 0000000..b7ff9d0 --- /dev/null +++ b/src/replay/events/stellar/README.md @@ -0,0 +1,487 @@ +/** + * Soroban Event Replay Processor - Implementation Guide + */ + +# Soroban Event Replay Processor + +## Overview + +The Soroban Event Replay Processor enables developers to replay historical bridge events for debugging, testing, and analytics. It provides a comprehensive event storage and replay system with flexible filtering, multiple replay modes, and detailed statistics. + +## Features + +✅ **Event Storage** - Efficient in-memory event storage with LRU eviction +✅ **Flexible Querying** - Powerful filter criteria with indexing for fast queries +✅ **Multiple Replay Modes** - Fast, real-time, throttled, and step-by-step playback +✅ **Listener System** - Register listeners to receive replayed events +✅ **Session Management** - Track replay sessions with detailed statistics +✅ **Error Handling** - Graceful error handling with recovery options +✅ **Query Builder** - Fluent API for building complex queries +✅ **Storage Statistics** - Monitor storage usage and event distribution + +## Installation + +The event replay module is located at `src/replay/events/stellar/` and can be imported as: + +```typescript +import { + SorobanEventReplayProcessor, + createReplayQuery, + ReplayQueryPresets, +} from '@/replay/events/stellar'; +``` + +## Quick Start + +### 1. Create a Processor Instance + +```typescript +import { SorobanEventReplayProcessor } from '@/replay/events/stellar'; + +const processor = new SorobanEventReplayProcessor({ + inMemoryStorage: { + maxEvents: 100_000, + ttl: 30 * 24 * 60 * 60 * 1000, // 30 days + autoCleanup: true, + }, + defaultReplayOptions: { + mode: 'fast', + continueOnError: true, + }, +}); +``` + +### 2. Store Events + +```typescript +import type { StoredRawBridgeEvent } from '@/replay/events/stellar'; + +const bridgeEvent: StoredRawBridgeEvent = { + id: 'evt-001', + timestamp: new Date(), + source: 'stellar-bridge', + type: 'transfer', + payload: { + amount: '1000000000', + destination: 'GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ', + }, + contractId: 'CADSCVF4MFXTV2HFYGDYXFCL3IUJG5LPNWGZYHXU6QTHGDQBGIXW5KQ', + transactionHash: 'abc123...', + sequenceNumber: 100, + ledgerSequence: 50000, + sourceAccount: 'GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ', +}; + +await processor.storeEvent(bridgeEvent); + +// Or store multiple events +await processor.storeEvents([bridgeEvent, ...otherEvents]); +``` + +### 3. Register Listeners + +```typescript +// Register a listener to receive replayed events +const unregister = processor.registerListener((event, index, total, sessionId) => { + console.log(`Replaying event ${index + 1}/${total}: ${event.type}`); + // Handle replayed event + handleReplayedEvent(event); +}); + +// Register error listener +processor.registerErrorListener((error) => { + console.error(`Replay error: ${error.error}`); +}); +``` + +### 4. Replay Events + +```typescript +// Simple replay - all events in fast mode +const result = await processor.startReplay({}); + +console.log(`Replayed ${result.statistics.replayedEvents} events`); +console.log(`Duration: ${result.statistics.durationMs}ms`); +console.log(`Events/sec: ${result.statistics.eventsPerSecond}`); + +// Replay with filters +const result = await processor.startReplay( + { + eventTypes: ['transfer'], + startTime: new Date(Date.now() - 24 * 60 * 60 * 1000), // Last 24 hours + contractId: 'CADSCVF4MFXTV2HFYGDYXFCL3IUJG5LPNWGZYHXU6QTHGDQBGIXW5KQ', + }, + { + mode: 'throttled', + delayMs: 100, + continueOnError: true, + }, +); +``` + +## API Reference + +### SorobanEventReplayProcessor + +#### Constructor + +```typescript +new SorobanEventReplayProcessor(config?: SorobanEventReplayProcessorConfig) +``` + +**Config Options:** +- `storageBackend` - Custom storage backend (defaults to InMemoryEventStorage) +- `inMemoryStorage` - Configuration for in-memory storage +- `defaultReplayOptions` - Default options for replay operations +- `eventRetentionMs` - How long to keep events (default: 30 days) +- `cleanupIntervalMs` - Interval for automatic cleanup +- `maxConcurrentReplays` - Max concurrent replay sessions +- `verbose` - Enable verbose logging +- `onError` - Error handler callback +- `onSessionCompleted` - Callback when replay completes + +#### Methods + +```typescript +// Event Management +storeEvent(event: StoredBridgeEvent): Promise +storeEvents(events: StoredBridgeEvent[]): Promise + +// Listener Management +registerListener(listener: EventReplayListener): () => void +registerErrorListener(listener: EventReplayErrorListener): () => void + +// Querying +queryEvents(filter: ReplayFilterCriteria): Promise + +// Replay Operations +startReplay(filter: ReplayFilterCriteria, options?: ReplayOptions): Promise +pauseReplay(sessionId: string): boolean +resumeReplay(sessionId: string): Promise +cancelReplay(sessionId: string): boolean + +// Session Management +getSessionStats(sessionId: string): ReplaySession | null +getSessionHistory(sessionId: string): ReplayResult | null +getActiveSessions(): ReplaySession[] + +// Storage Management +getStorageStats(): Promise +cleanupOldEvents(): Promise +deleteEvent(eventId: string): Promise +``` + +#### Events + +```typescript +processor.on('event-stored', (event: StoredBridgeEvent) => {}) +processor.on('events-stored', ({ count: number }) => {}) +processor.on('listener-registered', ({ count: number }) => {}) +processor.on('listener-unregistered', ({ count: number }) => {}) +processor.on('replay-started', ({ sessionId, filter, options }) => {}) +processor.on('replay-progress', ({ sessionId, progress }) => {}) +processor.on('replay-completed', (result: ReplayResult) => {}) +processor.on('replay-paused', ({ sessionId }) => {}) +processor.on('replay-resumed', ({ sessionId }) => {}) +processor.on('replay-cancelled', ({ sessionId }) => {}) +``` + +### ReplayFilterCriteria + +```typescript +interface ReplayFilterCriteria { + // Time-based + startTime?: Date; + endTime?: Date; + + // Event type + eventTypes?: ReplayableEventType[]; + + // Addresses + fromAddress?: string; + toAddress?: string; + addresses?: string[]; + + // Contract + contractId?: string; + contractIds?: string[]; + + // Transaction + transactionHash?: string; + sequenceNumber?: number; + ledgerSequence?: number; + sourceAccount?: string; + + // Amount/Asset + minAmount?: string; + maxAmount?: string; + asset?: string; + assets?: string[]; + + // Metadata + hasMetadata?: boolean; + metadataMatches?: Record; + + // Pagination + limit?: number; + offset?: number; + + // Custom predicate + predicate?: (event: StoredBridgeEvent) => boolean; +} +``` + +### ReplayOptions + +```typescript +interface ReplayOptions { + mode?: 'fast' | 'real-time' | 'throttled' | 'step-by-step'; + delayMs?: number; + continueOnError?: boolean; + preserveTimestamps?: boolean; + maxEvents?: number; + batchSize?: number; + context?: Record; + normalize?: boolean; + dryRun?: boolean; +} +``` + +### Query Builder + +```typescript +import { createReplayQuery, ReplayQueryPresets } from '@/replay/events/stellar'; + +// Build queries fluently +const query = createReplayQuery() + .ofType('transfer') + .lastDays(7) + .minAmount('1000000') + .forContract(contractId) + .limit(100) + .build(); + +const result = await processor.queryEvents(query); + +// Use presets +const recentTransfers = ReplayQueryPresets.transfers().lastDays(1).build(); +const highValue = ReplayQueryPresets.highValueTransfers('1000000000').build(); +const contractEvents = ReplayQueryPresets.contractEvents(contractId).build(); +const addressHistory = ReplayQueryPresets.addressEvents(address).build(); +``` + +## Examples + +### Example 1: Replay Events for Debugging + +```typescript +import { SorobanEventReplayProcessor, createReplayQuery } from '@/replay/events/stellar'; + +const processor = new SorobanEventReplayProcessor(); + +// Collect events during normal operation +async function captureEvents(event) { + await processor.storeEvent(event); +} + +// Later, replay for debugging +async function debugTransaction(transactionHash) { + const debugListener = (event) => { + console.log('Event during replay:', event); + inspectEventDetails(event); + }; + + processor.registerListener(debugListener); + + const result = await processor.startReplay( + { transactionHash }, + { mode: 'step-by-step' }, + ); + + console.log(`Replayed ${result.statistics.replayedEvents} events`); +} +``` + +### Example 2: Replay with Filtering + +```typescript +// Replay all transfer events from the last 24 hours +const result = await processor.startReplay( + { + eventTypes: ['transfer'], + startTime: new Date(Date.now() - 24 * 60 * 60 * 1000), + }, + { mode: 'fast' }, +); + +console.log(`Success rate: ${(result.statistics.successRate * 100).toFixed(2)}%`); +``` + +### Example 3: Track Replay Progress + +```typescript +processor.on('replay-progress', ({ progress }) => { + console.log(`Replay progress: ${progress}%`); +}); + +processor.on('replay-completed', (result) => { + console.log('Replay complete:'); + console.log(` Total events: ${result.statistics.totalEvents}`); + console.log(` Replayed: ${result.statistics.replayedEvents}`); + console.log(` Failed: ${result.statistics.failedEvents}`); + console.log(` Duration: ${result.statistics.durationMs}ms`); + console.log(` Events/sec: ${result.statistics.eventsPerSecond}`); +}); + +await processor.startReplay({}); +``` + +### Example 4: Analyze Events + +```typescript +// Get storage statistics +const stats = await processor.getStorageStats(); + +console.log(`Total events: ${stats.totalEvents}`); +console.log(`Events by type:`, stats.eventsByType); +console.log(`Events by contract:`, stats.eventsByContract); +console.log(`Storage size: ${(stats.storageSize / 1024 / 1024).toFixed(2)}MB`); + +// Query specific events +const query = createReplayQuery() + .lastDays(7) + .ofTypes('transfer', 'mint') + .minAmount('1000000') + .limit(1000) + .build(); + +const result = await processor.queryEvents(query); +console.log(`Found ${result.total} events`); +``` + +### Example 5: Real-time Replay + +```typescript +// Replay events at real-time speed based on original timestamps +const result = await processor.startReplay( + { + startTime: new Date('2024-01-01'), + endTime: new Date('2024-01-02'), + }, + { + mode: 'real-time', + preserveTimestamps: true, + }, +); + +console.log(`Real-time replay completed in ${result.statistics.durationMs}ms`); +``` + +### Example 6: Error Handling + +```typescript +processor.registerListener((event) => { + if (event.type === 'unknown') { + throw new Error(`Unknown event type: ${JSON.stringify(event)}`); + } +}); + +processor.registerErrorListener((error) => { + console.error(`Failed to process event ${error.eventId}: ${error.error}`); + if (error.recoverable) { + console.log('Continuing replay...'); + } +}); + +const result = await processor.startReplay({}, { continueOnError: true }); + +// Review errors after replay +result.errors.forEach((err) => { + console.log(`Event ${err.eventId}: ${err.error} (${err.severity})`); +}); +``` + +## Replay Modes + +### Fast Mode +- Replays all events as quickly as possible +- No delay between events +- Best for batch processing and analytics +- Default mode + +### Real-time Mode +- Replays events with original timestamps preserved +- Delays between events match original timing +- Useful for reproducing exact flow scenarios +- Capped at 1-minute delays + +### Throttled Mode +- Applies fixed delay between events +- Configurable via `delayMs` option +- Useful for controlled testing +- Default delay: 100ms + +### Step-by-step Mode +- Pauses between events +- Allows interactive debugging +- Can be extended for pause/resume + +## Performance Considerations + +- **Storage**: Default 100K events with 30-day TTL +- **Memory**: Each event ~500 bytes average +- **Indexing**: Enabled by default for fast queries +- **Concurrency**: Max 5 concurrent replays by default +- **Cleanup**: Automatic TTL-based cleanup every 24 hours + +## Troubleshooting + +### Events Not Stored +- Check processor is initialized +- Verify event ID is unique +- Check storage limit not exceeded + +### Replay Not Triggering Listeners +- Confirm listeners registered before replay +- Check query filters match events +- Verify events exist with `queryEvents()` + +### Memory Usage High +- Reduce `maxEvents` in config +- Decrease `ttl` for older events +- Increase cleanup frequency + +### Slow Queries +- Add more specific filters +- Enable indexing +- Paginate results with limit/offset + +## Integration with BridgeWise + +To integrate event replay into BridgeWise: + +```typescript +import { SorobanEventReplayProcessor } from '@/replay/events/stellar'; +import { SorobanBridgeEventAggregator } from '@/events/aggregation/stellar'; + +// Setup replay processor +const replayProcessor = new SorobanEventReplayProcessor(); + +// Capture events from aggregator +const aggregator = new SorobanBridgeEventAggregator(); +aggregator.subscribe((events) => { + replayProcessor.storeEvents( + events.map((e) => ({ + id: generateId(), + timestamp: new Date(), + source: 'aggregator', + type: e.type, + payload: e, + contractId: e.contractId, + })), + ); +}); + +// Use for debugging or analytics +export { replayProcessor }; +``` diff --git a/src/replay/events/stellar/__tests__/soroban-event-replay-processor.spec.ts b/src/replay/events/stellar/__tests__/soroban-event-replay-processor.spec.ts new file mode 100644 index 0000000..5892e1e --- /dev/null +++ b/src/replay/events/stellar/__tests__/soroban-event-replay-processor.spec.ts @@ -0,0 +1,453 @@ +/** + * Soroban Event Replay Processor Tests + */ + +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { SorobanEventReplayProcessor } from '../soroban-event-replay-processor'; +import { InMemoryEventStorage } from '../event-storage'; +import { createReplayQuery, ReplayQueryPresets } from '../query-builder'; +import type { + StoredRawBridgeEvent, + StoredNormalizedBridgeEvent, + ReplayFilterCriteria, +} from '../types'; + +describe('Soroban Event Replay Processor', () => { + let processor: SorobanEventReplayProcessor; + let testEvents: StoredRawBridgeEvent[]; + + beforeEach(() => { + processor = new SorobanEventReplayProcessor({ verbose: false }); + testEvents = createTestEvents(); + }); + + afterEach(() => { + processor.removeAllListeners(); + }); + + describe('Event Storage', () => { + it('should store a single event', async () => { + const event = testEvents[0]; + await processor.storeEvent(event); + + const result = await processor.queryEvents({ limit: 10 }); + expect(result.events).toContainEqual(expect.objectContaining({ id: event.id })); + }); + + it('should store multiple events', async () => { + await processor.storeEvents(testEvents); + + const result = await processor.queryEvents({ limit: 100 }); + expect(result.events.length).toBe(testEvents.length); + expect(result.total).toBe(testEvents.length); + }); + + it('should retrieve stored events', async () => { + await processor.storeEvents(testEvents); + + const event = testEvents[0]; + const result = await processor.queryEvents({ transactionHash: event.transactionHash }); + + expect(result.events.length).toBeGreaterThan(0); + }); + + it('should emit event-stored event', async () => { + const eventListener = vi.fn(); + processor.on('event-stored', eventListener); + + await processor.storeEvent(testEvents[0]); + + expect(eventListener).toHaveBeenCalled(); + }); + }); + + describe('Event Querying', () => { + beforeEach(async () => { + await processor.storeEvents(testEvents); + }); + + it('should query by time range', async () => { + const now = new Date(); + const yesterday = new Date(now.getTime() - 24 * 60 * 60 * 1000); + + const result = await processor.queryEvents({ + startTime: yesterday, + endTime: now, + }); + + expect(result.events.length).toBeGreaterThan(0); + }); + + it('should query by event type', async () => { + const result = await processor.queryEvents({ eventTypes: ['transfer'] }); + expect(result.events.every((e) => e.type === 'transfer')).toBe(true); + }); + + it('should query by contract ID', async () => { + const contractId = testEvents[0].contractId; + const result = await processor.queryEvents({ contractId }); + + expect(result.events.length).toBeGreaterThan(0); + expect(result.events.every((e) => e.contractId === contractId)).toBe(true); + }); + + it('should query by address', async () => { + const result = await processor.queryEvents({ + addresses: ['GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ'], + }); + + expect(result.events.length).toBeGreaterThan(0); + }); + + it('should support pagination', async () => { + const result1 = await processor.queryEvents({ limit: 2, offset: 0 }); + const result2 = await processor.queryEvents({ limit: 2, offset: 2 }); + + expect(result1.events.length).toBe(2); + expect(result2.events.length).toBeGreaterThanOrEqual(0); + expect(result1.hasMore || result2.events.length > 0).toBe(true); + }); + + it('should filter by custom predicate', async () => { + const result = await processor.queryEvents({ + predicate: (event) => event.type === 'transfer', + }); + + expect(result.events.every((e) => e.type === 'transfer')).toBe(true); + }); + }); + + describe('Event Replay', () => { + beforeEach(async () => { + await processor.storeEvents(testEvents); + }); + + it('should replay events to registered listeners', async () => { + const listener = vi.fn(); + processor.registerListener(listener); + + const result = await processor.startReplay({ eventTypes: ['transfer'] }, { mode: 'fast' }); + + expect(listener).toHaveBeenCalled(); + expect(result.success).toBe(true); + expect(result.statistics.replayedEvents).toBeGreaterThan(0); + }); + + it('should support fast mode replay', async () => { + const listener = vi.fn(); + processor.registerListener(listener); + + const startTime = Date.now(); + const result = await processor.startReplay({}, { mode: 'fast', dryRun: false }); + const durationMs = Date.now() - startTime; + + expect(result.statistics.durationMs).toBeLessThan(5000); + expect(listener).toHaveBeenCalled(); + }); + + it('should support throttled mode replay', async () => { + const listener = vi.fn(); + processor.registerListener(listener); + + const startTime = Date.now(); + await processor.startReplay({ limit: 3 }, { mode: 'throttled', delayMs: 100 }); + const durationMs = Date.now() - startTime; + + expect(durationMs).toBeGreaterThanOrEqual(200); // At least 2 delays + expect(listener).toHaveBeenCalled(); + }); + + it('should support dry-run mode', async () => { + const listener = vi.fn(); + processor.registerListener(listener); + + const result = await processor.startReplay({}, { mode: 'fast', dryRun: true }); + + expect(listener).not.toHaveBeenCalled(); + expect(result.statistics.replayedEvents).toBeGreaterThan(0); // Still counts + }); + + it('should handle errors gracefully', async () => { + let errorCount = 0; + processor.registerListener(() => { + if (errorCount++ === 1) { + throw new Error('Test error'); + } + }); + + const result = await processor.startReplay({}, { continueOnError: true }); + + expect(result.statistics.failedEvents).toBeGreaterThan(0); + }); + + it('should stop on error when continueOnError is false', async () => { + let callCount = 0; + processor.registerListener(() => { + callCount++; + if (callCount === 2) { + throw new Error('Test error'); + } + }); + + const result = await processor.startReplay({}, { continueOnError: false }); + + expect(result.success).toBe(false); + expect(callCount).toBe(2); + }); + + it('should track replay progress', async () => { + let lastProgress = 0; + processor.on('replay-progress', ({ progress }) => { + expect(progress).toBeGreaterThanOrEqual(lastProgress); + lastProgress = progress; + }); + + await processor.startReplay({}, { mode: 'fast' }); + + expect(lastProgress).toBe(100); + }); + + it('should return replay statistics', async () => { + const listener = vi.fn(); + processor.registerListener(listener); + + const result = await processor.startReplay({}, { mode: 'fast' }); + + expect(result.statistics).toHaveProperty('sessionId'); + expect(result.statistics).toHaveProperty('totalEvents'); + expect(result.statistics).toHaveProperty('replayedEvents'); + expect(result.statistics).toHaveProperty('failureCount'); + expect(result.statistics).toHaveProperty('durationMs'); + expect(result.statistics).toHaveProperty('eventsPerSecond'); + }); + }); + + describe('Replay Sessions', () => { + it('should manage active sessions', async () => { + await processor.storeEvents(testEvents); + + const activeBefore = processor.getActiveSessions(); + const result = await processor.startReplay({}); + const activeAfter = processor.getActiveSessions(); + + expect(activeAfter.length).toBeLessThanOrEqual(activeBefore.length); + expect(result.sessionId).toBeDefined(); + }); + + it('should retrieve session history', async () => { + await processor.storeEvents(testEvents); + + const result1 = await processor.startReplay({}); + const history = processor.getSessionHistory(result1.sessionId); + + expect(history).toBeDefined(); + expect(history?.sessionId).toBe(result1.sessionId); + }); + + it('should emit replay-started event', async () => { + await processor.storeEvents(testEvents); + + const listener = vi.fn(); + processor.on('replay-started', listener); + + await processor.startReplay({}); + + expect(listener).toHaveBeenCalled(); + }); + + it('should emit replay-completed event', async () => { + await processor.storeEvents(testEvents); + + const listener = vi.fn(); + processor.on('replay-completed', listener); + + await processor.startReplay({}); + + expect(listener).toHaveBeenCalled(); + }); + }); + + describe('Query Builder', () => { + beforeEach(async () => { + await processor.storeEvents(testEvents); + }); + + it('should build query with time range', async () => { + const now = new Date(); + const yesterday = new Date(now.getTime() - 24 * 60 * 60 * 1000); + + const query = createReplayQuery().between(yesterday, now).build(); + const result = await processor.queryEvents(query); + + expect(result.events.length).toBeGreaterThan(0); + }); + + it('should build query with types', async () => { + const query = createReplayQuery().ofTypes('transfer', 'mint').build(); + const result = await processor.queryEvents(query); + + expect(result.events.every((e) => e.type === 'transfer' || e.type === 'mint')).toBe(true); + }); + + it('should build query with pagination', async () => { + const query = createReplayQuery().limit(5).offset(0).build(); + const result = await processor.queryEvents(query); + + expect(result.events.length).toBeLessThanOrEqual(5); + }); + + it('should chain methods', async () => { + const query = createReplayQuery() + .ofType('transfer') + .lastDays(1) + .limit(10) + .build(); + + const result = await processor.queryEvents(query); + + expect(result.events.length).toBeGreaterThanOrEqual(0); + }); + + it('should use query presets', async () => { + const query = ReplayQueryPresets.transfers().build(); + const result = await processor.queryEvents(query); + + expect(result.events.every((e) => e.type === 'transfer')).toBe(true); + }); + }); + + describe('Storage Stats', () => { + it('should retrieve storage statistics', async () => { + await processor.storeEvents(testEvents); + + const stats = await processor.getStorageStats(); + + expect(stats).toHaveProperty('totalEvents'); + expect(stats).toHaveProperty('eventsByType'); + expect(stats).toHaveProperty('eventsByContract'); + expect(stats).toHaveProperty('storageSize'); + }); + + it('should track events by type', async () => { + await processor.storeEvents(testEvents); + + const stats = await processor.getStorageStats(); + + expect(stats.eventsByType['transfer']).toBeGreaterThan(0); + }); + + it('should track events by contract', async () => { + await processor.storeEvents(testEvents); + + const stats = await processor.getStorageStats(); + + expect(Object.keys(stats.eventsByContract).length).toBeGreaterThan(0); + }); + }); + + describe('Listener Management', () => { + it('should register and unregister listeners', async () => { + const listener = vi.fn(); + const unregister = processor.registerListener(listener); + + await processor.storeEvents(testEvents); + await processor.startReplay({}, { mode: 'fast' }); + + expect(listener).toHaveBeenCalled(); + + unregister(); + + listener.mockClear(); + await processor.startReplay({}, { mode: 'fast' }); + + expect(listener).not.toHaveBeenCalled(); + }); + + it('should support multiple listeners', async () => { + const listener1 = vi.fn(); + const listener2 = vi.fn(); + + processor.registerListener(listener1); + processor.registerListener(listener2); + + await processor.storeEvents(testEvents); + await processor.startReplay({}, { mode: 'fast' }); + + expect(listener1).toHaveBeenCalled(); + expect(listener2).toHaveBeenCalled(); + }); + + it('should support error listeners', async () => { + const errorListener = vi.fn(); + processor.registerErrorListener(errorListener); + + processor.registerListener(() => { + throw new Error('Test error'); + }); + + await processor.storeEvents(testEvents.slice(0, 2)); + await processor.startReplay({}, { continueOnError: true }); + + expect(errorListener).toHaveBeenCalled(); + }); + }); +}); + +/** + * Helper: Create test events + */ +function createTestEvents(): StoredRawBridgeEvent[] { + const now = Date.now(); + const contractId = 'CADSCVF4MFXTV2HFYGDYXFCL3IUJG5LPNWGZYHXU6QTHGDQBGIXW5KQ'; + + return [ + { + id: 'evt-1', + timestamp: new Date(now - 3600000), + source: 'test-bridge', + type: 'transfer', + payload: { amount: '1000000000' }, + contractId, + transactionHash: 'txn-1', + sequenceNumber: 100, + ledgerSequence: 50000, + sourceAccount: 'GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ', + }, + { + id: 'evt-2', + timestamp: new Date(now - 1800000), + source: 'test-bridge', + type: 'transfer', + payload: { amount: '2000000000' }, + contractId, + transactionHash: 'txn-2', + sequenceNumber: 101, + ledgerSequence: 50001, + sourceAccount: 'GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ', + }, + { + id: 'evt-3', + timestamp: new Date(now - 900000), + source: 'test-bridge', + type: 'mint', + payload: { amount: '500000000' }, + contractId, + transactionHash: 'txn-3', + sequenceNumber: 102, + ledgerSequence: 50002, + sourceAccount: 'GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ', + }, + { + id: 'evt-4', + timestamp: new Date(now), + source: 'test-bridge', + type: 'transfer', + payload: { amount: '3000000000' }, + contractId, + transactionHash: 'txn-4', + sequenceNumber: 103, + ledgerSequence: 50003, + sourceAccount: 'GBUQWP3BOUZX34ULNQG23RQ6F4PFXKEC6FMBLIOTXHLHAPXVUJP3ORCZ', + }, + ]; +} diff --git a/src/replay/events/stellar/event-storage.ts b/src/replay/events/stellar/event-storage.ts new file mode 100644 index 0000000..cee2592 --- /dev/null +++ b/src/replay/events/stellar/event-storage.ts @@ -0,0 +1,400 @@ +/** + * In-Memory Event Storage Backend + * Efficient storage and querying of bridge events + */ + +import type { + EventStorageBackend, + StoredBridgeEvent, + ReplayFilterCriteria, + EventQueryResult, + InMemoryEventStorageConfig, + ReplayableEventType, +} from './types'; + +const DEFAULT_CONFIG: Required = { + maxEvents: 100_000, + ttl: 30 * 24 * 60 * 60 * 1000, // 30 days + autoCleanup: true, + cleanupInterval: 24 * 60 * 60 * 1000, // 24 hours + enableIndexing: true, +}; + +/** + * In-memory event storage with efficient querying and indexing + */ +export class InMemoryEventStorage implements EventStorageBackend { + private events = new Map(); + private config: Required; + private cleanupTimer: ReturnType | null = null; + + // Indexes for faster queries + private indexes = { + byType: new Map>(), + byContract: new Map>(), + byFromAddress: new Map>(), + byToAddress: new Map>(), + byTransactionHash: new Map>(), + bySourceAccount: new Map>(), + creationTimes: new Map(), + }; + + constructor(config: InMemoryEventStorageConfig = {}) { + this.config = { ...DEFAULT_CONFIG, ...config }; + + if (this.config.autoCleanup) { + this.startCleanup(); + } + } + + /** + * Store a single event + */ + async store(event: StoredBridgeEvent): Promise { + if (this.events.size >= this.config.maxEvents) { + this.evictOldest(); + } + + this.events.set(event.id, event); + this.updateIndexes(event, true); + } + + /** + * Store multiple events + */ + async storeMultiple(events: StoredBridgeEvent[]): Promise { + for (const event of events) { + await this.store(event); + } + } + + /** + * Query events with filters + */ + async query(filter: ReplayFilterCriteria): Promise { + let results = this.filterEvents(filter); + + // Apply pagination + const offset = filter.offset || 0; + const limit = filter.limit || 1000; + const total = results.length; + const hasMore = offset + limit < total; + + const events = results.slice(offset, offset + limit); + + return { + events, + total, + limit, + offset, + hasMore, + }; + } + + /** + * Get a single event by ID + */ + async get(eventId: string): Promise { + return this.events.get(eventId) || null; + } + + /** + * Get events by transaction hash + */ + async getByTransactionHash(hash: string): Promise { + const eventIds = this.indexes.byTransactionHash.get(hash) || new Set(); + return Array.from(eventIds) + .map((id) => this.events.get(id)) + .filter((event): event is StoredBridgeEvent => !!event); + } + + /** + * Delete a single event + */ + async delete(eventId: string): Promise { + const event = this.events.get(eventId); + if (!event) { + return false; + } + + this.updateIndexes(event, false); + this.events.delete(eventId); + return true; + } + + /** + * Delete events in a time range + */ + async deleteRange(startTime: Date, endTime: Date): Promise { + const startMs = startTime.getTime(); + const endMs = endTime.getTime(); + let deleted = 0; + + for (const [id, event] of this.events.entries()) { + const eventTime = event.timestamp.getTime(); + if (eventTime >= startMs && eventTime <= endMs) { + this.updateIndexes(event, false); + this.events.delete(id); + deleted++; + } + } + + return deleted; + } + + /** + * Count events matching filter + */ + async count(filter?: ReplayFilterCriteria): Promise { + if (!filter) { + return this.events.size; + } + return this.filterEvents(filter).length; + } + + /** + * Get total storage size in bytes + */ + async getStorageSize(): Promise { + let size = 0; + for (const event of this.events.values()) { + size += JSON.stringify(event).length; + } + return size; + } + + /** + * Cleanup storage + */ + async cleanup(): Promise { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + this.cleanupTimer = null; + } + + this.events.clear(); + this.clearIndexes(); + } + + /** + * Private helper methods + */ + + private filterEvents(filter: ReplayFilterCriteria): StoredBridgeEvent[] { + let results = Array.from(this.events.values()); + + // Time-based filtering + if (filter.startTime) { + const startMs = filter.startTime.getTime(); + results = results.filter((e) => e.timestamp.getTime() >= startMs); + } + if (filter.endTime) { + const endMs = filter.endTime.getTime(); + results = results.filter((e) => e.timestamp.getTime() <= endMs); + } + + // Event type filtering + if (filter.eventTypes && filter.eventTypes.length > 0) { + const types = new Set(filter.eventTypes); + results = results.filter((e) => types.has(e.type)); + } + + // Address filtering + if (filter.fromAddress) { + results = results.filter((e) => 'from' in e && e.from === filter.fromAddress); + } + if (filter.toAddress) { + results = results.filter((e) => 'to' in e && e.to === filter.toAddress); + } + if (filter.addresses && filter.addresses.length > 0) { + const addresses = new Set(filter.addresses); + results = results.filter( + (e) => + ('from' in e && addresses.has(e.from || '')) || + ('to' in e && addresses.has(e.to || '')), + ); + } + + // Contract filtering + if (filter.contractId) { + results = results.filter((e) => e.contractId === filter.contractId); + } + if (filter.contractIds && filter.contractIds.length > 0) { + const contracts = new Set(filter.contractIds); + results = results.filter((e) => e.contractId && contracts.has(e.contractId)); + } + + // Transaction filtering + if (filter.transactionHash) { + results = results.filter((e) => e.transactionHash === filter.transactionHash); + } + if (filter.sequenceNumber !== undefined) { + results = results.filter((e) => e.sequenceNumber === filter.sequenceNumber); + } + if (filter.ledgerSequence !== undefined) { + results = results.filter((e) => e.ledgerSequence === filter.ledgerSequence); + } + if (filter.sourceAccount) { + results = results.filter((e) => e.sourceAccount === filter.sourceAccount); + } + + // Amount filtering + if (filter.minAmount || filter.maxAmount) { + results = results.filter((e) => { + if (!('amount' in e) || !e.amount) return false; + const amount = BigInt(e.amount || '0'); + if (filter.minAmount && amount < BigInt(filter.minAmount)) return false; + if (filter.maxAmount && amount > BigInt(filter.maxAmount)) return false; + return true; + }); + } + + // Asset filtering + if (filter.asset) { + results = results.filter((e) => 'asset' in e && e.asset === filter.asset); + } + if (filter.assets && filter.assets.length > 0) { + const assets = new Set(filter.assets); + results = results.filter((e) => 'asset' in e && e.asset && assets.has(e.asset)); + } + + // Metadata filtering + if (filter.hasMetadata) { + results = results.filter((e) => e.metadata && Object.keys(e.metadata).length > 0); + } + if (filter.metadataMatches) { + results = results.filter((e) => this.matchesMetadata(e.metadata || {}, filter.metadataMatches)); + } + + // Custom predicate + if (filter.predicate) { + results = results.filter(filter.predicate); + } + + // Sort by timestamp (newest first) + results.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()); + + return results; + } + + private matchesMetadata(metadata: Record, pattern: Record): boolean { + for (const [key, value] of Object.entries(pattern)) { + if (typeof value === 'object' && value !== null) { + if (!this.matchesMetadata(metadata[key] || {}, value)) { + return false; + } + } else if (metadata[key] !== value) { + return false; + } + } + return true; + } + + private updateIndexes(event: StoredBridgeEvent, add: boolean): void { + if (!this.config.enableIndexing) return; + + const operation = add ? 'add' : 'delete'; + + // Index by type + this.updateIndex(this.indexes.byType, event.type, event.id, operation); + + // Index by contract + if (event.contractId) { + this.updateIndex(this.indexes.byContract, event.contractId, event.id, operation); + } + + // Index by addresses (for normalized events) + if ('from' in event && event.from) { + this.updateIndex(this.indexes.byFromAddress, event.from, event.id, operation); + } + if ('to' in event && event.to) { + this.updateIndex(this.indexes.byToAddress, event.to, event.id, operation); + } + + // Index by transaction hash + if (event.transactionHash) { + this.updateIndex(this.indexes.byTransactionHash, event.transactionHash, event.id, operation); + } + + // Index by source account + if (event.sourceAccount) { + this.updateIndex(this.indexes.bySourceAccount, event.sourceAccount, event.id, operation); + } + + // Track creation time + if (add) { + this.indexes.creationTimes.set(event.id, event.timestamp.getTime()); + } else { + this.indexes.creationTimes.delete(event.id); + } + } + + private updateIndex( + index: Map>, + key: string, + eventId: string, + operation: 'add' | 'delete', + ): void { + if (operation === 'add') { + if (!index.has(key)) { + index.set(key, new Set()); + } + index.get(key)!.add(eventId); + } else { + const set = index.get(key); + if (set) { + set.delete(eventId); + if (set.size === 0) { + index.delete(key); + } + } + } + } + + private clearIndexes(): void { + this.indexes.byType.clear(); + this.indexes.byContract.clear(); + this.indexes.byFromAddress.clear(); + this.indexes.byToAddress.clear(); + this.indexes.byTransactionHash.clear(); + this.indexes.bySourceAccount.clear(); + this.indexes.creationTimes.clear(); + } + + private evictOldest(): void { + let oldestId: string | null = null; + let oldestTime = Infinity; + + for (const [id, time] of this.indexes.creationTimes.entries()) { + if (time < oldestTime) { + oldestTime = time; + oldestId = id; + } + } + + if (oldestId) { + const event = this.events.get(oldestId); + if (event) { + this.updateIndexes(event, false); + this.events.delete(oldestId); + } + } + } + + private startCleanup(): void { + this.cleanupTimer = setInterval(() => { + const now = Date.now(); + const cutoff = now - this.config.ttl; + + for (const [id, time] of this.indexes.creationTimes.entries()) { + if (time < cutoff) { + const event = this.events.get(id); + if (event) { + this.updateIndexes(event, false); + this.events.delete(id); + } + } + } + }, this.config.cleanupInterval); + } +} diff --git a/src/replay/events/stellar/index.ts b/src/replay/events/stellar/index.ts new file mode 100644 index 0000000..adb1417 --- /dev/null +++ b/src/replay/events/stellar/index.ts @@ -0,0 +1,29 @@ +/** + * Soroban Event Replay Module + * Main exports for event replay functionality + */ + +export { SorobanEventReplayProcessor } from './soroban-event-replay-processor'; +export { InMemoryEventStorage } from './event-storage'; +export { ReplayQueryBuilder, createReplayQuery, ReplayQueryPresets } from './query-builder'; + +export type { + ReplayableEventType, + StoredRawBridgeEvent, + StoredNormalizedBridgeEvent, + StoredBridgeEvent, + ReplayFilterCriteria, + ReplayMode, + ReplayOptions, + ReplaySession, + ReplayError, + ReplayStatistics, + ReplayResult, + EventQueryResult, + EventReplayListener, + EventReplayErrorListener, + EventStorageBackend, + InMemoryEventStorageConfig, + SorobanEventReplayProcessorConfig, + EventAggregationStats, +} from './types'; diff --git a/src/replay/events/stellar/query-builder.ts b/src/replay/events/stellar/query-builder.ts new file mode 100644 index 0000000..17bcd0e --- /dev/null +++ b/src/replay/events/stellar/query-builder.ts @@ -0,0 +1,336 @@ +/** + * Event Replay Query Builder + * Fluent API for building replay filter criteria + */ + +import type { ReplayFilterCriteria, ReplayableEventType } from './types'; + +/** + * Fluent query builder for replay filters + */ +export class ReplayQueryBuilder { + private criteria: ReplayFilterCriteria = {}; + + /** + * Filter by start time + */ + since(date: Date): this { + this.criteria.startTime = date; + return this; + } + + /** + * Filter by end time + */ + until(date: Date): this { + this.criteria.endTime = date; + return this; + } + + /** + * Filter by time range + */ + between(startDate: Date, endDate: Date): this { + this.criteria.startTime = startDate; + this.criteria.endTime = endDate; + return this; + } + + /** + * Filter by last N milliseconds + */ + lastMs(ms: number): this { + this.criteria.startTime = new Date(Date.now() - ms); + this.criteria.endTime = new Date(); + return this; + } + + /** + * Filter by last N hours + */ + lastHours(hours: number): this { + return this.lastMs(hours * 60 * 60 * 1000); + } + + /** + * Filter by last N days + */ + lastDays(days: number): this { + return this.lastMs(days * 24 * 60 * 60 * 1000); + } + + /** + * Filter by event types + */ + ofTypes(...types: ReplayableEventType[]): this { + this.criteria.eventTypes = types; + return this; + } + + /** + * Filter by single event type + */ + ofType(type: ReplayableEventType): this { + this.criteria.eventTypes = [type]; + return this; + } + + /** + * Filter by from address + */ + from(address: string): this { + this.criteria.fromAddress = address; + return this; + } + + /** + * Filter by to address + */ + to(address: string): this { + this.criteria.toAddress = address; + return this; + } + + /** + * Filter by addresses (either from or to) + */ + withAddresses(...addresses: string[]): this { + this.criteria.addresses = addresses; + return this; + } + + /** + * Filter by contract ID + */ + forContract(contractId: string): this { + this.criteria.contractId = contractId; + return this; + } + + /** + * Filter by multiple contract IDs + */ + forContracts(...contractIds: string[]): this { + this.criteria.contractIds = contractIds; + return this; + } + + /** + * Filter by transaction hash + */ + withTransactionHash(hash: string): this { + this.criteria.transactionHash = hash; + return this; + } + + /** + * Filter by sequence number + */ + withSequenceNumber(seqNum: number): this { + this.criteria.sequenceNumber = seqNum; + return this; + } + + /** + * Filter by ledger sequence + */ + inLedger(ledgerSeq: number): this { + this.criteria.ledgerSequence = ledgerSeq; + return this; + } + + /** + * Filter by source account + */ + fromAccount(account: string): this { + this.criteria.sourceAccount = account; + return this; + } + + /** + * Filter by minimum amount + */ + minAmount(amount: string | number): this { + this.criteria.minAmount = amount.toString(); + return this; + } + + /** + * Filter by maximum amount + */ + maxAmount(amount: string | number): this { + this.criteria.maxAmount = amount.toString(); + return this; + } + + /** + * Filter by amount range + */ + amountBetween(min: string | number, max: string | number): this { + this.criteria.minAmount = min.toString(); + this.criteria.maxAmount = max.toString(); + return this; + } + + /** + * Filter by asset + */ + withAsset(asset: string): this { + this.criteria.asset = asset; + return this; + } + + /** + * Filter by multiple assets + */ + withAssets(...assets: string[]): this { + this.criteria.assets = assets; + return this; + } + + /** + * Filter by metadata presence + */ + withMetadata(): this { + this.criteria.hasMetadata = true; + return this; + } + + /** + * Filter by metadata values + */ + whereMetadata(pattern: Record): this { + this.criteria.metadataMatches = pattern; + return this; + } + + /** + * Custom filter predicate + */ + where(predicate: (event: any) => boolean): this { + this.criteria.predicate = predicate; + return this; + } + + /** + * Set result limit + */ + limit(limit: number): this { + this.criteria.limit = limit; + return this; + } + + /** + * Set result offset + */ + offset(offset: number): this { + this.criteria.offset = offset; + return this; + } + + /** + * Set pagination + */ + page(pageNumber: number, pageSize: number = 100): this { + this.criteria.limit = pageSize; + this.criteria.offset = (pageNumber - 1) * pageSize; + return this; + } + + /** + * Build the filter criteria + */ + build(): ReplayFilterCriteria { + return { ...this.criteria }; + } + + /** + * Reset the builder + */ + reset(): this { + this.criteria = {}; + return this; + } + + /** + * Clone the builder + */ + clone(): ReplayQueryBuilder { + const builder = new ReplayQueryBuilder(); + builder.criteria = { ...this.criteria }; + if (this.criteria.eventTypes) { + builder.criteria.eventTypes = [...this.criteria.eventTypes]; + } + if (this.criteria.addresses) { + builder.criteria.addresses = [...this.criteria.addresses]; + } + if (this.criteria.contractIds) { + builder.criteria.contractIds = [...this.criteria.contractIds]; + } + if (this.criteria.assets) { + builder.criteria.assets = [...this.criteria.assets]; + } + return builder; + } +} + +/** + * Create a new replay query builder + */ +export function createReplayQuery(): ReplayQueryBuilder { + return new ReplayQueryBuilder(); +} + +/** + * Common query presets + */ +export const ReplayQueryPresets = { + /** + * Get all transfer events + */ + transfers: () => createReplayQuery().ofType('transfer'), + + /** + * Get all failed transactions from last 24 hours + */ + recentFailures: () => + createReplayQuery().lastDays(1).ofTypes('unknown'), + + /** + * Get events for a specific contract + */ + contractEvents: (contractId: string) => + createReplayQuery().forContract(contractId), + + /** + * Get events for a specific address (as sender or receiver) + */ + addressEvents: (address: string) => + createReplayQuery().withAddresses(address), + + /** + * Get high-value transfers + */ + highValueTransfers: (minAmount: string) => + createReplayQuery() + .ofType('transfer') + .minAmount(minAmount), + + /** + * Get events from a specific ledger + */ + ledgerEvents: (ledgerSeq: number) => + createReplayQuery().inLedger(ledgerSeq), + + /** + * Get last N events + */ + recent: (count: number = 100) => + createReplayQuery().limit(count), + + /** + * Get events with errors + */ + withErrors: () => + createReplayQuery().withMetadata().whereMetadata({ error: true }), +}; diff --git a/src/replay/events/stellar/soroban-event-replay-processor.ts b/src/replay/events/stellar/soroban-event-replay-processor.ts new file mode 100644 index 0000000..a5138ba --- /dev/null +++ b/src/replay/events/stellar/soroban-event-replay-processor.ts @@ -0,0 +1,482 @@ +/** + * Soroban Event Replay Processor + * Manages event storage and replay for debugging and analytics + */ + +import { EventEmitter } from 'events'; +import { randomUUID } from 'crypto'; +import type { + EventStorageBackend, + StoredBridgeEvent, + StoredRawBridgeEvent, + StoredNormalizedBridgeEvent, + ReplayFilterCriteria, + ReplayOptions, + ReplaySession, + ReplayResult, + ReplayStatistics, + ReplayError, + EventReplayListener, + EventReplayErrorListener, + SorobanEventReplayProcessorConfig, + EventAggregationStats, +} from './types'; +import { InMemoryEventStorage } from './event-storage'; + +const DEFAULT_CONFIG: Required = { + storageBackend: new InMemoryEventStorage(), + inMemoryStorage: {}, + defaultReplayOptions: { + mode: 'fast', + delayMs: 0, + continueOnError: true, + preserveTimestamps: true, + normalize: false, + dryRun: false, + }, + eventRetentionMs: 30 * 24 * 60 * 60 * 1000, // 30 days + cleanupIntervalMs: 24 * 60 * 60 * 1000, // 24 hours + maxConcurrentReplays: 5, + verbose: false, + onError: console.error, + onSessionCompleted: undefined, +}; + +/** + * Main Soroban Event Replay Processor + */ +export class SorobanEventReplayProcessor extends EventEmitter { + private config: Required; + private storage: EventStorageBackend; + private listeners = new Set(); + private errorListeners = new Set(); + private activeSessions = new Map(); + private sessionHistory = new Map(); + private concurrentReplays = 0; + private replayQueue: Array<() => Promise> = []; + + constructor(config: SorobanEventReplayProcessorConfig = {}) { + super(); + this.config = { + ...DEFAULT_CONFIG, + ...config, + inMemoryStorage: { + ...DEFAULT_CONFIG.inMemoryStorage, + ...config.inMemoryStorage, + }, + defaultReplayOptions: { + ...DEFAULT_CONFIG.defaultReplayOptions, + ...config.defaultReplayOptions, + }, + }; + + // Use provided backend or create in-memory storage + this.storage = config.storageBackend || new InMemoryEventStorage(this.config.inMemoryStorage); + } + + /** + * Store a single event + */ + async storeEvent(event: StoredBridgeEvent): Promise { + try { + await this.storage.store(event); + this.emit('event-stored', event); + } catch (error) { + this.handleError(error); + } + } + + /** + * Store multiple events + */ + async storeEvents(events: StoredBridgeEvent[]): Promise { + try { + await this.storage.storeMultiple(events); + this.emit('events-stored', { count: events.length }); + } catch (error) { + this.handleError(error); + } + } + + /** + * Register a replay listener + */ + registerListener(listener: EventReplayListener): () => void { + this.listeners.add(listener); + this.emit('listener-registered', { count: this.listeners.size }); + + // Return unregister function + return () => { + this.listeners.delete(listener); + this.emit('listener-unregistered', { count: this.listeners.size }); + }; + } + + /** + * Register an error listener + */ + registerErrorListener(listener: EventReplayErrorListener): () => void { + this.errorListeners.add(listener); + return () => { + this.errorListeners.delete(listener); + }; + } + + /** + * Query stored events + */ + async queryEvents(filter: ReplayFilterCriteria): Promise { + return this.storage.query(filter); + } + + /** + * Start an event replay session + */ + async startReplay( + filter: ReplayFilterCriteria, + options: ReplayOptions = {}, + ): Promise { + const mergedOptions = { ...this.config.defaultReplayOptions, ...options }; + + // Create session + const sessionId = randomUUID(); + const session: ReplaySession = { + id: sessionId, + startTime: new Date(), + status: 'active', + filter, + options: mergedOptions, + totalEvents: 0, + replayedEvents: 0, + failedEvents: 0, + skippedEvents: 0, + errors: [], + progress: 0, + }; + + this.activeSessions.set(sessionId, session); + this.emit('replay-started', { sessionId, filter, options: mergedOptions }); + + // Wait for queue slot if needed + if (this.concurrentReplays >= this.config.maxConcurrentReplays) { + await new Promise((resolve) => { + this.replayQueue.push(() => this.executeReplay(session, mergedOptions).then(() => resolve())); + }); + } else { + await this.executeReplay(session, mergedOptions); + } + + // Generate result + const result = this.generateReplayResult(session); + this.sessionHistory.set(sessionId, result); + this.activeSessions.delete(sessionId); + + // Emit completion + this.emit('replay-completed', result); + if (this.config.onSessionCompleted) { + this.config.onSessionCompleted(result); + } + + return result; + } + + /** + * Pause a replay session + */ + pauseReplay(sessionId: string): boolean { + const session = this.activeSessions.get(sessionId); + if (!session || session.status !== 'active') { + return false; + } + + session.status = 'paused'; + this.emit('replay-paused', { sessionId }); + return true; + } + + /** + * Resume a replay session + */ + async resumeReplay(sessionId: string): Promise { + const session = this.activeSessions.get(sessionId); + if (!session || session.status !== 'paused') { + return false; + } + + session.status = 'active'; + this.emit('replay-resumed', { sessionId }); + // Resume could be implemented with complex session state tracking + return true; + } + + /** + * Cancel a replay session + */ + cancelReplay(sessionId: string): boolean { + const session = this.activeSessions.get(sessionId); + if (!session || session.status !== 'active') { + return false; + } + + session.status = 'failed'; + this.emit('replay-cancelled', { sessionId }); + return true; + } + + /** + * Get session statistics + */ + getSessionStats(sessionId: string): ReplaySession | null { + return this.activeSessions.get(sessionId) || null; + } + + /** + * Get session history + */ + getSessionHistory(sessionId: string): ReplayResult | null { + return this.sessionHistory.get(sessionId) || null; + } + + /** + * Get all active sessions + */ + getActiveSessions(): ReplaySession[] { + return Array.from(this.activeSessions.values()); + } + + /** + * Cleanup old events + */ + async cleanupOldEvents(): Promise { + const cutoffTime = new Date(Date.now() - this.config.eventRetentionMs); + return this.storage.deleteRange(new Date(0), cutoffTime); + } + + /** + * Get event storage statistics + */ + async getStorageStats(): Promise { + const result = await this.storage.query({}); + const events = result.events; + + const statsByType: Record = {}; + const statsByContract: Record = {}; + + for (const event of events) { + // Count by type + statsByType[event.type] = (statsByType[event.type] || 0) + 1; + + // Count by contract + if (event.contractId) { + statsByContract[event.contractId] = (statsByContract[event.contractId] || 0) + 1; + } + } + + const storageSize = await this.storage.getStorageSize(); + const dates = events.map((e) => e.timestamp.getTime()).sort((a, b) => a - b); + + return { + totalEvents: events.length, + eventsByType: statsByType, + eventsByContract: statsByContract, + dateRange: { + oldest: new Date(dates[0] || 0), + newest: new Date(dates[dates.length - 1] || 0), + }, + storageSize, + }; + } + + /** + * Delete events + */ + async deleteEvent(eventId: string): Promise { + return this.storage.delete(eventId); + } + + /** + * Private helper methods + */ + + private async executeReplay( + session: ReplaySession, + options: Required, + ): Promise { + this.concurrentReplays++; + + try { + // Query events + const queryResult = await this.storage.query(session.filter); + session.totalEvents = queryResult.total; + + if (options.dryRun) { + session.replayedEvents = queryResult.events.length; + return; + } + + // Replay events + const events = queryResult.events.slice(0, options.maxEvents); + + for (let i = 0; i < events.length; i++) { + if (session.status !== 'active') { + break; + } + + const event = events[i]; + + try { + // Apply delay for real-time and throttled modes + if (options.mode === 'real-time' || options.mode === 'throttled') { + if (i > 0) { + const timeDiff = events[i - 1].timestamp.getTime() - event.timestamp.getTime(); + const delay = options.mode === 'throttled' ? options.delayMs : Math.abs(timeDiff); + await this.sleep(Math.min(delay, 60_000)); // Cap at 1 minute + } + } else if (options.mode === 'throttled' && options.delayMs > 0) { + await this.sleep(options.delayMs); + } + + // Notify listeners + await this.notifyListeners(event, i, events.length, session.id); + session.replayedEvents++; + } catch (error: any) { + session.failedEvents++; + + const replayError: ReplayError = { + eventId: event.id, + error: error?.message || String(error), + timestamp: new Date(), + recoverable: options.continueOnError, + }; + + session.errors.push(replayError); + this.notifyErrorListeners(replayError); + + if (!options.continueOnError) { + session.status = 'failed'; + throw error; + } + } + + // Update progress + session.progress = Math.round(((i + 1) / events.length) * 100); + this.emit('replay-progress', { sessionId: session.id, progress: session.progress }); + } + + session.status = 'completed'; + } catch (error) { + session.status = 'failed'; + this.handleError(error); + } finally { + session.endTime = new Date(); + this.concurrentReplays--; + + // Process queue + if (this.replayQueue.length > 0) { + const next = this.replayQueue.shift(); + if (next) { + await next(); + } + } + } + } + + private async notifyListeners( + event: StoredBridgeEvent, + index: number, + total: number, + sessionId: string, + ): Promise { + const promises: Promise[] = []; + + for (const listener of this.listeners) { + promises.push( + Promise.resolve() + .then(() => listener(event, index, total, sessionId)) + .catch((error) => { + this.handleError(error); + }), + ); + } + + await Promise.allSettled(promises); + } + + private notifyErrorListeners(error: ReplayError): void { + for (const listener of this.errorListeners) { + try { + listener(error); + } catch (err) { + this.handleError(err); + } + } + } + + private generateReplayResult(session: ReplaySession): ReplayResult { + const durationMs = (session.endTime || new Date()).getTime() - session.startTime.getTime(); + const successRate = + session.totalEvents > 0 + ? session.replayedEvents / session.totalEvents + : 0; + const eventsPerSecond = durationMs > 0 ? (session.replayedEvents * 1000) / durationMs : 0; + + const statistics: ReplayStatistics = { + sessionId: session.id, + totalEvents: session.totalEvents, + replayedEvents: session.replayedEvents, + failedEvents: session.failedEvents, + skippedEvents: session.skippedEvents, + startTime: session.startTime, + endTime: session.endTime || new Date(), + durationMs, + eventsPerSecond: Math.round(eventsPerSecond * 100) / 100, + successRate: Math.round(successRate * 10000) / 100, // Percentage + averageEventSize: 0, + errors: session.errors, + }; + + return { + sessionId: session.id, + success: session.status === 'completed' && session.failedEvents === 0, + statistics, + errors: session.errors, + warnings: this.generateWarnings(session, statistics), + }; + } + + private generateWarnings(session: ReplaySession, stats: ReplayStatistics): string[] { + const warnings: string[] = []; + + if (stats.successRate < 1) { + warnings.push( + `${Math.round((1 - stats.successRate) * 100)}% of events failed during replay`, + ); + } + + if (stats.eventsPerSecond < 100 && session.options.mode === 'fast') { + warnings.push('Replay rate is below expected threshold; check system resources'); + } + + if (session.errors.length > 0) { + const criticalErrors = session.errors.filter((e) => !e.recoverable).length; + if (criticalErrors > 0) { + warnings.push(`${criticalErrors} critical errors encountered during replay`); + } + } + + return warnings; + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } + + private handleError(error: unknown): void { + if (this.config.onError) { + this.config.onError(error); + } + if (this.config.verbose) { + console.error('[SorobanEventReplayProcessor]', error); + } + } +} diff --git a/src/replay/events/stellar/types.ts b/src/replay/events/stellar/types.ts new file mode 100644 index 0000000..128faa1 --- /dev/null +++ b/src/replay/events/stellar/types.ts @@ -0,0 +1,307 @@ +/** + * Soroban Event Replay Types + * Defines interfaces for event storage, filtering, and replay + */ + +/** + * Supported event types that can be replayed + */ +export type ReplayableEventType = + | 'transfer' + | 'mint' + | 'burn' + | 'approval' + | 'contract_call' + | 'balance_change' + | 'unknown'; + +/** + * Raw bridge event structure for storage + */ +export interface StoredRawBridgeEvent { + id: string; // Unique event ID + timestamp: Date; + source: string; + type: ReplayableEventType; + payload: Record; + contractId?: string; + transactionHash?: string; + sequenceNumber?: number; + ledgerSequence?: number; + sourceAccount?: string; + metadata?: Record; +} + +/** + * Normalized bridge event for replay + */ +export interface StoredNormalizedBridgeEvent { + id: string; + timestamp: Date; + rawEventId: string; + type: ReplayableEventType; + from?: string; + to?: string; + amount?: string; + asset?: string; + contractId?: string; + normalized: true; + metadata?: Record; +} + +export type StoredBridgeEvent = StoredRawBridgeEvent | StoredNormalizedBridgeEvent; + +/** + * Event replay filter criteria + */ +export interface ReplayFilterCriteria { + // Time-based filtering + startTime?: Date; + endTime?: Date; + + // Event type filtering + eventTypes?: ReplayableEventType[]; + + // Address filtering + fromAddress?: string; + toAddress?: string; + addresses?: string[]; // Match either from or to + + // Contract filtering + contractId?: string; + contractIds?: string[]; + + // Transaction filtering + transactionHash?: string; + sequenceNumber?: number; + ledgerSequence?: number; + sourceAccount?: string; + + // Amount/Asset filtering + minAmount?: string; + maxAmount?: string; + asset?: string; + assets?: string[]; + + // Metadata filtering + hasMetadata?: boolean; + metadataMatches?: Record; + + // Custom predicate + predicate?: (event: StoredBridgeEvent) => boolean; + + // Pagination + limit?: number; + offset?: number; +} + +/** + * Replay execution mode + */ +export type ReplayMode = 'fast' | 'real-time' | 'throttled' | 'step-by-step'; + +/** + * Event replay execution options + */ +export interface ReplayOptions { + // Replay mode + mode?: ReplayMode; + + // For throttled mode: delay between events (ms) + delayMs?: number; + + // Whether to skip failed replays + continueOnError?: boolean; + + // Emit original timestamps as metadata + preserveTimestamps?: boolean; + + // Maximum number of events to replay + maxEvents?: number; + + // Batch size for processing + batchSize?: number; + + // Custom data to pass to listeners + context?: Record; + + // Whether to normalize events before replaying + normalize?: boolean; + + // Dry run mode (don't actually emit events) + dryRun?: boolean; +} + +/** + * Event replay session + */ +export interface ReplaySession { + id: string; + startTime: Date; + endTime?: Date; + status: 'active' | 'completed' | 'failed' | 'paused'; + filter: ReplayFilterCriteria; + options: ReplayOptions; + totalEvents: number; + replayedEvents: number; + failedEvents: number; + skippedEvents: number; + errors: ReplayError[]; + progress: number; // 0-100 +} + +/** + * Event replay error + */ +export interface ReplayError { + eventId: string; + error: string; + timestamp: Date; + recoverable: boolean; +} + +/** + * Event replay statistics + */ +export interface ReplayStatistics { + sessionId: string; + totalEvents: number; + replayedEvents: number; + failedEvents: number; + skippedEvents: number; + startTime: Date; + endTime: Date; + durationMs: number; + eventsPerSecond: number; + successRate: number; // 0-1 + averageEventSize: number; + errors: ReplayError[]; +} + +/** + * Event replay result + */ +export interface ReplayResult { + sessionId: string; + success: boolean; + statistics: ReplayStatistics; + errors: ReplayError[]; + warnings: string[]; +} + +/** + * Event storage query result + */ +export interface EventQueryResult { + events: StoredBridgeEvent[]; + total: number; + limit: number; + offset: number; + hasMore: boolean; +} + +/** + * Event replay listener + */ +export type EventReplayListener = ( + event: StoredBridgeEvent, + index: number, + total: number, + sessionId: string, +) => void | Promise; + +/** + * Event replay error listener + */ +export type EventReplayErrorListener = (error: ReplayError) => void; + +/** + * Event storage backend interface + */ +export interface EventStorageBackend { + // Storage operations + store(event: StoredBridgeEvent): Promise; + storeMultiple(events: StoredBridgeEvent[]): Promise; + + // Query operations + query(filter: ReplayFilterCriteria): Promise; + get(eventId: string): Promise; + getByTransactionHash(hash: string): Promise; + + // Deletion operations + delete(eventId: string): Promise; + deleteRange(startTime: Date, endTime: Date): Promise; + + // Statistics + count(filter?: ReplayFilterCriteria): Promise; + getStorageSize(): Promise; + + // Cleanup + cleanup(): Promise; +} + +/** + * In-memory event storage configuration + */ +export interface InMemoryEventStorageConfig { + // Maximum events to store + maxEvents?: number; + + // Time-to-live for events (ms) + ttl?: number; + + // Enable auto-cleanup on TTL expiration + autoCleanup?: boolean; + + // Cleanup interval (ms) + cleanupInterval?: number; + + // Enable indexing for faster queries + enableIndexing?: boolean; +} + +/** + * Soroban Event Replay Processor configuration + */ +export interface SorobanEventReplayProcessorConfig { + // Storage backend + storageBackend?: EventStorageBackend; + + // In-memory storage config (if backend not provided) + inMemoryStorage?: InMemoryEventStorageConfig; + + // Default replay options + defaultReplayOptions?: ReplayOptions; + + // Event retention period (ms) + eventRetentionMs?: number; + + // Auto-cleanup interval + cleanupIntervalMs?: number; + + // Maximum concurrent replays + maxConcurrentReplays?: number; + + // Enable verbose logging + verbose?: boolean; + + // Error handler + onError?: (error: unknown) => void; + + // Session completed handler + onSessionCompleted?: (result: ReplayResult) => void; +} + +/** + * Event aggregation statistics + */ +export interface EventAggregationStats { + totalEvents: number; + eventsByType: Record; + eventsByContract: Record; + dateRange: { + oldest: Date; + newest: Date; + }; + storageSize: number; +}