From 6196e6517e6e10a1e1b89c8c028f43b345cf222b Mon Sep 17 00:00:00 2001 From: dzekojohn4 Date: Mon, 1 Jun 2026 08:52:55 +0000 Subject: [PATCH] feat: implement Stellar Bridge Liquidity Monitor Closes #297 - Add StellarBridgeLiquidityMonitor with provider registration and caching - Configurable liquidity thresholds with low/critical alert levels - Periodic refresh with event-based status change notifications - Full test coverage including threshold evaluation, caching, and queries --- src/liquidity/stellar/index.ts | 5 + .../stellar-bridge-liquidity-monitor.spec.ts | 434 +++++++++++++++++ .../stellar-bridge-liquidity-monitor.ts | 455 ++++++++++++++++++ 3 files changed, 894 insertions(+) create mode 100644 src/liquidity/stellar/index.ts create mode 100644 src/liquidity/stellar/stellar-bridge-liquidity-monitor.spec.ts create mode 100644 src/liquidity/stellar/stellar-bridge-liquidity-monitor.ts diff --git a/src/liquidity/stellar/index.ts b/src/liquidity/stellar/index.ts new file mode 100644 index 0000000..ef89a74 --- /dev/null +++ b/src/liquidity/stellar/index.ts @@ -0,0 +1,5 @@ +export * from './stellar-bridge-liquidity-monitor'; + +import { StellarBridgeLiquidityMonitor } from './stellar-bridge-liquidity-monitor'; + +export const stellarBridgeLiquidityMonitor = new StellarBridgeLiquidityMonitor(); diff --git a/src/liquidity/stellar/stellar-bridge-liquidity-monitor.spec.ts b/src/liquidity/stellar/stellar-bridge-liquidity-monitor.spec.ts new file mode 100644 index 0000000..50bc67b --- /dev/null +++ b/src/liquidity/stellar/stellar-bridge-liquidity-monitor.spec.ts @@ -0,0 +1,434 @@ +import { + StellarBridgeLiquidityMonitor, + StellarLiquiditySnapshot, + StellarLiquidityProviderConfig, +} from './stellar-bridge-liquidity-monitor'; + +describe('StellarBridgeLiquidityMonitor', () => { + let monitor: StellarBridgeLiquidityMonitor; + + const createMockProvider = ( + name: string, + availableAmount: string, + totalAmount?: string, + ): StellarLiquidityProviderConfig => ({ + name, + fetchFn: async (asset: string): Promise => ({ + provider: name, + asset, + availableAmount, + totalAmount: totalAmount ?? availableAmount, + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + }), + cacheTtlMs: 1000, + }); + + beforeEach(() => { + monitor = new StellarBridgeLiquidityMonitor({ + refreshIntervalMs: 30000, + cacheTtlMs: 1000, + thresholds: [ + { asset: 'USDC', lowThreshold: '50000', criticalThreshold: '10000' }, + { asset: 'XLM', lowThreshold: '100000', criticalThreshold: '25000' }, + ], + }); + }); + + afterEach(() => { + monitor.stopMonitoring(); + monitor.removeAllListeners(); + }); + + describe('Provider Registration', () => { + it('should register providers via constructor', () => { + const m = new StellarBridgeLiquidityMonitor({ + providers: [createMockProvider('test-provider', '100000')], + }); + + expect(m.getRegisteredProviders()).toContain('test-provider'); + }); + + it('should register additional providers after construction', () => { + monitor.registerProvider(createMockProvider('dynamic-provider', '50000')); + + expect(monitor.getRegisteredProviders()).toContain('dynamic-provider'); + }); + + it('should unregister providers and clear their cached data', async () => { + monitor.registerProvider( + createMockProvider('temp-provider', '100000'), + ); + + // Fetch to populate cache + await monitor.getLiquidity({ asset: 'USDC', provider: 'temp-provider' }); + + const removed = monitor.unregisterProvider('temp-provider'); + expect(removed).toBe(true); + + // Provider should no longer be registered + expect(monitor.getRegisteredProviders()).not.toContain('temp-provider'); + + // Attempting to unregister again returns false + expect(monitor.unregisterProvider('temp-provider')).toBe(false); + }); + }); + + describe('Liquidity Fetching', () => { + it('should fetch liquidity from registered providers', async () => { + monitor.registerProvider(createMockProvider('provider-a', '100000')); + + const results = await monitor.getLiquidity({ + asset: 'USDC', + provider: 'provider-a', + }); + + expect(results).toHaveLength(1); + expect(results[0].provider).toBe('provider-a'); + expect(results[0].availableAmount).toBe('100000'); + expect(results[0].asset).toBe('USDC'); + }); + + it('should return cached data when available and fresh', async () => { + const fetchFn = jest.fn().mockResolvedValue({ + provider: 'cached-provider', + asset: 'USDC', + availableAmount: '75000', + totalAmount: '75000', + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + } as StellarLiquiditySnapshot); + + monitor.registerProvider({ + name: 'cached-provider', + fetchFn, + cacheTtlMs: 60000, + }); + + // First call — should invoke fetchFn + await monitor.getLiquidity({ asset: 'USDC', provider: 'cached-provider' }); + expect(fetchFn).toHaveBeenCalledTimes(1); + + // Second call — should use cache + await monitor.getLiquidity({ asset: 'USDC', provider: 'cached-provider' }); + expect(fetchFn).toHaveBeenCalledTimes(1); + }); + + it('should use stale cache when provider fetch fails', async () => { + const fetchFn = jest + .fn() + .mockResolvedValueOnce({ + provider: 'flaky-provider', + asset: 'USDC', + availableAmount: '50000', + totalAmount: '50000', + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + } as StellarLiquiditySnapshot) + .mockRejectedValueOnce(new Error('Network error')); + + monitor.registerProvider({ + name: 'flaky-provider', + fetchFn, + cacheTtlMs: 100, // short TTL so we can test stale cache quickly + }); + + // First call — succeeds + await monitor.getLiquidity({ asset: 'USDC', provider: 'flaky-provider' }); + + // Wait for cache to expire + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Second call — fails, should return stale cache + const results = await monitor.getLiquidity({ + asset: 'USDC', + provider: 'flaky-provider', + }); + + expect(results).toHaveLength(1); + expect(results[0].availableAmount).toBe('50000'); + }, 10000); + }); + + describe('Liquidity Fetching - all providers', () => { + it('should fetch from all providers when no specific provider is given', async () => { + monitor.registerProvider(createMockProvider('provider-x', '100000')); + monitor.registerProvider(createMockProvider('provider-y', '200000')); + + const results = await monitor.getLiquidity({ asset: 'USDC' }); + + expect(results).toHaveLength(2); + const amounts = results.map((r) => r.availableAmount); + expect(amounts).toContain('100000'); + expect(amounts).toContain('200000'); + }); + }); + + describe('Threshold Evaluation', () => { + it('should emit low liquidity alert when below low threshold', async () => { + const alerts: string[] = []; + monitor.on('alert', (event) => { + alerts.push(`${event.level}:${event.asset}`); + }); + + monitor.registerProvider(createMockProvider('low-provider', '30000')); + + await monitor.getLiquidity({ asset: 'USDC', provider: 'low-provider' }); + + expect(alerts).toContain('low:USDC'); + }); + + it('should emit depleted alert when below critical threshold', async () => { + const alerts: string[] = []; + monitor.on('depleted', (event) => { + alerts.push(`${event.level}:${event.asset}`); + }); + + monitor.registerProvider(createMockProvider('empty-provider', '5000')); + + await monitor.getLiquidity({ asset: 'USDC', provider: 'empty-provider' }); + + expect(alerts).toContain('critical:USDC'); + }); + + it('should emit recovered alert when liquidity returns to healthy', async () => { + const recoveredAlerts: string[] = []; + monitor.on('recovered', (event) => { + recoveredAlerts.push(event.provider); + }); + + let currentAmount = '30000'; + const adjustableProvider: StellarLiquidityProviderConfig = { + name: 'adjustable', + fetchFn: async (asset: string) => ({ + provider: 'adjustable', + asset, + availableAmount: currentAmount, + totalAmount: '200000', + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + }), + cacheTtlMs: 0, // never cache + }; + + monitor.registerProvider(adjustableProvider); + + // Fetch low liquidity + await monitor.getLiquidity({ asset: 'USDC', provider: 'adjustable' }); + + // Now raise liquidity back to healthy + currentAmount = '100000'; + await monitor.getLiquidity({ asset: 'USDC', provider: 'adjustable' }); + + expect(recoveredAlerts).toContain('adjustable'); + }); + + it('should not emit duplicate alerts when already in low state', async () => { + const lowAlerts: string[] = []; + monitor.on('low', (event) => { + lowAlerts.push(event.provider); + }); + + let currentAmount = '30000'; + const adjustableProvider: StellarLiquidityProviderConfig = { + name: 'still-low', + fetchFn: async (asset: string) => ({ + provider: 'still-low', + asset, + availableAmount: currentAmount, + totalAmount: '50000', + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + }), + cacheTtlMs: 0, + }; + + monitor.registerProvider(adjustableProvider); + + // First fetch — emits low + await monitor.getLiquidity({ asset: 'USDC', provider: 'still-low' }); + expect(lowAlerts).toHaveLength(1); + + // Second fetch — still low, cacheTtlMs=0 forces re-fetch + await monitor.getLiquidity({ asset: 'USDC', provider: 'still-low' }); + expect(lowAlerts).toHaveLength(1); + }); + }); + + describe('Threshold Management', () => { + it('should get configured thresholds', () => { + const thresholds = monitor.getThresholds(); + expect(thresholds).toHaveLength(2); + expect(thresholds.find((t) => t.asset === 'USDC')?.lowThreshold).toBe( + '50000', + ); + }); + + it('should set and update thresholds', () => { + monitor.setThreshold({ + asset: 'USDC', + lowThreshold: '100000', + criticalThreshold: '25000', + }); + + const updated = monitor.getThresholds().find((t) => t.asset === 'USDC'); + expect(updated?.lowThreshold).toBe('100000'); + + // Add new threshold + monitor.setThreshold({ + asset: 'BTC', + lowThreshold: '10', + criticalThreshold: '2', + }); + + expect(monitor.getThresholds()).toHaveLength(3); + }); + + it('should remove thresholds', () => { + expect(monitor.removeThreshold('USDC')).toBe(true); + expect(monitor.getThresholds()).toHaveLength(1); + expect(monitor.removeThreshold('NONEXISTENT')).toBe(false); + }); + }); + + describe('Query Helpers', () => { + it('should return low liquidity providers for an asset', async () => { + monitor.registerProvider(createMockProvider('good', '100000')); + monitor.registerProvider(createMockProvider('bad', '30000')); + + await monitor.getLiquidity({ asset: 'USDC' }); + + const lowProviders = monitor.getLowLiquidityProviders('USDC'); + expect(lowProviders).toHaveLength(1); + expect(lowProviders[0].provider).toBe('bad'); + }); + + it('should return status summary', async () => { + monitor.registerProvider(createMockProvider('ok-provider', '100000')); + monitor.registerProvider(createMockProvider('low-provider', '30000')); + + await monitor.getLiquidity({ asset: 'USDC' }); + + const summary = monitor.getStatusSummary(); + expect(summary.USDC).toBeDefined(); + expect(summary.USDC.providers).toBe(2); + expect(summary.USDC.low).toBe(1); + expect(summary.USDC.depleted).toBe(0); + }); + }); + + describe('Cache Management', () => { + it('should invalidate cache for a specific provider and asset', async () => { + const fetchFn = jest.fn().mockResolvedValue({ + provider: 'cache-test', + asset: 'USDC', + availableAmount: '75000', + totalAmount: '75000', + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + } as StellarLiquiditySnapshot); + + monitor.registerProvider({ + name: 'cache-test', + fetchFn, + cacheTtlMs: 60000, + }); + + await monitor.getLiquidity({ asset: 'USDC', provider: 'cache-test' }); + expect(fetchFn).toHaveBeenCalledTimes(1); + + monitor.invalidateCache('cache-test', 'USDC'); + + await monitor.getLiquidity({ asset: 'USDC', provider: 'cache-test' }); + expect(fetchFn).toHaveBeenCalledTimes(2); + }); + + it('should clear entire cache', async () => { + const fetchFn = jest.fn().mockResolvedValue({ + provider: 'clear-test', + asset: 'USDC', + availableAmount: '75000', + totalAmount: '75000', + sourceChain: 'stellar', + destinationChain: 'ethereum', + timestamp: Date.now(), + status: 'active', + } as StellarLiquiditySnapshot); + + monitor.registerProvider({ + name: 'clear-test', + fetchFn, + cacheTtlMs: 60000, + }); + + await monitor.getLiquidity({ asset: 'USDC', provider: 'clear-test' }); + expect(fetchFn).toHaveBeenCalledTimes(1); + + monitor.clearCache(); + + await monitor.getLiquidity({ asset: 'USDC', provider: 'clear-test' }); + expect(fetchFn).toHaveBeenCalledTimes(2); + }); + }); + + describe('Polling', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('should start and stop periodic monitoring', () => { + expect(monitor.isMonitoring).toBe(false); + + monitor.startMonitoring(); + expect(monitor.isMonitoring).toBe(true); + + monitor.stopMonitoring(); + expect(monitor.isMonitoring).toBe(false); + }); + + it('should refresh all liquidity on interval', async () => { + const refreshSpy = jest.spyOn(monitor, 'refreshAll'); + + monitor.startMonitoring(); + // Should have been called once immediately + expect(refreshSpy).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(30000); + expect(refreshSpy).toHaveBeenCalledTimes(2); + + jest.advanceTimersByTime(30000); + expect(refreshSpy).toHaveBeenCalledTimes(3); + + monitor.stopMonitoring(); + }); + + it('should handle refreshAll with no providers gracefully', async () => { + await expect(monitor.refreshAll()).resolves.not.toThrow(); + }); + + it('should populate cache via getAllLiquidity', async () => { + monitor.registerProvider(createMockProvider('full-provider', '100000')); + + const results = await monitor.getAllLiquidity(); + + expect(results.length).toBeGreaterThanOrEqual(1); + expect(results.some((r) => r.provider === 'full-provider')).toBe(true); + }); + }); +}); diff --git a/src/liquidity/stellar/stellar-bridge-liquidity-monitor.ts b/src/liquidity/stellar/stellar-bridge-liquidity-monitor.ts new file mode 100644 index 0000000..08888bb --- /dev/null +++ b/src/liquidity/stellar/stellar-bridge-liquidity-monitor.ts @@ -0,0 +1,455 @@ +/** + * Stellar Bridge Liquidity Monitor. + * + * Monitors available liquidity across Stellar bridge providers by fetching + * liquidity metrics from each registered provider, caching results, and + * emitting events when liquidity drops below configurable thresholds. + * + * @see Issue #297 — Implement Stellar Bridge Liquidity Monitor + */ + +import { EventEmitter } from 'events'; + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export interface StellarLiquiditySnapshot { + /** Provider/bridge identifier. */ + provider: string; + /** Asset symbol (e.g. USDC, XLM). */ + asset: string; + /** Available liquidity amount as a string to preserve precision. */ + availableAmount: string; + /** Total liquidity cap if applicable, or the same as availableAmount. */ + totalAmount: string; + /** Timestamp of the snapshot. */ + timestamp: number; + /** Source chain identifier. */ + sourceChain: string; + /** Destination chain identifier. */ + destinationChain: string; + /** Optional human-readable status. */ + status?: 'active' | 'low' | 'depleted' | 'unknown'; +} + +export interface StellarLiquidityQuery { + asset: string; + sourceChain?: string; + destinationChain?: string; + provider?: string; +} + +export interface LiquidityThreshold { + /** Asset symbol this threshold applies to. */ + asset: string; + /** Minimum amount before emitting a 'low_liquidity' alert. */ + lowThreshold: string; + /** Minimum amount before emitting a 'depleted' alert. */ + criticalThreshold: string; +} + +export interface StellarLiquidityProviderConfig { + /** Provider name. */ + name: string; + /** Function that fetches the current liquidity snapshot from this provider. */ + fetchFn: (asset: string) => Promise; + /** Cache TTL for this provider's data in milliseconds (default: 60000). */ + cacheTtlMs?: number; +} + +export interface StellarBridgeLiquidityMonitorConfig { + /** Registered liquidity providers. */ + providers?: StellarLiquidityProviderConfig[]; + /** Refresh interval for periodic polling in milliseconds (default: 30000). */ + refreshIntervalMs?: number; + /** Default thresholds for low/critical liquidity alerts. */ + thresholds?: LiquidityThreshold[]; + /** Maximum age of cached data before forced refresh in ms (default: 60000). */ + cacheTtlMs?: number; +} + +export interface LiquidityAlertEvent { + provider: string; + asset: string; + availableAmount: string; + threshold: string; + level: 'low' | 'critical' | 'recovered'; + snapshot: StellarLiquiditySnapshot; + timestamp: number; +} + +// ─── Defaults ───────────────────────────────────────────────────────────────── + +const DEFAULT_CONFIG: Required = { + providers: [], + refreshIntervalMs: 30_000, + cacheTtlMs: 60_000, + thresholds: [ + { asset: 'USDC', lowThreshold: '50000', criticalThreshold: '10000' }, + { asset: 'USDT', lowThreshold: '50000', criticalThreshold: '10000' }, + { asset: 'XLM', lowThreshold: '100000', criticalThreshold: '25000' }, + { asset: 'ETH', lowThreshold: '50', criticalThreshold: '10' }, + ], +}; + +// ─── Monitor ────────────────────────────────────────────────────────────────── + +export class StellarBridgeLiquidityMonitor extends EventEmitter { + private readonly config: Required; + private readonly cache = new Map(); + private readonly cacheTimestamps = new Map(); + private readonly providerMap = new Map(); + private readonly previousAlerts = new Map(); // key -> wasLow + private refreshTimer: ReturnType | null = null; + + constructor(config: StellarBridgeLiquidityMonitorConfig = {}) { + super(); + this.config = { + providers: config.providers ? [...config.providers] : [], + refreshIntervalMs: + config.refreshIntervalMs ?? DEFAULT_CONFIG.refreshIntervalMs, + cacheTtlMs: config.cacheTtlMs ?? DEFAULT_CONFIG.cacheTtlMs, + thresholds: config.thresholds + ? [...config.thresholds] + : [...DEFAULT_CONFIG.thresholds], + }; + + for (const provider of this.config.providers) { + this.providerMap.set(provider.name, provider); + } + } + + // ─── Provider Registration ───────────────────────────────────────────────── + + /** + * Register a liquidity provider for monitoring. + * Duplicate provider names will be overwritten. + */ + registerProvider(provider: StellarLiquidityProviderConfig): void { + this.providerMap.set(provider.name, provider); + this.config.providers.push(provider); + } + + /** + * Remove a previously registered provider. + */ + unregisterProvider(name: string): boolean { + const idx = this.config.providers.findIndex((p) => p.name === name); + if (idx === -1) return false; + this.config.providers.splice(idx, 1); + this.providerMap.delete(name); + + // Clear cached data for this provider + const keysToDelete: string[] = []; + for (const [key, snapshot] of this.cache) { + if (snapshot.provider === name) { + keysToDelete.push(key); + } + } + for (const key of keysToDelete) { + this.cache.delete(key); + this.cacheTimestamps.delete(key); + } + + return true; + } + + /** Get list of registered provider names. */ + getRegisteredProviders(): string[] { + return Array.from(this.providerMap.keys()); + } + + // ─── Liquidity Fetching ───────────────────────────────────────────────────── + + /** + * Fetch liquidity for a specific asset from all registered providers. + * Uses cached data when available and still fresh. + */ + async getLiquidity( + query: StellarLiquidityQuery, + ): Promise { + const providers = query.provider + ? [query.provider] + : Array.from(this.providerMap.keys()); + + const results: StellarLiquiditySnapshot[] = []; + const errors: string[] = []; + + for (const providerName of providers) { + const providerConfig = this.providerMap.get(providerName); + if (!providerConfig) { + errors.push(`Unknown provider: ${providerName}`); + continue; + } + + const cacheKey = this.buildCacheKey(providerName, query.asset); + const cached = this.cache.get(cacheKey); + const cachedAt = this.cacheTimestamps.get(cacheKey) ?? 0; + + const effectiveTtl = providerConfig.cacheTtlMs ?? this.config.cacheTtlMs; + // Return cached data if still fresh + if (cached && Date.now() - cachedAt < effectiveTtl) { + results.push(cached); + continue; + } + + try { + const snapshot = await providerConfig.fetchFn(query.asset); + const enriched: StellarLiquiditySnapshot = { + ...snapshot, + sourceChain: query.sourceChain ?? snapshot.sourceChain, + destinationChain: query.destinationChain ?? snapshot.destinationChain, + timestamp: Date.now(), + }; + + this.cache.set(cacheKey, enriched); + this.cacheTimestamps.set(cacheKey, Date.now()); + this.evaluateThresholds(enriched); + results.push(enriched); + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + errors.push(`Provider ${providerName} failed: ${message}`); + + // Use stale cache if available on failure + if (cached) { + results.push({ ...cached, status: cached.status ?? 'unknown' }); + } + } + } + + return results; + } + + /** + * Get liquidity for all tracked assets from all providers. + */ + async getAllLiquidity(): Promise { + const allAssets = new Set(); + for (const snapshot of this.cache.values()) { + allAssets.add(snapshot.asset); + } + + // Add assets from thresholds that might not be cached yet + for (const t of this.config.thresholds) { + allAssets.add(t.asset); + } + + const results: StellarLiquiditySnapshot[] = []; + for (const asset of allAssets) { + const snapshots = await this.getLiquidity({ asset }); + results.push(...snapshots); + } + + return results; + } + + // ─── Cache Management ─────────────────────────────────────────────────────── + + /** + * Invalidate the cache for a specific provider and asset, forcing a refresh + * on the next query. + */ + invalidateCache(provider: string, asset: string): void { + const key = this.buildCacheKey(provider, asset); + this.cache.delete(key); + this.cacheTimestamps.delete(key); + } + + /** Clear the entire cache. */ + clearCache(): void { + this.cache.clear(); + this.cacheTimestamps.clear(); + } + + // ─── Threshold Evaluation ─────────────────────────────────────────────────── + + /** + * Evaluate the current snapshot against configured thresholds and emit + * alerts if conditions are met. + */ + private evaluateThresholds(snapshot: StellarLiquiditySnapshot): void { + const threshold = this.config.thresholds.find( + (t) => t.asset === snapshot.asset, + ); + if (!threshold) return; + + const available = BigInt(snapshot.availableAmount); + const lowThreshold = BigInt(threshold.lowThreshold); + const criticalThreshold = BigInt(threshold.criticalThreshold); + + const alertKey = `${snapshot.provider}:${snapshot.asset}`; + const wasLow = this.previousAlerts.get(alertKey) ?? false; + + if (available <= criticalThreshold) { + // Critical — depleted, only alert if transitioning into this state + snapshot.status = 'depleted'; + if (!wasLow) { + this.emitAlert('critical', snapshot, threshold.criticalThreshold); + } + this.previousAlerts.set(alertKey, true); + } else if (available <= lowThreshold) { + // Low liquidity — only alert if transitioning into this state + snapshot.status = 'low'; + if (!wasLow) { + this.emitAlert('low', snapshot, threshold.lowThreshold); + } + this.previousAlerts.set(alertKey, true); + } else { + // Healthy — mark recovered if previously low + snapshot.status = 'active'; + if (wasLow) { + this.emitAlert('recovered', snapshot, threshold.lowThreshold); + } + this.previousAlerts.set(alertKey, false); + } + } + + private emitAlert( + level: 'low' | 'critical' | 'recovered', + snapshot: StellarLiquiditySnapshot, + threshold: string, + ): void { + const event: LiquidityAlertEvent = { + provider: snapshot.provider, + asset: snapshot.asset, + availableAmount: snapshot.availableAmount, + threshold, + level, + snapshot, + timestamp: Date.now(), + }; + + this.emit(level === 'critical' ? 'depleted' : level, event); + this.emit('alert', event); + } + + // ─── Threshold Management ─────────────────────────────────────────────────── + + /** Get all configured thresholds. */ + getThresholds(): LiquidityThreshold[] { + return [...this.config.thresholds]; + } + + /** Set or update a threshold for a given asset. */ + setThreshold(threshold: LiquidityThreshold): void { + const idx = this.config.thresholds.findIndex( + (t) => t.asset === threshold.asset, + ); + if (idx !== -1) { + this.config.thresholds[idx] = threshold; + } else { + this.config.thresholds.push(threshold); + } + } + + /** Remove threshold for a given asset. */ + removeThreshold(asset: string): boolean { + const idx = this.config.thresholds.findIndex((t) => t.asset === asset); + if (idx === -1) return false; + this.config.thresholds.splice(idx, 1); + return true; + } + + // ─── Query Helpers ───────────────────────────────────────────────────────── + + /** + * Get all providers with liquidity below the low threshold for a given asset. + */ + getLowLiquidityProviders(asset: string): StellarLiquiditySnapshot[] { + const results: StellarLiquiditySnapshot[] = []; + for (const snapshot of this.cache.values()) { + if (snapshot.asset !== asset) continue; + if (snapshot.status === 'low' || snapshot.status === 'depleted') { + results.push(snapshot); + } + } + return results; + } + + /** Get a summary of liquidity status across all tracked assets. */ + getStatusSummary(): Record { + const summary = new Map< + string, + { providers: number; low: number; depleted: number } + >(); + + for (const snapshot of this.cache.values()) { + const asset = snapshot.asset; + if (!summary.has(asset)) { + summary.set(asset, { providers: 0, low: 0, depleted: 0 }); + } + const entry = summary.get(asset)!; + entry.providers++; + if (snapshot.status === 'low') entry.low++; + if (snapshot.status === 'depleted') entry.depleted++; + } + + const result: Record = {}; + for (const [asset, entry] of summary) { + result[asset] = entry; + } + return result; + } + + // ─── Polling ──────────────────────────────────────────────────────────────── + + /** + * Start periodic refresh of all liquidity data. + * Idempotent — calling twice has no effect. + */ + startMonitoring(): void { + if (this.refreshTimer) return; + + void this.refreshAll(); + + this.refreshTimer = setInterval(() => { + void this.refreshAll(); + }, this.config.refreshIntervalMs); + } + + /** Stop periodic refresh. Idempotent. */ + stopMonitoring(): void { + if (this.refreshTimer) { + clearInterval(this.refreshTimer); + this.refreshTimer = null; + } + } + + get isMonitoring(): boolean { + return this.refreshTimer !== null; + } + + /** + * Force refresh all cached liquidity data from all providers. + */ + async refreshAll(): Promise { + const allAssets = new Set(); + for (const snapshot of this.cache.values()) { + allAssets.add(snapshot.asset); + } + // Also refresh assets with configured thresholds + for (const t of this.config.thresholds) { + allAssets.add(t.asset); + } + + if (allAssets.size === 0) return; + + const promises: Array> = []; + for (const asset of allAssets) { + // Invalidate cache for this asset to force refresh + for (const providerName of this.providerMap.keys()) { + this.invalidateCache(providerName, asset); + } + promises.push( + this.getLiquidity({ asset }).then(() => {}), // discard return value + ); + } + + await Promise.allSettled(promises); + } + + // ─── Helpers ─────────────────────────────────────────────────────────────── + + private buildCacheKey(provider: string, asset: string): string { + return `${provider}:${asset}`; + } +}