From b54a14da9891ccfe5d5e41504d2e9c819386fa5d Mon Sep 17 00:00:00 2001 From: mikkyvans0-source Date: Mon, 1 Jun 2026 18:45:41 +0100 Subject: [PATCH] feat: implement blockchain indexing service with reorg detection, transactional atomicity, and event idempotency support --- .../blockchain-indexer.service.spec.ts | 53 ++-- src/blockchain/blockchain-indexer.service.ts | 148 ++++++---- src/blockchain/blockchain-indexer.spec.ts | 16 +- .../blockchain-reorg.integration.spec.ts | 135 +++++++++ src/blockchain/blockchain-replay.spec.ts | 265 ++++++------------ .../entities/processed-event.entity.ts | 8 + src/blockchain/reorg-detector.service.ts | 1 + src/blockchain/utils/rpc-backoff.util.spec.ts | 110 ++++++++ src/blockchain/utils/rpc-backoff.util.ts | 155 ++++++++++ src/blockchain/utils/sequential-queue.spec.ts | 70 +++++ src/blockchain/utils/sequential-queue.ts | 52 ++++ src/indexer/event-indexer.service.ts | 25 +- .../1769500000002-AddProcessedEventPayload.ts | 17 ++ .../services/blockchain-listener.service.ts | 18 +- 14 files changed, 804 insertions(+), 269 deletions(-) create mode 100644 src/blockchain/blockchain-reorg.integration.spec.ts create mode 100644 src/blockchain/utils/rpc-backoff.util.spec.ts create mode 100644 src/blockchain/utils/rpc-backoff.util.ts create mode 100644 src/blockchain/utils/sequential-queue.spec.ts create mode 100644 src/blockchain/utils/sequential-queue.ts create mode 100644 src/migrations/1769500000002-AddProcessedEventPayload.ts diff --git a/src/blockchain/blockchain-indexer.service.spec.ts b/src/blockchain/blockchain-indexer.service.spec.ts index 9832f50..3b09231 100644 --- a/src/blockchain/blockchain-indexer.service.spec.ts +++ b/src/blockchain/blockchain-indexer.service.spec.ts @@ -1,6 +1,6 @@ import { Test, TestingModule } from '@nestjs/testing'; import { getRepositoryToken } from '@nestjs/typeorm'; -import { Repository, DataSource } from 'typeorm'; +import { Repository, DataSource, MoreThanOrEqual } from 'typeorm'; import { BlockchainIndexerService } from './blockchain-indexer.service'; import { ProcessedEvent } from './entities/processed-event.entity'; import { TokenBalance } from './entities/token-balance.entity'; @@ -176,37 +176,34 @@ describe('BlockchainIndexerService', () => { }); describe('replayFromBlock', () => { - it('should delete events from startBlock onward and replay', async () => { - const createQueryBuilder = jest.fn().mockReturnValue({ - delete: jest.fn().mockReturnValue({ - where: jest.fn().mockReturnValue({ - execute: jest.fn().mockResolvedValue({ affected: 3, raw: {} }) - }) - }) - }); - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockImplementation(createQueryBuilder); + // Replay is now transactional and reverses orphaned state; the detailed + // coverage lives in blockchain-replay.spec.ts. This is a smoke check that + // the service drives the transaction and deletes everything >= startBlock. + it('should transactionally delete events from startBlock onward', async () => { + const manager = { + find: jest.fn().mockResolvedValue([]), + delete: jest.fn().mockResolvedValue({ affected: 3, raw: {} }), + save: jest.fn().mockResolvedValue(null), + increment: jest.fn().mockResolvedValue({ affected: 1 }), + decrement: jest.fn().mockResolvedValue({ affected: 1 }), + findOne: jest.fn().mockResolvedValue(null), + }; + const queryRunner = { + connect: jest.fn(), + startTransaction: jest.fn(), + commitTransaction: jest.fn(), + rollbackTransaction: jest.fn(), + release: jest.fn(), + manager, + }; + jest.spyOn(dataSource, 'createQueryRunner').mockReturnValue(queryRunner as any); await service.replayFromBlock(100); - expect(processedEventRepo.createQueryBuilder).toHaveBeenCalled(); - expect(createQueryBuilder().delete().where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 100 }); - }); - - it('should prevent double processing by cleaning up all future events', async () => { - // Test that ensures events from blocks 100, 101, 102+ are all cleaned up - const createQueryBuilder = jest.fn().mockReturnValue({ - delete: jest.fn().mockReturnValue({ - where: jest.fn().mockReturnValue({ - execute: jest.fn().mockResolvedValue({ affected: 5, raw: {} }) - }) - }) + expect(queryRunner.commitTransaction).toHaveBeenCalled(); + expect(manager.delete).toHaveBeenCalledWith(ProcessedEvent, { + blockNumber: MoreThanOrEqual(100), }); - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockImplementation(createQueryBuilder); - - await service.replayFromBlock(100); - - // Verify the query uses >= to clean up all events from startBlock onward - expect(createQueryBuilder().delete().where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 100 }); }); }); }); \ No newline at end of file diff --git a/src/blockchain/blockchain-indexer.service.ts b/src/blockchain/blockchain-indexer.service.ts index d942e03..787fa83 100644 --- a/src/blockchain/blockchain-indexer.service.ts +++ b/src/blockchain/blockchain-indexer.service.ts @@ -1,15 +1,24 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Repository, DataSource } from 'typeorm'; +import { Repository, DataSource, EntityManager, MoreThanOrEqual } from 'typeorm'; import { ProcessedEvent } from './entities/processed-event.entity'; import { TokenBalance } from './entities/token-balance.entity'; import { IndexerCheckpoint } from './entities/indexer-checkpoint.entity'; import { BlockchainEvent, TransferEventData } from './interfaces/blockchain-event.interface'; +import { SequentialQueue } from './utils/sequential-queue'; @Injectable() export class BlockchainIndexerService { private readonly logger = new Logger(BlockchainIndexerService.name); + /** + * Serialises every state mutation (event processing and reorg rollbacks) so + * they are applied strictly in order and never interleave. Without this, a + * reorg rollback could race against a newer block's transaction and leave + * balances or the checkpoint desynced. + */ + private readonly queue = new SequentialQueue(); + constructor( @InjectRepository(ProcessedEvent) private processedEventRepo: Repository, @@ -20,10 +29,20 @@ export class BlockchainIndexerService { private dataSource: DataSource, ) {} + /** + * Process a single blockchain event. Enqueued so events are persisted one at + * a time, in submission order. + */ async processEvent(event: BlockchainEvent): Promise { + return this.queue.enqueue(() => this.processEventInternal(event)); + } + + private async processEventInternal(event: BlockchainEvent): Promise { const { txHash, logIndex, blockNumber, eventType, data } = event; - // Check if event already processed using the transaction/log identity. + // Idempotency: an event is uniquely identified by (txHash, logIndex). + // Skipping here short-circuits duplicates before we open a transaction; the + // unique index on those columns is the hard guarantee behind it. const existing = await this.processedEventRepo.findOne({ where: { txHash, logIndex }, }); @@ -33,32 +52,34 @@ export class BlockchainIndexerService { return; } - // Start transaction const queryRunner = this.dataSource.createQueryRunner(); await queryRunner.connect(); await queryRunner.startTransaction(); try { - // Insert event record (will fail if unique constraint violated) + // Persist the event record. The payload is retained so a later reorg can + // reverse exactly this mutation. const processedEvent = this.processedEventRepo.create({ txHash, logIndex, blockNumber, eventType, + payload: (data as Record) ?? null, }); await queryRunner.manager.save(ProcessedEvent, processedEvent); - // Process the event + // Apply the state mutation for this event type. if (eventType === 'Transfer') { - await this.updateBalances(queryRunner.manager, data as TransferEventData); + await this.applyTransfer(queryRunner.manager, data as TransferEventData); } + // Advance the checkpoint inside the SAME transaction so the event, the + // balance changes and the checkpoint all commit atomically. If anything + // fails we roll back as a unit and the checkpoint never moves. + await this.saveCheckpoint(queryRunner.manager, blockNumber); + await queryRunner.commitTransaction(); this.logger.log(`Processed event: ${eventType} at block ${blockNumber}`); - - // Save checkpoint after successful commit to avoid checkpoint desyncs - // if the transaction is rolled back. Use repository (outside txn). - await this.saveCheckpointAfterCommit(blockNumber); } catch (error) { await queryRunner.rollbackTransaction(); this.logger.error(`Failed to process event: ${error.message}`, error.stack); @@ -68,61 +89,94 @@ export class BlockchainIndexerService { } } - private async updateBalances(manager: any, data: TransferEventData): Promise { - const { from, to, amount, token } = data; + /** + * Roll back every event from `startBlock` onward and reverse the state it + * applied. Used to recover from a chain reorganization: orphaned blocks are + * undone atomically and the checkpoint is rewound so the canonical chain can + * be re-indexed cleanly. + */ + async replayFromBlock(startBlock: number): Promise { + return this.queue.enqueue(() => this.replayFromBlockInternal(startBlock)); + } - // Decrease sender balance - await manager.decrement(TokenBalance, { address: from, tokenAddress: token }, 'balance', amount); + private async replayFromBlockInternal(startBlock: number): Promise { + this.logger.log(`Rolling back state from block ${startBlock}`); - // Increase receiver balance - await manager.increment(TokenBalance, { address: to, tokenAddress: token }, 'balance', amount); - } + const queryRunner = this.dataSource.createQueryRunner(); + await queryRunner.connect(); + await queryRunner.startTransaction(); - private async getLastBlock(manager: any): Promise { - const checkpoint = await manager.findOne(IndexerCheckpoint, { where: { id: 1 } }); - return checkpoint ? checkpoint.lastBlock : null; + try { + // Load orphaned events newest-first so reversals unwind in the opposite + // order they were applied. + const orphaned = await queryRunner.manager.find(ProcessedEvent, { + where: { blockNumber: MoreThanOrEqual(startBlock) }, + order: { blockNumber: 'DESC', logIndex: 'DESC' }, + }); + + for (const event of orphaned) { + if (event.eventType === 'Transfer' && event.payload) { + await this.reverseTransfer( + queryRunner.manager, + event.payload as TransferEventData, + ); + } + } + + // Remove the orphaned event records (>= startBlock) so the canonical + // chain can be re-indexed without tripping the idempotency check. + await queryRunner.manager.delete(ProcessedEvent, { + blockNumber: MoreThanOrEqual(startBlock), + }); + + // Rewind the checkpoint to just before the rolled-back range. + const rewoundTo = Math.max(0, startBlock - 1); + await queryRunner.manager.save(IndexerCheckpoint, { + id: 1, + lastBlock: rewoundTo, + updatedAt: new Date(), + }); + + await queryRunner.commitTransaction(); + this.logger.log( + `Rolled back ${orphaned.length} event(s); checkpoint rewound to block ${rewoundTo}`, + ); + } catch (error) { + await queryRunner.rollbackTransaction(); + this.logger.error(`Failed to roll back from block ${startBlock}: ${error.message}`, error.stack); + throw error; + } finally { + await queryRunner.release(); + } } - private async saveCheckpoint(manager: any, blockNumber: number): Promise { - const currentLastBlock = (await this.getLastBlock(manager)) || 0; - const nextLastBlock = Math.max(currentLastBlock, blockNumber); + private async applyTransfer(manager: EntityManager, data: TransferEventData): Promise { + const { from, to, amount, token } = data; + await manager.decrement(TokenBalance, { address: from, tokenAddress: token }, 'balance', amount); + await manager.increment(TokenBalance, { address: to, tokenAddress: token }, 'balance', amount); + } - await manager.save(IndexerCheckpoint, { - id: 1, - lastBlock: nextLastBlock, - updatedAt: new Date(), - }); + /** Inverse of {@link applyTransfer}, used when unwinding an orphaned block. */ + private async reverseTransfer(manager: EntityManager, data: TransferEventData): Promise { + const { from, to, amount, token } = data; + await manager.increment(TokenBalance, { address: from, tokenAddress: token }, 'balance', amount); + await manager.decrement(TokenBalance, { address: to, tokenAddress: token }, 'balance', amount); } - private async saveCheckpointAfterCommit(blockNumber: number): Promise { - const checkpoint = await this.checkpointRepo.findOne({ where: { id: 1 } }); + private async saveCheckpoint(manager: EntityManager, blockNumber: number): Promise { + const checkpoint = await manager.findOne(IndexerCheckpoint, { where: { id: 1 } }); const currentLastBlock = checkpoint ? checkpoint.lastBlock : 0; const nextLastBlock = Math.max(currentLastBlock || 0, blockNumber); - await this.checkpointRepo.save({ + await manager.save(IndexerCheckpoint, { id: 1, lastBlock: nextLastBlock, updatedAt: new Date(), }); } - async replayFromBlock(startBlock: number): Promise { - this.logger.log(`Starting replay from block ${startBlock}`); - - // Delete processed events from startBlock onwards (>= to prevent stale events) - await this.processedEventRepo.createQueryBuilder() - .delete() - .where('blockNumber >= :startBlock', { startBlock }) - .execute(); - - // Note: In a real implementation, you'd fetch events from blockchain - // For now, assume events are provided externally or mocked - // This is a placeholder for replay logic - this.logger.log(`Replay completed from block ${startBlock}`); - } - async getLastProcessedBlock(): Promise { const checkpoint = await this.checkpointRepo.findOne({ where: { id: 1 } }); return checkpoint ? checkpoint.lastBlock : null; } -} \ No newline at end of file +} diff --git a/src/blockchain/blockchain-indexer.spec.ts b/src/blockchain/blockchain-indexer.spec.ts index d365d24..887c92a 100644 --- a/src/blockchain/blockchain-indexer.spec.ts +++ b/src/blockchain/blockchain-indexer.spec.ts @@ -59,17 +59,21 @@ describe('BlockchainIndexerService - Checkpoint Commit Behavior', () => { service = module.get(BlockchainIndexerService); }); - it('saves checkpoint after commit when processing succeeds', async () => { + it('saves the checkpoint inside the transaction when processing succeeds', async () => { const event = { txHash: '0x1', logIndex: 0, blockNumber: 123, eventType: 'Transfer', data: { from: 'a', to: 'b', amount: 1, token: 'T' } } as any; + const manager = dataSource.createQueryRunner().manager; + await service.processEvent(event); expect(dataSource.createQueryRunner).toHaveBeenCalled(); - // Repository.save should be called after commit - expect(checkpointRepo.save).toHaveBeenCalledTimes(1); - const saved = checkpointRepo.save.mock.calls[0][0]; - expect(saved.id).toBe(1); - expect(saved.lastBlock).toBe(123); + // Checkpoint must be persisted through the transaction's manager (atomic + // with the event + balance changes), not via the repository after commit. + expect(manager.save).toHaveBeenCalledWith( + IndexerCheckpoint, + expect.objectContaining({ id: 1, lastBlock: 123 }), + ); + expect(checkpointRepo.save).not.toHaveBeenCalled(); }); it('does not save checkpoint when processing fails and rolls back', async () => { diff --git a/src/blockchain/blockchain-reorg.integration.spec.ts b/src/blockchain/blockchain-reorg.integration.spec.ts new file mode 100644 index 0000000..ead18af --- /dev/null +++ b/src/blockchain/blockchain-reorg.integration.spec.ts @@ -0,0 +1,135 @@ +import { DataSource, Repository } from 'typeorm'; +import { BlockchainIndexerService } from './blockchain-indexer.service'; +import { ProcessedEvent } from './entities/processed-event.entity'; +import { TokenBalance } from './entities/token-balance.entity'; +import { IndexerCheckpoint } from './entities/indexer-checkpoint.entity'; +import { BlockchainEvent } from './interfaces/blockchain-event.interface'; + +/** + * End-to-end integration test against a real in-memory SQLite database. + * Exercises the acceptance criteria directly: + * - recovery of balances after a 10-block reorg + * - idempotent processing of duplicate events + * - atomic (transactional) block persistence + */ +describe('BlockchainIndexerService (integration: reorg + idempotency)', () => { + let dataSource: DataSource; + let service: BlockchainIndexerService; + let processedEventRepo: Repository; + let tokenBalanceRepo: Repository; + let checkpointRepo: Repository; + + const TOKEN = '0xtoken'; + const ALICE = '0xalice'; + const BOB = '0xbob'; + + const transferAt = (blockNumber: number, amount: string): BlockchainEvent => ({ + txHash: `0xtx${blockNumber}`, + logIndex: 0, + blockNumber, + eventType: 'Transfer', + data: { from: ALICE, to: BOB, amount, token: TOKEN }, + }); + + const balanceOf = async (address: string): Promise => { + const row = await tokenBalanceRepo.findOne({ + where: { address, tokenAddress: TOKEN }, + }); + return Number(row?.balance ?? 0); + }; + + beforeEach(async () => { + dataSource = new DataSource({ + type: 'sqlite', + database: ':memory:', + entities: [ProcessedEvent, TokenBalance, IndexerCheckpoint], + synchronize: true, + }); + await dataSource.initialize(); + + processedEventRepo = dataSource.getRepository(ProcessedEvent); + tokenBalanceRepo = dataSource.getRepository(TokenBalance); + checkpointRepo = dataSource.getRepository(IndexerCheckpoint); + + // Seed starting balances. increment/decrement update existing rows. + await tokenBalanceRepo.save([ + { address: ALICE, tokenAddress: TOKEN, balance: '1000' }, + { address: BOB, tokenAddress: TOKEN, balance: '0' }, + ]); + + service = new BlockchainIndexerService( + processedEventRepo, + tokenBalanceRepo, + checkpointRepo, + dataSource, + ); + }); + + afterEach(async () => { + await dataSource.destroy(); + }); + + it('recovers balances and checkpoint after a 10-block reorg', async () => { + // Index blocks 1..10: each transfers 10 from Alice to Bob. + for (let block = 1; block <= 10; block++) { + await service.processEvent(transferAt(block, '10')); + } + + expect(await balanceOf(ALICE)).toBe(900); + expect(await balanceOf(BOB)).toBe(100); + expect(await service.getLastProcessedBlock()).toBe(10); + expect(await processedEventRepo.count()).toBe(10); + + // Reorg: blocks 6..10 are orphaned. Roll back from block 6. + await service.replayFromBlock(6); + + // Effect of the 5 orphaned transfers (5 x 10 = 50) must be reversed. + expect(await balanceOf(ALICE)).toBe(950); + expect(await balanceOf(BOB)).toBe(50); + // Orphaned event records removed; checkpoint rewound to block 5. + expect(await processedEventRepo.count()).toBe(5); + expect(await service.getLastProcessedBlock()).toBe(5); + + // Re-index the canonical chain for blocks 6..10. + for (let block = 6; block <= 10; block++) { + await service.processEvent(transferAt(block, '10')); + } + + // State is fully restored — no double counting, no stale balances. + expect(await balanceOf(ALICE)).toBe(900); + expect(await balanceOf(BOB)).toBe(100); + expect(await service.getLastProcessedBlock()).toBe(10); + expect(await processedEventRepo.count()).toBe(10); + }); + + it('processes duplicate events exactly once (idempotency)', async () => { + const event = transferAt(1, '10'); + + await service.processEvent(event); + await service.processEvent(event); // duplicate delivery + await service.processEvent(event); // and again + + expect(await balanceOf(ALICE)).toBe(990); + expect(await balanceOf(BOB)).toBe(10); + expect(await processedEventRepo.count()).toBe(1); + }); + + it('rolls back the whole block atomically when the checkpoint write fails', async () => { + // First, index block 1 successfully. + await service.processEvent(transferAt(1, '10')); + expect(await balanceOf(ALICE)).toBe(990); + expect(await processedEventRepo.count()).toBe(1); + + // Force a genuine mid-transaction failure: remove the checkpoint table so + // the in-transaction checkpoint write throws. The event insert and the + // balance mutations for block 2 must all roll back as a unit. + await dataSource.query('DROP TABLE indexer_checkpoint'); + + await expect(service.processEvent(transferAt(2, '10'))).rejects.toThrow(); + + // Nothing from the failed block 2 was persisted. + expect(await balanceOf(ALICE)).toBe(990); + expect(await balanceOf(BOB)).toBe(10); + expect(await processedEventRepo.count()).toBe(1); + }); +}); diff --git a/src/blockchain/blockchain-replay.spec.ts b/src/blockchain/blockchain-replay.spec.ts index f4e4954..b00c050 100644 --- a/src/blockchain/blockchain-replay.spec.ts +++ b/src/blockchain/blockchain-replay.spec.ts @@ -1,219 +1,140 @@ import { Test, TestingModule } from '@nestjs/testing'; import { getRepositoryToken } from '@nestjs/typeorm'; -import { Repository, DataSource } from 'typeorm'; +import { Repository, DataSource, MoreThanOrEqual } from 'typeorm'; import { BlockchainIndexerService } from './blockchain-indexer.service'; import { ProcessedEvent } from './entities/processed-event.entity'; import { TokenBalance } from './entities/token-balance.entity'; import { IndexerCheckpoint } from './entities/indexer-checkpoint.entity'; -import { BlockchainEvent } from './interfaces/blockchain-event.interface'; +/** + * Replay / reorg rollback is now transactional: it reverses the state applied + * by orphaned events, deletes those event records (>= startBlock), and rewinds + * the checkpoint — all in a single transaction. + */ describe('BlockchainIndexerService - Replay Regression Tests', () => { let service: BlockchainIndexerService; let processedEventRepo: Repository; - let tokenBalanceRepo: Repository; - let checkpointRepo: Repository; let dataSource: DataSource; + let manager: any; + + const buildManager = (orphaned: Partial[] = []) => ({ + find: jest.fn().mockResolvedValue(orphaned), + delete: jest.fn().mockResolvedValue({ affected: orphaned.length, raw: {} }), + save: jest.fn().mockResolvedValue(null), + increment: jest.fn().mockResolvedValue({ affected: 1 }), + decrement: jest.fn().mockResolvedValue({ affected: 1 }), + findOne: jest.fn().mockResolvedValue(null), + }); + + const setup = async (orphaned: Partial[] = []) => { + manager = buildManager(orphaned); + const queryRunner = { + connect: jest.fn().mockResolvedValue(null), + startTransaction: jest.fn().mockResolvedValue(null), + commitTransaction: jest.fn().mockResolvedValue(null), + rollbackTransaction: jest.fn().mockResolvedValue(null), + release: jest.fn().mockResolvedValue(null), + manager, + }; - beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ BlockchainIndexerService, - { - provide: getRepositoryToken(ProcessedEvent), - useClass: Repository, - }, - { - provide: getRepositoryToken(TokenBalance), - useClass: Repository, - }, - { - provide: getRepositoryToken(IndexerCheckpoint), - useClass: Repository, - }, - { - provide: DataSource, - useValue: { - createQueryRunner: jest.fn(), - }, - }, + { provide: getRepositoryToken(ProcessedEvent), useClass: Repository }, + { provide: getRepositoryToken(TokenBalance), useClass: Repository }, + { provide: getRepositoryToken(IndexerCheckpoint), useClass: Repository }, + { provide: DataSource, useValue: { createQueryRunner: jest.fn().mockReturnValue(queryRunner) } }, ], }).compile(); - service = module.get(BlockchainIndexerService); - processedEventRepo = module.get>(getRepositoryToken(ProcessedEvent)); - tokenBalanceRepo = module.get>(getRepositoryToken(TokenBalance)); - checkpointRepo = module.get>(getRepositoryToken(IndexerCheckpoint)); - dataSource = module.get(DataSource); - }); + service = module.get(BlockchainIndexerService); + processedEventRepo = module.get(getRepositoryToken(ProcessedEvent)); + dataSource = module.get(DataSource); + return queryRunner; + }; describe('Replay State Consistency', () => { - it('should produce identical state when run multiple times', async () => { - // Setup: Create events in blocks 100, 101, 102 - const events = [ - { blockNumber: 100, txHash: '0x100', logIndex: 0, eventType: 'Transfer' }, - { blockNumber: 101, txHash: '0x101', logIndex: 0, eventType: 'Transfer' }, - { blockNumber: 102, txHash: '0x102', logIndex: 0, eventType: 'Transfer' }, - ]; - - // Mock the createQueryBuilder for delete operations - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 3, raw: {} }), - }; - - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); - - // First replay - await service.replayFromBlock(100); - - const firstDeleteCall = mockQueryBuilder.where.mock.calls[0]; - expect(firstDeleteCall[0]).toBe('blockNumber >= :startBlock'); - expect(firstDeleteCall[1]).toEqual({ startBlock: 100 }); - expect(mockQueryBuilder.execute).toHaveBeenCalledTimes(1); + it('deletes all events from startBlock onward (>=) and rewinds the checkpoint', async () => { + await setup(); - // Reset mocks - jest.clearAllMocks(); - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); - - // Second replay - should behave identically await service.replayFromBlock(100); - - const secondDeleteCall = mockQueryBuilder.where.mock.calls[0]; - expect(secondDeleteCall[0]).toBe('blockNumber >= :startBlock'); - expect(secondDeleteCall[1]).toEqual({ startBlock: 100 }); - expect(mockQueryBuilder.execute).toHaveBeenCalledTimes(1); - - // Both replays should have identical behavior - expect(firstDeleteCall).toEqual(secondDeleteCall); - }); - it('should clean up all events from startBlock onward preventing stale data', async () => { - // Mock events that would exist in database - const existingEvents = [ - { id: '1', blockNumber: 99, txHash: '0x99' }, // Should NOT be deleted - { id: '2', blockNumber: 100, txHash: '0x100' }, // Should be deleted - { id: '3', blockNumber: 101, txHash: '0x101' }, // Should be deleted - { id: '4', blockNumber: 102, txHash: '0x102' }, // Should be deleted - ]; - - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 3, raw: {} }), // 3 events deleted (100, 101, 102) - }; - - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); - - // Replay from block 100 - await service.replayFromBlock(100); - - // Verify the correct WHERE clause is used - expect(mockQueryBuilder.where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 100 }); - - // Verify 3 events were deleted (blocks 100, 101, 102) - expect(mockQueryBuilder.execute).toHaveBeenCalled(); + // CRITICAL regression guard: deletion must cover everything >= startBlock, + // not just the exact block (the original double-processing bug). + expect(manager.delete).toHaveBeenCalledWith(ProcessedEvent, { + blockNumber: MoreThanOrEqual(100), + }); + // Checkpoint rewound to just before the rolled-back range. + expect(manager.save).toHaveBeenCalledWith( + IndexerCheckpoint, + expect.objectContaining({ id: 1, lastBlock: 99 }), + ); }); - it('should handle edge case when startBlock is higher than existing events', async () => { - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 0, raw: {} }), // No events deleted - }; - - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); + it('reverses the balance effect of orphaned Transfer events', async () => { + await setup([ + { + blockNumber: 101, + logIndex: 0, + eventType: 'Transfer', + payload: { from: '0xa', to: '0xb', amount: '100', token: '0xc' }, + }, + ]); - // Replay from a block higher than any existing events - await service.replayFromBlock(999); + await service.replayFromBlock(100); - expect(mockQueryBuilder.where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 999 }); - expect(mockQueryBuilder.execute).toHaveBeenCalled(); + // Inverse of applyTransfer: sender credited back, receiver debited. + expect(manager.increment).toHaveBeenCalledWith( + TokenBalance, + { address: '0xa', tokenAddress: '0xc' }, + 'balance', + '100', + ); + expect(manager.decrement).toHaveBeenCalledWith( + TokenBalance, + { address: '0xb', tokenAddress: '0xc' }, + 'balance', + '100', + ); }); - it('should handle replay from block 0 (full replay)', async () => { - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 100, raw: {} }), // All events deleted - }; - - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); + it('rewinds the checkpoint to 0 for a full replay from block 0', async () => { + await setup(); - // Full replay from beginning await service.replayFromBlock(0); - expect(mockQueryBuilder.where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 0 }); - expect(mockQueryBuilder.execute).toHaveBeenCalled(); + expect(manager.delete).toHaveBeenCalledWith(ProcessedEvent, { + blockNumber: MoreThanOrEqual(0), + }); + expect(manager.save).toHaveBeenCalledWith( + IndexerCheckpoint, + expect.objectContaining({ id: 1, lastBlock: 0 }), + ); }); - it('should be idempotent - multiple replays from same block should be safe', async () => { - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 0, raw: {} }), // No events to delete on subsequent runs - }; - - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); + it('is idempotent - repeated replays from the same block behave identically', async () => { + const queryRunner = await setup(); - // First replay await service.replayFromBlock(100); - - // Second replay (should be safe) await service.replayFromBlock(100); - - // Third replay (should still be safe) await service.replayFromBlock(100); - // All three calls should have identical behavior - expect(mockQueryBuilder.where).toHaveBeenCalledTimes(3); - expect(mockQueryBuilder.execute).toHaveBeenCalledTimes(3); - - // Verify all calls used the same parameters - mockQueryBuilder.where.mock.calls.forEach(call => { - expect(call[0]).toBe('blockNumber >= :startBlock'); - expect(call[1]).toEqual({ startBlock: 100 }); + expect(manager.delete).toHaveBeenCalledTimes(3); + expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(3); + manager.delete.mock.calls.forEach((call: any[]) => { + expect(call[0]).toBe(ProcessedEvent); + expect(call[1]).toEqual({ blockNumber: MoreThanOrEqual(100) }); }); }); - }); - - describe('Regression Tests for Double Processing Prevention', () => { - it('should prevent double processing by cleaning up all future events', async () => { - // This test ensures the fix for the original bug where only exact block matches were deleted - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 5, raw: {} }), - }; - - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); - - await service.replayFromBlock(100); - - // CRITICAL: Verify >= is used, not == (this was the original bug) - expect(mockQueryBuilder.where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 100 }); - - // The original buggy code would have used: { blockNumber: 100 } - // The fixed code should use: 'blockNumber >= :startBlock', { startBlock: 100 } - }); - - it('should ensure no stale events remain after replay', async () => { - // Test scenario: Events exist in blocks 100, 101, 102, 103 - // After replay from 101, only events from 101+ should be removed - // Events from block 100 should remain - - const mockQueryBuilder = { - delete: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - execute: jest.fn().mockResolvedValue({ affected: 3, raw: {} }), // Delete blocks 101, 102, 103 - }; - jest.spyOn(processedEventRepo, 'createQueryBuilder').mockReturnValue(mockQueryBuilder as any); + it('rolls back the transaction if rollback work fails', async () => { + const queryRunner = await setup(); + manager.delete.mockRejectedValueOnce(new Error('db error')); - await service.replayFromBlock(101); + await expect(service.replayFromBlock(100)).rejects.toThrow('db error'); - expect(mockQueryBuilder.where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 101 }); - expect(mockQueryBuilder.execute).toHaveBeenCalled(); + expect(queryRunner.rollbackTransaction).toHaveBeenCalled(); + expect(queryRunner.commitTransaction).not.toHaveBeenCalled(); }); }); }); diff --git a/src/blockchain/entities/processed-event.entity.ts b/src/blockchain/entities/processed-event.entity.ts index 7d890ec..1db90f1 100644 --- a/src/blockchain/entities/processed-event.entity.ts +++ b/src/blockchain/entities/processed-event.entity.ts @@ -18,6 +18,14 @@ export class ProcessedEvent { @Column({ name: 'event_type', length: 100 }) eventType: string; + /** + * Decoded event payload, retained so a reorg rollback can reverse the exact + * state mutation this event applied (e.g. the Transfer amounts). Nullable for + * backwards compatibility with rows written before this column existed. + */ + @Column({ name: 'payload', type: 'simple-json', nullable: true }) + payload: Record | null; + @Column({ name: 'processed_at', type: 'datetime', default: () => 'CURRENT_TIMESTAMP' }) processedAt: Date; } \ No newline at end of file diff --git a/src/blockchain/reorg-detector.service.ts b/src/blockchain/reorg-detector.service.ts index 05ccc67..42852ce 100644 --- a/src/blockchain/reorg-detector.service.ts +++ b/src/blockchain/reorg-detector.service.ts @@ -127,6 +127,7 @@ export class ReorgDetectorService { divergenceDepth++; checkBlockNumber--; } + } } return divergenceDepth; diff --git a/src/blockchain/utils/rpc-backoff.util.spec.ts b/src/blockchain/utils/rpc-backoff.util.spec.ts new file mode 100644 index 0000000..9828d6f --- /dev/null +++ b/src/blockchain/utils/rpc-backoff.util.spec.ts @@ -0,0 +1,110 @@ +import { + withRpcBackoff, + isRateLimitError, + isRetryableRpcError, +} from './rpc-backoff.util'; + +describe('rpc-backoff', () => { + // No-op sleep so tests don't wait on real timers. + const sleep = jest.fn().mockResolvedValue(undefined); + + beforeEach(() => jest.clearAllMocks()); + + describe('isRateLimitError', () => { + it.each([ + { status: 429 }, + { statusCode: 429 }, + { info: { responseStatus: '429 Too Many Requests' } }, + { code: -32005 }, + { message: 'Too Many Requests' }, + { message: 'request failed: rate limit exceeded' }, + ])('detects 429 / rate limit shape %j', (err) => { + expect(isRateLimitError(err)).toBe(true); + }); + + it('ignores unrelated errors', () => { + expect(isRateLimitError(new Error('execution reverted'))).toBe(false); + expect(isRateLimitError({ status: 400 })).toBe(false); + expect(isRateLimitError(null)).toBe(false); + }); + }); + + describe('isRetryableRpcError', () => { + it('retries transient server/network errors', () => { + expect(isRetryableRpcError({ code: 'SERVER_ERROR' })).toBe(true); + expect(isRetryableRpcError({ code: 'TIMEOUT' })).toBe(true); + expect(isRetryableRpcError({ code: 'ECONNRESET' })).toBe(true); + expect(isRetryableRpcError({ status: 503 })).toBe(true); + }); + + it('does not retry deterministic client errors', () => { + expect(isRetryableRpcError({ code: 'CALL_EXCEPTION' })).toBe(false); + expect(isRetryableRpcError({ status: 400 })).toBe(false); + }); + }); + + describe('withRpcBackoff', () => { + it('returns immediately when the call succeeds', async () => { + const fn = jest.fn().mockResolvedValue('ok'); + + await expect(withRpcBackoff(fn, { sleep })).resolves.toBe('ok'); + + expect(fn).toHaveBeenCalledTimes(1); + expect(sleep).not.toHaveBeenCalled(); + }); + + it('retries on 429 and eventually succeeds', async () => { + const rateLimit = { status: 429, message: 'Too Many Requests' }; + const fn = jest + .fn() + .mockRejectedValueOnce(rateLimit) + .mockRejectedValueOnce(rateLimit) + .mockResolvedValue('recovered'); + + const onRetry = jest.fn(); + const result = await withRpcBackoff(fn, { sleep, onRetry, jitter: false }); + + expect(result).toBe('recovered'); + expect(fn).toHaveBeenCalledTimes(3); + expect(sleep).toHaveBeenCalledTimes(2); + expect(onRetry).toHaveBeenCalledTimes(2); + }); + + it('backs off exponentially (no jitter)', async () => { + const fn = jest + .fn() + .mockRejectedValueOnce({ status: 429 }) + .mockRejectedValueOnce({ status: 429 }) + .mockResolvedValue('done'); + + await withRpcBackoff(fn, { sleep, jitter: false, baseDelayMs: 100 }); + + // 100 * 2^0, then 100 * 2^1 + expect(sleep).toHaveBeenNthCalledWith(1, 100); + expect(sleep).toHaveBeenNthCalledWith(2, 200); + }); + + it('gives up after maxRetries and rethrows the last error', async () => { + const rateLimit = { status: 429, message: 'Too Many Requests' }; + const fn = jest.fn().mockRejectedValue(rateLimit); + + await expect( + withRpcBackoff(fn, { sleep, maxRetries: 3, jitter: false }), + ).rejects.toBe(rateLimit); + + // initial attempt + 3 retries + expect(fn).toHaveBeenCalledTimes(4); + expect(sleep).toHaveBeenCalledTimes(3); + }); + + it('does not retry non-retryable errors', async () => { + const reverted = { code: 'CALL_EXCEPTION', message: 'execution reverted' }; + const fn = jest.fn().mockRejectedValue(reverted); + + await expect(withRpcBackoff(fn, { sleep })).rejects.toBe(reverted); + + expect(fn).toHaveBeenCalledTimes(1); + expect(sleep).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/blockchain/utils/rpc-backoff.util.ts b/src/blockchain/utils/rpc-backoff.util.ts new file mode 100644 index 0000000..972eb58 --- /dev/null +++ b/src/blockchain/utils/rpc-backoff.util.ts @@ -0,0 +1,155 @@ +/** + * Intelligent retry/backoff for RPC calls. + * + * Public RPC providers throttle aggressively and answer with HTTP 429 + * ("Too Many Requests") or transient server/network errors. Failing the whole + * indexing pass on the first 429 stalls the pipeline, so we retry with + * exponential backoff and full jitter, only for errors that are actually + * worth retrying. + */ + +export interface RpcBackoffOptions { + /** Maximum number of retries after the initial attempt. Default: 5. */ + maxRetries?: number; + /** Base delay in ms for the first retry. Default: 250. */ + baseDelayMs?: number; + /** Upper bound on any single delay in ms. Default: 10_000. */ + maxDelayMs?: number; + /** Add randomised jitter to spread out retries. Default: true. */ + jitter?: boolean; + /** Predicate deciding whether an error is worth retrying. */ + isRetryable?: (error: unknown) => boolean; + /** Hook invoked before each backoff sleep (useful for logging/metrics). */ + onRetry?: (error: unknown, attempt: number, delayMs: number) => void; + /** Injectable sleep, primarily so tests don't wait on real timers. */ + sleep?: (ms: number) => Promise; +} + +const defaultSleep = (ms: number): Promise => + new Promise((resolve) => setTimeout(resolve, ms)); + +/** + * Detect HTTP 429 / rate-limit responses across the shapes ethers, web3 and + * raw fetch errors surface them in. + */ +export function isRateLimitError(error: unknown): boolean { + if (!error || typeof error !== 'object') { + return false; + } + + const err = error as Record; + + // Direct status fields used by various HTTP/JSON-RPC clients. + if (err.status === 429 || err.statusCode === 429) { + return true; + } + + // ethers v6 nests the HTTP status under `info`. + if (err.info?.responseStatus && String(err.info.responseStatus).includes('429')) { + return true; + } + + // JSON-RPC error codes: -32005 is the de-facto "limit exceeded" code. + if (err.code === -32005) { + return true; + } + + const message = String(err.message ?? err.shortMessage ?? '').toLowerCase(); + return ( + message.includes('429') || + message.includes('too many requests') || + message.includes('rate limit') || + message.includes('rate-limit') || + message.includes('exceeded') + ); +} + +/** + * Default retryability: rate limits plus the transient ethers/network error + * codes. Deterministic client errors (bad params, reverts) are not retried. + */ +export function isRetryableRpcError(error: unknown): boolean { + if (isRateLimitError(error)) { + return true; + } + + if (!error || typeof error !== 'object') { + return false; + } + + const err = error as Record; + const retryableCodes = new Set([ + 'SERVER_ERROR', + 'TIMEOUT', + 'NETWORK_ERROR', + 'ECONNRESET', + 'ECONNREFUSED', + 'ETIMEDOUT', + 'EAI_AGAIN', + ]); + + if (retryableCodes.has(err.code)) { + return true; + } + + // 5xx gateway/server responses are transient. + const status = Number(err.status ?? err.statusCode); + return status >= 500 && status < 600; +} + +function computeDelay( + attempt: number, + baseDelayMs: number, + maxDelayMs: number, + jitter: boolean, +): number { + // Exponential: base * 2^(attempt-1), capped at maxDelay. + const exponential = Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1)); + if (!jitter) { + return exponential; + } + // Full jitter: random point in [0, exponential]. + return Math.floor(Math.random() * exponential); +} + +/** + * Run `fn`, retrying transient failures (429s, server/network errors) with + * exponential backoff. Re-throws the last error once retries are exhausted or + * when the error is not retryable. + */ +export async function withRpcBackoff( + fn: () => Promise, + options: RpcBackoffOptions = {}, +): Promise { + const { + maxRetries = 5, + baseDelayMs = 250, + maxDelayMs = 10_000, + jitter = true, + isRetryable = isRetryableRpcError, + onRetry, + sleep = defaultSleep, + } = options; + + let lastError: unknown; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await fn(); + } catch (error) { + lastError = error; + + // Give up immediately on non-retryable errors or once retries run out. + if (attempt === maxRetries || !isRetryable(error)) { + throw error; + } + + const delayMs = computeDelay(attempt + 1, baseDelayMs, maxDelayMs, jitter); + onRetry?.(error, attempt + 1, delayMs); + await sleep(delayMs); + } + } + + // Unreachable, but keeps the type checker happy. + throw lastError; +} diff --git a/src/blockchain/utils/sequential-queue.spec.ts b/src/blockchain/utils/sequential-queue.spec.ts new file mode 100644 index 0000000..46bf622 --- /dev/null +++ b/src/blockchain/utils/sequential-queue.spec.ts @@ -0,0 +1,70 @@ +import { SequentialQueue } from './sequential-queue'; + +const tick = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +describe('SequentialQueue', () => { + it('runs tasks strictly in submission order even when later tasks are faster', async () => { + const queue = new SequentialQueue(); + const order: number[] = []; + + const p1 = queue.enqueue(async () => { + await tick(30); + order.push(1); + }); + const p2 = queue.enqueue(async () => { + await tick(5); + order.push(2); + }); + const p3 = queue.enqueue(async () => { + order.push(3); + }); + + await Promise.all([p1, p2, p3]); + + expect(order).toEqual([1, 2, 3]); + }); + + it('never overlaps task execution', async () => { + const queue = new SequentialQueue(); + let active = 0; + let maxActive = 0; + + const work = async () => { + active++; + maxActive = Math.max(maxActive, active); + await tick(10); + active--; + }; + + await Promise.all([ + queue.enqueue(work), + queue.enqueue(work), + queue.enqueue(work), + ]); + + expect(maxActive).toBe(1); + }); + + it('propagates a task rejection to its caller without stalling the queue', async () => { + const queue = new SequentialQueue(); + + const failing = queue.enqueue(async () => { + throw new Error('boom'); + }); + const after = queue.enqueue(async () => 'after'); + + await expect(failing).rejects.toThrow('boom'); + await expect(after).resolves.toBe('after'); + }); + + it('reports outstanding size and drains via onIdle', async () => { + const queue = new SequentialQueue(); + + queue.enqueue(() => tick(10)); + queue.enqueue(() => tick(10)); + expect(queue.size).toBe(2); + + await queue.onIdle(); + expect(queue.size).toBe(0); + }); +}); diff --git a/src/blockchain/utils/sequential-queue.ts b/src/blockchain/utils/sequential-queue.ts new file mode 100644 index 0000000..f36a20f --- /dev/null +++ b/src/blockchain/utils/sequential-queue.ts @@ -0,0 +1,52 @@ +/** + * A minimal in-process queue that runs async tasks strictly one at a time, + * in the order they were enqueued. + * + * The indexer relies on this to guarantee that blocks are persisted + * sequentially: a reorg rollback must never interleave with the processing + * of a newer block, otherwise balances and the checkpoint can desync. + * + * A task rejecting does not break the chain — subsequent tasks still run, and + * the rejection is propagated to the caller that enqueued the failing task. + */ +export class SequentialQueue { + private tail: Promise = Promise.resolve(); + private pending = 0; + + /** + * Enqueue a task. Resolves/rejects with the task's own result, but only + * after every previously-enqueued task has settled. + */ + enqueue(task: () => Promise): Promise { + this.pending++; + + // Chain off the tail regardless of whether the previous task fulfilled or + // rejected, so one failure cannot stall the whole queue. + const run = this.tail.then( + () => task(), + () => task(), + ); + + // The tail swallows results/errors; it only exists to serialise execution. + this.tail = run.then( + () => { + this.pending--; + }, + () => { + this.pending--; + }, + ); + + return run; + } + + /** Number of tasks enqueued that have not yet settled. */ + get size(): number { + return this.pending; + } + + /** Resolves once the queue has fully drained. */ + async onIdle(): Promise { + await this.tail; + } +} diff --git a/src/indexer/event-indexer.service.ts b/src/indexer/event-indexer.service.ts index 75c36e8..0435809 100644 --- a/src/indexer/event-indexer.service.ts +++ b/src/indexer/event-indexer.service.ts @@ -4,6 +4,7 @@ import { ethers, EventLog } from 'ethers'; import { IndexedEvent, IndexingState } from '../entities'; import { EventIndexerConfig } from '../config'; import { serializeBigInts } from '../common/utils/bigint-serialization.util'; +import { withRpcBackoff } from '../blockchain/utils/rpc-backoff.util'; /** * Core event indexing service @@ -189,12 +190,24 @@ export class EventIndexerService { toBlock: number, ): Promise { try { - const logs = await this.provider.getLogs({ - address: contractAddress, - topics: [eventSignature], - fromBlock, - toBlock, - }); + // Wrap the RPC call so transient throttling (HTTP 429) and server/network + // blips are retried with exponential backoff instead of failing the pass. + const logs = await withRpcBackoff( + () => + this.provider.getLogs({ + address: contractAddress, + topics: [eventSignature], + fromBlock, + toBlock, + }), + { + onRetry: (error, attempt, delayMs) => + this.logger.warn( + `RPC getLogs throttled (blocks ${fromBlock}-${toBlock}), ` + + `retry ${attempt} in ${delayMs}ms: ${(error as Error)?.message ?? error}`, + ), + }, + ); return logs as EventLog[]; } catch (error) { diff --git a/src/migrations/1769500000002-AddProcessedEventPayload.ts b/src/migrations/1769500000002-AddProcessedEventPayload.ts new file mode 100644 index 0000000..1a4650c --- /dev/null +++ b/src/migrations/1769500000002-AddProcessedEventPayload.ts @@ -0,0 +1,17 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +/** + * Adds a nullable `payload` column to processed_events so the indexer can + * reverse the exact state mutation an event applied when rolling back a reorg. + */ +export class AddProcessedEventPayload1769500000002 implements MigrationInterface { + name = 'AddProcessedEventPayload1769500000002'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "processed_events" ADD "payload" text`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "processed_events" DROP COLUMN "payload"`); + } +} diff --git a/src/rewards/services/blockchain-listener.service.ts b/src/rewards/services/blockchain-listener.service.ts index d956f17..b883815 100644 --- a/src/rewards/services/blockchain-listener.service.ts +++ b/src/rewards/services/blockchain-listener.service.ts @@ -4,6 +4,7 @@ import { ethers } from 'ethers'; import { RewardSyncService } from './reward-sync.service'; import { RewardClaimEventDto } from '../dto/reward-claim-event.dto'; import { RewardDistributionEventDto } from '../dto/reward-distribution-event.dto'; +import { withRpcBackoff } from '../../blockchain/utils/rpc-backoff.util'; @Injectable() export class BlockchainListenerService implements OnModuleInit { @@ -80,12 +81,11 @@ export class BlockchainListenerService implements OnModuleInit { private async processBlockRange(fromBlock: number, toBlock: number) { try { - // Query RewardClaimed events + // Query RewardClaimed events. Wrapped with backoff so RPC throttling + // (HTTP 429) and transient server errors are retried, not fatal. const claimFilter = this.contract.filters.RewardClaimed(); - const claimEvents = await this.contract.queryFilter( - claimFilter, - fromBlock, - toBlock, + const claimEvents = await withRpcBackoff(() => + this.contract.queryFilter(claimFilter, fromBlock, toBlock), ); for (const event of claimEvents) { @@ -94,12 +94,10 @@ export class BlockchainListenerService implements OnModuleInit { } } - // Query RewardDistributed events + // Query RewardDistributed events (same backoff treatment). const distFilter = this.contract.filters.RewardDistributed(); - const distEvents = await this.contract.queryFilter( - distFilter, - fromBlock, - toBlock, + const distEvents = await withRpcBackoff(() => + this.contract.queryFilter(distFilter, fromBlock, toBlock), ); for (const event of distEvents) {