diff --git a/src/lib/prisma.ts b/src/lib/prisma.ts index 98ada8f4..34a0b97e 100644 --- a/src/lib/prisma.ts +++ b/src/lib/prisma.ts @@ -21,8 +21,11 @@ export const prisma = new Proxy({} as PrismaClient, { if (!connectionString) { throw new Error("DATABASE_URL must be defined"); } - - const pool = new pg.Pool({ connectionString }); + const pool = new pg.Pool({ + connectionString, + max: 20, + idleTimeoutMillis: 10000, + }); const adapter = new PrismaPg(pool); globalForPrisma.prisma = new PrismaClient({ adapter }); } diff --git a/src/lib/stellarProvider.ts b/src/lib/stellarProvider.ts index 47565637..6d12e0b3 100644 --- a/src/lib/stellarProvider.ts +++ b/src/lib/stellarProvider.ts @@ -1,17 +1,18 @@ -import { Horizon } from "@stellar/stellar-sdk"; +import { Horizon, SorobanRpc } from "@stellar/stellar-sdk"; import dotenv from "dotenv"; +import { logger } from "../utils/logger"; dotenv.config(); /** - * Whether an error from the Horizon SDK should trigger a failover to the next node. + * Whether an error from the Horizon SDK or RPC should trigger a failover to the next node. * Covers HTTP 5xx responses and common network-level errors. */ function isFailoverError(error: unknown): boolean { if (error && typeof error === "object") { const err = error as Record; - // HTTP 5xx from Horizon + // HTTP 5xx from Horizon or RPC const httpStatus: unknown = err.response?.status ?? err.status ?? err.statusCode; if (typeof httpStatus === "number" && httpStatus >= 500) { @@ -31,9 +32,17 @@ function isFailoverError(error: unknown): boolean { return true; } - // SDK timeout messages - if (typeof err.message === "string" && err.message.includes("timeout")) { - return true; + // SDK timeout messages or RPC errors indicating connection issues + if (typeof err.message === "string") { + const msg = err.message.toLowerCase(); + if ( + msg.includes("timeout") || + msg.includes("network error") || + msg.includes("econnrefused") || + msg.includes("fetch failed") + ) { + return true; + } } } @@ -67,23 +76,81 @@ function buildHorizonUrls(network: string): string[] { } /** - * StellarProvider — singleton that manages a pool of Horizon servers with + * Builds the ordered list of fallback RPC URLs for a given network. + */ +function buildRpcUrls(network: string): string[] { + const isMainnet = network === "PUBLIC"; + + const sdfUrl = isMainnet + ? "https://rpc.mainnet.stellar.org" + : "https://rpc.testnet.stellar.org"; + + const urls: string[] = []; + + const customUrl = process.env.RPC_URL?.trim(); + if (customUrl) { + urls.push(customUrl); + } + + // Load configurable fallback RPC URLs (comma-separated) + const customFallbacks = process.env.FALLBACK_RPC_URLS?.trim(); + if (customFallbacks) { + urls.push( + ...customFallbacks + .split(",") + .map((u) => u.trim()) + .filter(Boolean) + ); + } + + // Ensure default SDF node is in the list + if (!urls.includes(sdfUrl)) { + urls.push(sdfUrl); + } + + return urls; +} + +/** + * StellarProvider — singleton that manages a pool of Horizon and RPC servers with * automatic failover. */ class StellarProvider { + private readonly network: string; + + // Horizon properties private readonly urls: readonly string[]; private currentIndex: number = 0; private server: Horizon.Server; + // RPC properties + private readonly rpcUrls: readonly string[]; + private rpcCurrentIndex: number = 0; + private rpcServer: SorobanRpc.Server; + constructor() { - const network = process.env.STELLAR_NETWORK || "TESTNET"; - this.urls = buildHorizonUrls(network); + this.network = process.env.STELLAR_NETWORK || "TESTNET"; + + // Initialize Horizon + this.urls = buildHorizonUrls(this.network); this.server = new Horizon.Server(this.urls[0]!); - console.info( - `[StellarProvider] Initialized with ${this.urls.length} node(s). Primary: ${this.urls[0]!}`, + logger.info( + `[StellarProvider] Initialized Horizon with ${this.urls.length} node(s). Primary: ${this.urls[0]!}`, + ); + + // Initialize RPC + this.rpcUrls = buildRpcUrls(this.network); + this.rpcServer = new SorobanRpc.Server(this.rpcUrls[0]!, { + allowHttp: this.network === "TESTNET", + }); + logger.info( + `[StellarProvider] Initialized RPC with ${this.rpcUrls.length} node(s). Primary: ${this.rpcUrls[0]!}`, ); } + // ========================================== + // Horizon methods + // ========================================== getServer(): Horizon.Server { return this.server; } @@ -101,8 +168,8 @@ class StellarProvider { const nextIndex = (this.currentIndex + 1) % this.urls.length; if (nextIndex === this.currentIndex) { - console.error( - `[StellarProvider] Node ${failedUrl} failed and no fallback is available.`, + logger.networkError( + `[StellarProvider] Horizon Node ${failedUrl} failed and no fallback is available.`, ); return false; } @@ -110,10 +177,52 @@ class StellarProvider { this.currentIndex = nextIndex; this.server = new Horizon.Server(this.urls[this.currentIndex]!); - console.warn( - `[StellarProvider] ⚠️ Node "${failedUrl}" returned an error. ` + + logger.warn( + `[StellarProvider] ⚠️ Horizon Node "${failedUrl}" returned an error. ` + `Failing over to "${this.urls[this.currentIndex]!}" ` + `(node ${this.currentIndex + 1}/${this.urls.length}).`, + { isNetwork: true } + ); + + return true; + } + + // ========================================== + // RPC methods + // ========================================== + getRpcServer(): SorobanRpc.Server { + return this.rpcServer; + } + + getCurrentRpcUrl(): string { + return this.rpcUrls[this.rpcCurrentIndex]!; + } + + reportRpcFailure(error: unknown): boolean { + if (!isFailoverError(error)) { + return false; + } + + const failedUrl = this.rpcUrls[this.rpcCurrentIndex]!; + const nextIndex = (this.rpcCurrentIndex + 1) % this.rpcUrls.length; + + if (nextIndex === this.rpcCurrentIndex) { + logger.networkError( + `[StellarProvider] RPC Node ${failedUrl} failed and no fallback is available.`, + ); + return false; + } + + this.rpcCurrentIndex = nextIndex; + this.rpcServer = new SorobanRpc.Server(this.rpcUrls[this.rpcCurrentIndex]!, { + allowHttp: this.network === "TESTNET", + }); + + logger.warn( + `[StellarProvider] ⚠️ RPC Node "${failedUrl}" returned an error. ` + + `Failing over to "${this.rpcUrls[this.rpcCurrentIndex]!}" ` + + `(node ${this.rpcCurrentIndex + 1}/${this.rpcUrls.length}).`, + { isNetwork: true } ); return true; diff --git a/src/services/contractSanityCheckService.ts b/src/services/contractSanityCheckService.ts index 8aea19e3..896f8a51 100644 --- a/src/services/contractSanityCheckService.ts +++ b/src/services/contractSanityCheckService.ts @@ -1,5 +1,6 @@ -import dotenv from "dotenv"; import { SorobanRpc, xdr } from "@stellar/stellar-sdk"; +import stellarProvider from "../lib/stellarProvider"; +import { logger } from "../utils/logger"; dotenv.config(); @@ -19,19 +20,11 @@ interface ContractSanityCheckResult { export class ContractSanityCheckService { private readonly CONTRACT_ID: string; private readonly NETWORK: string; - private readonly rpcUrl: string; private readonly TIMEOUT_MS = 10000; // 10 second timeout for contract reads constructor() { this.CONTRACT_ID = process.env.CONTRACT_ID || ""; this.NETWORK = process.env.STELLAR_NETWORK || "TESTNET"; - - // Configure RPC URL based on network - if (this.NETWORK === "PUBLIC") { - this.rpcUrl = "https://rpc.mainnet.stellar.org"; - } else { - this.rpcUrl = "https://rpc.testnet.stellar.org"; - } } /** @@ -46,7 +39,7 @@ export class ContractSanityCheckService { // Skip check if CONTRACT_ID is not configured if (!this.CONTRACT_ID) { - console.warn( + logger.warn( "⚠️ CONTRACT_ID not configured - skipping contract sanity check", ); return { @@ -56,20 +49,19 @@ export class ContractSanityCheckService { }; } - console.log( + logger.networkInfo( `🔍 Performing contract sanity check on ${this.CONTRACT_ID} (${this.NETWORK})`, ); try { - const server = new SorobanRpc.Server(this.rpcUrl, { - allowHttp: this.NETWORK === "TESTNET", - }); + // Use the shared StellarProvider so it respects the current RPC failover state + const server = stellarProvider.getRpcServer(); // Attempt to read contract version (low-cost read) const versionResult = await this.tryGetVersion(server); if (versionResult.success) { - console.log( + logger.networkInfo( `✅ Contract sanity check passed - Version: ${versionResult.version}`, ); return { @@ -84,7 +76,7 @@ export class ContractSanityCheckService { const activeResult = await this.tryIsActive(server); if (activeResult.success) { - console.log( + logger.networkInfo( `✅ Contract sanity check passed - Contract is active`, ); return { @@ -95,16 +87,23 @@ export class ContractSanityCheckService { } // Both checks failed - const error = versionResult.error || activeResult.error || "Unknown error"; - console.error(`❌ Contract sanity check failed: ${error}`); + const errorStr = versionResult.error || activeResult.error || "Unknown error"; + const origError = versionResult.originalError || activeResult.originalError; + + if (origError) { + stellarProvider.reportRpcFailure(origError); + } + + logger.networkError(`❌ Contract sanity check failed: ${errorStr}`); return { ...result, - error, + error: errorStr, }; } catch (error) { + stellarProvider.reportRpcFailure(error); const errorMessage = error instanceof Error ? error.message : String(error); - console.error(`❌ Contract sanity check error: ${errorMessage}`); + logger.networkError(`❌ Contract sanity check error: ${errorMessage}`); return { ...result, error: errorMessage, @@ -118,7 +117,7 @@ export class ContractSanityCheckService { */ private async tryGetVersion( server: SorobanRpc.Server, - ): Promise<{ success: boolean; version?: string; error?: string }> { + ): Promise<{ success: boolean; version?: string; error?: string; originalError?: any }> { try { // Attempt to read a 'version' function from the contract // This is a common pattern in Soroban contracts @@ -141,6 +140,7 @@ export class ContractSanityCheckService { return { success: false, error: error instanceof Error ? error.message : String(error), + originalError: error, }; } } @@ -151,7 +151,7 @@ export class ContractSanityCheckService { */ private async tryIsActive( server: SorobanRpc.Server, - ): Promise<{ success: boolean; isActive?: boolean; error?: string }> { + ): Promise<{ success: boolean; isActive?: boolean; error?: string; originalError?: any }> { try { const contractAddress = this.CONTRACT_ID; @@ -182,6 +182,7 @@ export class ContractSanityCheckService { return { success: false, error: errorMessage, + originalError: error, }; } } diff --git a/src/services/sorobanEventListener.ts b/src/services/sorobanEventListener.ts index 043d4e51..628cd762 100644 --- a/src/services/sorobanEventListener.ts +++ b/src/services/sorobanEventListener.ts @@ -6,6 +6,7 @@ import { broadcastToSessions } from "../lib/socket"; import stellarProvider from "../lib/stellarProvider"; import dotenv from "dotenv"; import { signer } from "../signer"; +import { logger } from "../utils/logger"; dotenv.config(); @@ -37,14 +38,14 @@ export class SorobanEventListener { async start(): Promise { if (this.isRunning) { - console.warn("SorobanEventListener is already running"); + logger.warn("[EventListener] SorobanEventListener is already running"); return; } this.isRunning = true; this.oraclePublicKey = await signer.getPublicKey(); - console.log( + logger.info( `[EventListener] Starting listener for account ${this.oraclePublicKey}`, ); @@ -54,7 +55,7 @@ export class SorobanEventListener { }); if (lastRecord) { this.lastProcessedLedger = lastRecord.ledgerSeq; - console.log( + logger.info( `[EventListener] Resuming from ledger ${this.lastProcessedLedger}`, ); } @@ -65,7 +66,7 @@ export class SorobanEventListener { // Start periodic polling this.pollTimer = setInterval(() => { this.pollTransactions().catch((err) => { - console.error("[EventListener] Poll error:", err); + logger.networkError("[EventListener] Poll error:", { err }); }); }, this.pollIntervalMs); } @@ -76,7 +77,7 @@ export class SorobanEventListener { this.pollTimer = null; } this.isRunning = false; - console.log("[EventListener] Stopped"); + logger.info("[EventListener] Stopped"); } restart(newIntervalMs: number): void { @@ -88,10 +89,10 @@ export class SorobanEventListener { } this.pollTimer = setInterval(() => { this.pollTransactions().catch((err) => { - console.error("[EventListener] Poll error:", err); + logger.networkError("[EventListener] Poll error:", { err }); }); }, this.pollIntervalMs); - console.info(`[EventListener] Poll interval updated to ${this.pollIntervalMs}ms`); + logger.info(`[EventListener] Poll interval updated to ${this.pollIntervalMs}ms`); } private async pollTransactions(): Promise { @@ -143,7 +144,7 @@ export class SorobanEventListener { // Account not found is expected for new accounts with no transactions if (error instanceof Error && error.message.includes("status code 404")) { - console.log("[EventListener] No transactions found for oracle account"); + logger.networkInfo("[EventListener] No transactions found for oracle account"); return; } throw error; @@ -191,8 +192,9 @@ export class SorobanEventListener { const rate = parseFloat(valueStr); if (isNaN(rate)) { - console.warn( + logger.warn( `[EventListener] Invalid rate value for ${currency}: ${valueStr}`, + { isNetwork: true } ); continue; } @@ -207,9 +209,9 @@ export class SorobanEventListener { }); } } catch (error) { - console.error( + logger.networkError( `[EventListener] Error parsing operations for tx ${tx.hash}:`, - error, + { error } ); } @@ -236,13 +238,13 @@ export class SorobanEventListener { confirmedAt: price.confirmedAt, }, }); - console.log( + logger.networkInfo( `[EventListener] Saved confirmed price: ${price.currency} = ${price.rate} (tx: ${price.txHash.substring(0, 8)}...)`, ); } catch (error) { - console.error( + logger.networkError( `[EventListener] Error saving price for ${price.currency}:`, - error, + { error } ); } } diff --git a/src/services/stellarService.ts b/src/services/stellarService.ts index 996190ac..fd7b0d83 100644 --- a/src/services/stellarService.ts +++ b/src/services/stellarService.ts @@ -14,6 +14,7 @@ import stellarProvider from "../lib/stellarProvider"; import { sequenceManager } from "./sequence-manager"; import { assertSigningAllowed } from "../state/appState"; import { signer } from "../signer"; +import { logger } from "../utils/logger"; dotenv.config(); @@ -110,7 +111,7 @@ export class StellarService { baseFee, ); - console.info(`✅ Price update for ${currency} confirmed. Hash: ${result.hash}`); + logger.networkInfo(`✅ Price update for ${currency} confirmed. Hash: ${result.hash}`, { hash: result.hash }); return result.hash; } @@ -155,7 +156,7 @@ export class StellarService { ); const currencies = updates.map((u) => u.currency).join(", "); - console.info(`✅ Batched price update for [${currencies}] confirmed. Hash: ${result.hash}`); + logger.networkInfo(`✅ Batched price update for [${currencies}] confirmed. Hash: ${result.hash}`, { hash: result.hash, currencies }); return result.hash; } @@ -193,7 +194,7 @@ export class StellarService { baseFee, ); - console.info(`✅ Multi-signed price update for ${currency} confirmed. Hash: ${result.hash}`); + logger.networkInfo(`✅ Multi-signed price update for ${currency} confirmed. Hash: ${result.hash}`, { hash: result.hash }); return result.hash; } @@ -245,7 +246,7 @@ export class StellarService { const resultCode = error.response?.data?.extras?.result_codes?.transaction; if (resultCode === "tx_bad_seq" || this.isLocalTimeoutError(error)) { - console.warn( + logger.warn( "⚠️ SequenceManager: stale or invalid local transaction assignment detected. Invalidating sequence and retrying...", ); const publicKey = await this.getPublicKey(); @@ -258,7 +259,7 @@ export class StellarService { } if (this.isStuckError(error) && attempt <= maxRetries) { - console.warn( + logger.warn( `⚠️ Transaction stuck, expired, or fee too low (Attempt ${attempt}). Recycling locally and retrying...`, ); if (!this.shouldRecycleImmediately(error)) { @@ -333,7 +334,7 @@ export class StellarService { transaction.signatures.push(decoratedSignature); } catch (error) { - console.error(`[StellarService] Failed to add signature for ${sig.signerPublicKey}:`, error); + logger.error(`[StellarService] Failed to add signature for ${sig.signerPublicKey}:`, { error }); } } @@ -342,7 +343,7 @@ export class StellarService { const resultCode = error.response?.data?.extras?.result_codes?.transaction; if (resultCode === "tx_bad_seq" || this.isLocalTimeoutError(error)) { - console.warn( + logger.warn( "⚠️ SequenceManager: stale or invalid multi-sig assignment detected. Invalidating sequence...", ); const publicKey = await this.getPublicKey(); @@ -410,7 +411,7 @@ export class StellarService { activePending.timedOut = true; this.pendingTimeBoundTransactions.delete(pending.hash); - console.warn( + logger.warn( `[StellarService] Transaction ${pending.hash} exceeded ${this.TRANSACTION_TIME_BOUND_SECONDS}s time-bound. Recycling local assignment.`, ); reject( diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 1e715928..777e86b9 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -1,10 +1,17 @@ import winstonLogger from './winstonLogger'; +import { Logger } from 'winston'; -// Export the Winston logger as the default logger -export const logger = winstonLogger; +export interface ExtendedLogger extends Logger { + fetcherError: (message: string, meta?: any) => void; + networkInfo: (message: string, meta?: any) => void; + networkError: (message: string, meta?: any) => void; +} + +// Export the Winston logger as the default logger with extended types +export const logger = winstonLogger as ExtendedLogger; // For compatibility, export a createFetcherLogger that returns the same logger export function createFetcherLogger(fetcherName: string) { // Optionally, you can add child loggers or labels here - return winstonLogger; + return winstonLogger as ExtendedLogger; } diff --git a/src/utils/winstonLogger.ts b/src/utils/winstonLogger.ts index 37427a59..f2c26a5d 100644 --- a/src/utils/winstonLogger.ts +++ b/src/utils/winstonLogger.ts @@ -8,23 +8,42 @@ const __dirname = path.dirname(__filename); const logDir = path.resolve(__dirname, "../../logs"); +// Custom filters to separate network and system logs +const networkFilter = format((info) => { + return info.isNetwork ? info : false; +}); + +const systemFilter = format((info) => { + return info.isNetwork ? false : info; +}); + +const baseFormat = format.combine( + format.timestamp({ format: "YYYY-MM-DD HH:mm:ss" }), + format.errors({ stack: true }), + format.splat(), + format.json(), +); + const logger = createLogger({ level: "info", - format: format.combine( - format.timestamp({ format: "YYYY-MM-DD HH:mm:ss" }), - format.errors({ stack: true }), - format.splat(), - format.json(), - ), transports: [ new DailyRotateFile({ - filename: path.join(logDir, "application-%DATE%.log"), + filename: path.join(logDir, "system-%DATE%.log"), datePattern: "YYYY-MM-DD", maxSize: "100m", maxFiles: "10", zippedArchive: true, handleExceptions: true, handleRejections: true, + format: format.combine(systemFilter(), baseFormat), + }), + new DailyRotateFile({ + filename: path.join(logDir, "stellar-network-%DATE%.log"), + datePattern: "YYYY-MM-DD", + maxSize: "100m", + maxFiles: "10", + zippedArchive: true, + format: format.combine(networkFilter(), baseFormat), }), new transports.Console({ format: format.combine(format.colorize(), format.simple()), @@ -40,4 +59,13 @@ const logger = createLogger({ logger.error(`[FETCHER_ERROR] ${message}`, meta); }; +// Add custom methods for network boundary logging +(logger as any).networkInfo = (message: string, meta?: any) => { + logger.info(`[NETWORK] ${message}`, { ...meta, isNetwork: true }); +}; + +(logger as any).networkError = (message: string, meta?: any) => { + logger.error(`[NETWORK_ERROR] ${message}`, { ...meta, isNetwork: true }); +}; + export default logger;