From 394d037f8d5aaa2435d0b81f8d7180463b570446 Mon Sep 17 00:00:00 2001 From: 0xull Date: Thu, 11 Jun 2026 13:37:58 +0100 Subject: [PATCH 1/5] [backend]: rewrite indexer with block-header subscription and topic-only filtering Replace address-filtered WebSocket log subscription with SubscribeNewHead plus per-block eth_getLogs using topic-only filters. Add chunk-local address set to handle Deposited preceding BasketCreated in the same factory transaction. Remove migrate.go and all migration tests. Signed-off-by: 0xull --- server/db/migrate.go | 126 ------- server/indexer/indexer.go | 588 ++++++++++++++++-------------- server/indexer/indexer_db_test.go | 524 +++++++++++++------------- server/indexer/indexer_test.go | 144 ++------ server/main.go | 5 - 5 files changed, 607 insertions(+), 780 deletions(-) delete mode 100644 server/db/migrate.go diff --git a/server/db/migrate.go b/server/db/migrate.go deleted file mode 100644 index 8cfc5c2..0000000 --- a/server/db/migrate.go +++ /dev/null @@ -1,126 +0,0 @@ -package db - -import ( - "fmt" - "strings" -) - -// Migrate applies schema migrations that cannot be expressed as CREATE TABLE IF NOT EXISTS -// in schema.sql because they modify existing tables on a live Railway volume. -// -// Each migration is idempotent: -// - ALTER TABLE statements are skipped if the column already exists. -// - CREATE UNIQUE INDEX IF NOT EXISTS is safe to re-run. -// - CREATE TABLE IF NOT EXISTS is safe to re-run. -// - UPDATE/DELETE statements whose effects are already applied are no-ops. -// -// Call Migrate() in main.go immediately after db.Open() and before starting -// the indexer. A migration failure is fatal. -func (d *DB) Migrate() error { - migrations := []struct { - name string - sql string - }{ - // Phase 1 — add log_index to event tables for idempotent re-scan inserts. - { - name: "deposits.log_index", - sql: `ALTER TABLE deposits ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`, - }, - { - name: "redemptions.log_index", - sql: `ALTER TABLE redemptions ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`, - }, - { - name: "rebalances.log_index", - sql: `ALTER TABLE rebalances ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`, - }, - { - name: "fee_snapshots.log_index", - sql: `ALTER TABLE fee_snapshots ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`, - }, - // Unique deduplication indexes. - { - name: "idx_deposits_dedup", - sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_deposits_dedup ON deposits(tx_hash, log_index)`, - }, - { - name: "idx_redemptions_dedup", - sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_redemptions_dedup ON redemptions(tx_hash, log_index)`, - }, - { - name: "idx_rebalances_dedup", - sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_rebalances_dedup ON rebalances(tx_hash, log_index)`, - }, - { - name: "idx_fee_snapshots_dedup", - sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_fee_snapshots_dedup ON fee_snapshots(tx_hash, log_index)`, - }, - // Seed failure tracker. - { - name: "basket_seed_failures", - sql: `CREATE TABLE IF NOT EXISTS basket_seed_failures ( - basket_address TEXT PRIMARY KEY, - attempts INTEGER NOT NULL DEFAULT 0, - last_attempt INTEGER NOT NULL - )`, - }, - - // Phase 2 — fix RevenueSnapshoted indexing. - // - // Root cause: the indexer never included CreatorToken contract addresses in - // its FilterLogs/WebSocket subscription filter, so every RevenueSnapshoted - // event was silently dropped. Additionally, even if the event had been - // received, handleFeeSnapshot stored vLog.Address (the CreatorToken address) - // as basket_address — the wrong value. - // - // Fix applied in indexer.go: - // 1. creatorTokenToBasket map tracks CreatorToken → basket address. - // 2. filterAddresses() now includes all CreatorToken addresses. - // 3. writeFeeSnapshot resolves the basket from creatorTokenToBasket (DB fallback). - // - // To make the already-emitted RevenueSnapshoted events visible we must: - // a. Delete any fee_snapshot rows written with the wrong basket_address - // (i.e. rows whose basket_address is a creator token address, not a basket). - // These were stored under the creator token address due to the old bug. - // b. Reset the event cursor to the deploy block so the historical scan - // replays from the beginning and re-indexes all events including the - // missed RevenueSnapshoted events with the correct basket_address. - // c. Clear the creator_claimable_cache so stale zeros are not returned - // while the rescan is in progress. - // - // All three steps are idempotent: deleting already-absent rows is a no-op, - // setting the cursor to a value it already has is a no-op, and deleting - // cache rows that don't exist is a no-op. - { - name: "fix_fee_snapshots_wrong_basket_address", - sql: `DELETE FROM fee_snapshots - WHERE basket_address NOT IN (SELECT address FROM baskets)`, - }, - { - name: "reset_event_cursor_for_creator_token_rescan", - sql: `UPDATE sync_cursors SET block_num = 68391146 WHERE key = 'events'`, - }, - { - name: "clear_creator_claimable_cache", - sql: `DELETE FROM creator_claimable_cache`, - }, - // Index creator_token_address so the writeFeeSnapshot DB fallback - // (SELECT address FROM baskets WHERE creator_token_address = ?) is - // an index seek rather than a full table scan during historical rescan. - { - name: "idx_baskets_creator_token", - sql: `CREATE INDEX IF NOT EXISTS idx_baskets_creator_token ON baskets(creator_token_address)`, - }, - } - - for _, m := range migrations { - if _, err := d.Exec(m.sql); err != nil { - if strings.Contains(err.Error(), "duplicate column name") { - continue - } - return fmt.Errorf("migration %q failed: %w", m.name, err) - } - } - - return nil -} \ No newline at end of file diff --git a/server/indexer/indexer.go b/server/indexer/indexer.go index 5031714..a34bb3f 100644 --- a/server/indexer/indexer.go +++ b/server/indexer/indexer.go @@ -40,6 +40,20 @@ var ( topicBasketSuspend = eventTopic("Suspended()") ) +// knownTopics is the complete set of event signatures this indexer handles. +// Passed as the sole filter when querying logs from the node. Address +// verification is performed in the handler against in-memory sets. +var knownTopics = [][]common.Hash{{ + topicBasketCreated, + topicDeposited, + topicRedeemed, + topicRebalanced, + topicFeeSnapshoted, + topicAssetAdded, + topicAssetDeact, + topicBasketSuspend, +}} + var ( depositedABI abi.Arguments redeemedABI abi.Arguments @@ -169,7 +183,6 @@ func init() { } // RevenueSnapshoted(uint256 indexed snapshotId, uint256 usdgAmount, uint256 totalSupply) - // snapshotId is Topics[1]; usdgAmount and totalSupply are in Data. feeSnapshotABI = abi.Arguments{ {Name: "usdgAmount", Type: uint256Type}, {Name: "totalSupply", Type: uint256Type}, @@ -181,7 +194,29 @@ type blockTsEntry struct { timestamp uint64 } -// Indexer subscribes to on-chain events and writes them to SQLite. +// chunkAddrs holds basket and creator token addresses first seen within a +// single writeChunkAtomic call. BasketFactory emits Deposited before +// BasketCreated in the same transaction. A pre-pass over the chunk decodes +// all BasketCreated logs and populates this set before the main write loop +// runs, so Deposited events that precede their own BasketCreated in log order +// are still correctly attributed. +type chunkAddrs struct { + baskets map[common.Address]bool + creatorTokens map[common.Address]common.Address +} + +func newChunkAddrs() *chunkAddrs { + return &chunkAddrs{ + baskets: make(map[common.Address]bool), + creatorTokens: make(map[common.Address]common.Address), + } +} + +// Indexer subscribes to new block headers via WebSocket and fetches event logs +// per block via HTTP using a topic-only filter. Address verification is +// performed against in-memory sets populated from the DB at startup and kept +// current as new baskets are indexed. The topic-only filter means the +// subscription never needs to be rebuilt when new contracts are deployed. type Indexer struct { ctx context.Context wsURL string @@ -191,14 +226,11 @@ type Indexer struct { deployBlock int64 db *db.DB - // basketAddrs is the set of known basket proxy addresses for filter construction. - mu sync.RWMutex - basketAddrs map[common.Address]bool - - // creatorTokenAddrs maps each CreatorToken address → its basket address. - // RevenueSnapshoted is emitted by CreatorToken contracts, not basket proxies. - // These addresses must be included in the event filter, and the mapping lets - // handleFeeSnapshot store the correct basket_address in fee_snapshots. + // basketAddrs and creatorTokenToBasket grow with basket deployments. + // They are never evicted because every address represents a live contract + // whose events must not be dropped. + mu sync.RWMutex + basketAddrs map[common.Address]bool creatorTokenToBasket map[common.Address]common.Address blockTsMu sync.Mutex @@ -206,7 +238,7 @@ type Indexer struct { blockTsFIFO []blockTsEntry } -// New constructs an Indexer. +// New constructs an Indexer. Fails immediately if BASKET_FACTORY_ADDRESS is unset. func New( ctx context.Context, wsURL, rpcURL, registryAddr string, @@ -233,7 +265,8 @@ func New( }, nil } -// Run starts the indexer. +// Run starts the indexer. Chain state is synced before any live processing +// begins so the address sets are fully populated. func (idx *Indexer) Run() { idx.syncFromChain() @@ -241,7 +274,7 @@ func (idx *Indexer) Run() { wg.Add(1) go func() { defer wg.Done() - idx.scanTransactionalEvents() + idx.scanHistoricalEvents() }() delay := reconnectBaseDelay @@ -253,12 +286,12 @@ func (idx *Indexer) Run() { default: } - if err := idx.subscribe(); err != nil { + if err := idx.subscribeBlocks(); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { wg.Wait() return } - log.Printf("indexer: subscription error: %v — reconnecting in %s", err, delay) + log.Printf("indexer: block subscription error: %v — reconnecting in %s", err, delay) select { case <-idx.ctx.Done(): wg.Wait() @@ -284,6 +317,11 @@ func (idx *Indexer) syncFromChain() { idx.syncAssets(client) idx.syncBaskets(client) idx.seedMissingConstituents() + + idx.mu.RLock() + log.Printf("indexer: address sets ready — %d baskets, %d creator tokens", + len(idx.basketAddrs), len(idx.creatorTokenToBasket)) + idx.mu.RUnlock() } func (idx *Indexer) syncAssets(client *ethclient.Client) { @@ -397,13 +435,17 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { return } - tx, err := idx.db.Begin() - if err != nil { - log.Printf("indexer: syncBaskets begin tx: %v", err) - return + type basketEntry struct { + basketCommon common.Address + creatorTokenCommon common.Address + basket string + creatorToken string + creator string + createdAt int64 + suspended int } - count := 0 + entries := make([]basketEntry, 0, rv.Len()) for i := 0; i < rv.Len(); i++ { elem := rv.Index(i) @@ -422,18 +464,31 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { continue } - basketCommon := basketField.Interface().(common.Address) + basketCommon := basketField.Interface().(common.Address) creatorTokenCommon := creatorTokenField.Interface().(common.Address) - - basket := strings.ToLower(basketCommon.Hex()) - creatorToken := strings.ToLower(creatorTokenCommon.Hex()) - creator := strings.ToLower(creatorField.Interface().(common.Address).Hex()) - createdAt := createdAtField.Interface().(*big.Int).Int64() - suspended := 0 + suspended := 0 if !activeField.Bool() { suspended = 1 } + entries = append(entries, basketEntry{ + basketCommon: basketCommon, + creatorTokenCommon: creatorTokenCommon, + basket: strings.ToLower(basketCommon.Hex()), + creatorToken: strings.ToLower(creatorTokenCommon.Hex()), + creator: strings.ToLower(creatorField.Interface().(common.Address).Hex()), + createdAt: createdAtField.Interface().(*big.Int).Int64(), + suspended: suspended, + }) + } + + tx, err := idx.db.Begin() + if err != nil { + log.Printf("indexer: syncBaskets begin tx: %v", err) + return + } + + for _, e := range entries { _, err := tx.Exec(` INSERT INTO baskets (address, creator_token_address, creator_address, name, symbol, thesis, @@ -441,19 +496,13 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { VALUES (?, ?, ?, '', '', '', 0, ?, '', ?) ON CONFLICT(address) DO UPDATE SET suspended = excluded.suspended`, - basket, creatorToken, creator, createdAt, suspended, + e.basket, e.creatorToken, e.creator, e.createdAt, e.suspended, ) if err != nil { tx.Rollback() - log.Printf("indexer: syncBaskets upsert %s: %v — rolling back", basket, err) + log.Printf("indexer: syncBaskets upsert %s: %v — rolling back", e.basket, err) return } - - idx.mu.Lock() - idx.basketAddrs[basketCommon] = true - idx.creatorTokenToBasket[creatorTokenCommon] = basketCommon - idx.mu.Unlock() - count++ } if err := tx.Commit(); err != nil { @@ -461,11 +510,16 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { return } - log.Printf("indexer: synced %d baskets from chain", count) + idx.mu.Lock() + for _, e := range entries { + idx.basketAddrs[e.basketCommon] = true + idx.creatorTokenToBasket[e.creatorTokenCommon] = e.basketCommon + } + idx.mu.Unlock() + + log.Printf("indexer: synced %d baskets from chain", len(entries)) } -// seedMissingConstituents fills metadata and constituent rows for any basket -// that has zero constituent rows and has not exceeded seedMaxAttempts failures. func (idx *Indexer) seedMissingConstituents() { rows, err := idx.db.Query(` SELECT b.address @@ -500,15 +554,21 @@ func (idx *Indexer) seedMissingConstituents() { log.Printf("indexer: seeding %d baskets", len(needsSeeding)) - const workers = 5 - work := make(chan string, len(needsSeeding)) - for _, addr := range needsSeeding { - work <- addr - } - close(work) + work := make(chan string) + go func() { + for _, addr := range needsSeeding { + work <- addr + } + close(work) + }() + const workers = 5 var wg sync.WaitGroup - for i := 0; i < workers && i < len(needsSeeding); i++ { + n := workers + if len(needsSeeding) < n { + n = len(needsSeeding) + } + for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() @@ -545,8 +605,6 @@ func (idx *Indexer) recordSeedFailure(basketAddr string) { } } -// seedOneBasket fetches basketState() plus name/symbol/thesis and writes all -// constituent rows and basket metadata atomically. func (idx *Indexer) seedOneBasket(client *ethclient.Client, basketAddr string) error { addr := common.HexToAddress(basketAddr) @@ -681,12 +739,12 @@ func (idx *Indexer) seedOneBasket(client *ethclient.Client, basketAddr string) e return nil } -// scanTransactionalEvents performs a cursor-based block scan for deposits, -// redemptions, rebalances, and fee snapshots. -func (idx *Indexer) scanTransactionalEvents() { +// scanHistoricalEvents replays all blocks from the stored cursor to chain tip +// using chunked eth_getLogs calls filtered by topic only. +func (idx *Indexer) scanHistoricalEvents() { client, err := idx.newHTTPClient() if err != nil { - log.Printf("indexer: scanTransactionalEvents dial error: %v", err) + log.Printf("indexer: scanHistoricalEvents dial error: %v", err) return } defer client.Close() @@ -700,7 +758,7 @@ func (idx *Indexer) scanTransactionalEvents() { if _, err := idx.db.Exec( `INSERT INTO sync_cursors (key, block_num) VALUES ('events', ?)`, fromBlock, ); err != nil { - log.Printf("indexer: scanTransactionalEvents: failed to initialise cursor: %v", err) + log.Printf("indexer: scanHistoricalEvents: failed to initialise cursor: %v", err) return } } @@ -709,7 +767,7 @@ func (idx *Indexer) scanTransactionalEvents() { latestHeader, err := client.HeaderByNumber(hctx, nil) hcancel() if err != nil { - log.Printf("indexer: scanTransactionalEvents latest block error: %v", err) + log.Printf("indexer: scanHistoricalEvents latest block error: %v", err) return } toBlock := latestHeader.Number.Int64() @@ -719,14 +777,12 @@ func (idx *Indexer) scanTransactionalEvents() { return } - log.Printf("indexer: scanning transactional events blocks %d → %d", fromBlock, toBlock) - - addresses := idx.filterAddresses() + log.Printf("indexer: scanning historical events blocks %d → %d", fromBlock, toBlock) for start := fromBlock; start <= toBlock; start += chunkSize { select { case <-idx.ctx.Done(): - log.Printf("indexer: scan interrupted at block %d by context cancellation", start) + log.Printf("indexer: historical scan interrupted at block %d", start) return default: } @@ -740,14 +796,7 @@ func (idx *Indexer) scanTransactionalEvents() { logs, err := client.FilterLogs(filterCtx, ethereum.FilterQuery{ FromBlock: big.NewInt(start), ToBlock: big.NewInt(end), - Addresses: addresses, - Topics: [][]common.Hash{{ - topicBasketCreated, - topicDeposited, - topicRedeemed, - topicRebalanced, - topicFeeSnapshoted, - }}, + Topics: knownTopics, }) filterCancel() @@ -762,35 +811,104 @@ func (idx *Indexer) scanTransactionalEvents() { } } - log.Printf("indexer: transactional event scan complete through block %d", toBlock) + log.Printf("indexer: historical scan complete through block %d", toBlock) } -// writeChunkAtomic writes all logs for a scan chunk and advances the cursor -// in a single SQLite transaction. +// subscribeBlocks subscribes to new block headers via WebSocket. For each +// header, event logs for that block are fetched via HTTP using a topic-only +// filter. +func (idx *Indexer) subscribeBlocks() error { + dialCtx, dialCancel := context.WithTimeout(idx.ctx, rpcTimeout) + defer dialCancel() + + wsClient, err := ethclient.DialContext(dialCtx, idx.wsURL) + if err != nil { + return fmt.Errorf("ws dial: %w", err) + } + defer wsClient.Close() + + headers := make(chan *types.Header, 16) + sub, err := wsClient.SubscribeNewHead(idx.ctx, headers) + if err != nil { + return fmt.Errorf("subscribe new head: %w", err) + } + defer sub.Unsubscribe() + + httpClient, err := idx.newHTTPClient() + if err != nil { + return fmt.Errorf("http dial: %w", err) + } + defer httpClient.Close() + + log.Println("indexer: live block subscription active") + + for { + select { + case <-idx.ctx.Done(): + return nil + + case err := <-sub.Err(): + return fmt.Errorf("subscription: %w", err) + + case header := <-headers: + blockNum := header.Number.Int64() + + fetchCtx, fetchCancel := context.WithTimeout(idx.ctx, rpcTimeout) + logs, err := httpClient.FilterLogs(fetchCtx, ethereum.FilterQuery{ + FromBlock: header.Number, + ToBlock: header.Number, + Topics: knownTopics, + }) + fetchCancel() + + if err != nil { + log.Printf("indexer: FilterLogs block %d: %v — skipping block", blockNum, err) + continue + } + + if len(logs) == 0 { + continue + } + + if err := idx.writeChunkAtomic(httpClient, logs, blockNum); err != nil { + log.Printf("indexer: writeChunkAtomic block %d: %v", blockNum, err) + } + } + } +} + +// writeChunkAtomic writes all logs for a block or scan chunk atomically and +// advances the event cursor to endBlock. func (idx *Indexer) writeChunkAtomic(client *ethclient.Client, logs []types.Log, endBlock int64) error { tx, err := idx.db.Begin() if err != nil { return fmt.Errorf("begin chunk tx: %w", err) } + local := newChunkAddrs() + + // Pre-pass: decode all BasketCreated logs and write basket rows first. + // This populates the local address set before the main loop processes + // Deposited and RevenueSnapshoted events from the same transactions. for _, vLog := range logs { - if len(vLog.Topics) == 0 { + if len(vLog.Topics) == 0 || vLog.Topics[0] != topicBasketCreated { continue } - - if vLog.Topics[0] == topicBasketCreated { - if err := tx.Commit(); err != nil { - return fmt.Errorf("pre-basket commit: %w", err) - } - idx.handleBasketCreated(client, vLog) - tx, err = idx.db.Begin() - if err != nil { - return fmt.Errorf("post-basket begin tx: %w", err) - } + if vLog.Address != idx.factoryAddr { continue } + if err := idx.writeBasketCreated(tx, client, vLog, local); err != nil { + tx.Rollback() + return fmt.Errorf("writeBasketCreated tx=%s: %w", vLog.TxHash.Hex(), err) + } + } - if err := idx.writeLog(tx, client, vLog); err != nil { + // Main pass: process all non-BasketCreated logs in emission order. + for _, vLog := range logs { + if len(vLog.Topics) == 0 || vLog.Topics[0] == topicBasketCreated { + continue + } + if err := idx.writeLog(tx, client, vLog, local); err != nil { tx.Rollback() return fmt.Errorf("write log tx=%s idx=%d: %w", vLog.TxHash.Hex(), vLog.Index, err) } @@ -803,116 +921,112 @@ func (idx *Indexer) writeChunkAtomic(client *ethclient.Client, logs []types.Log, return fmt.Errorf("advance cursor to %d: %w", endBlock, err) } - return tx.Commit() -} + if err := tx.Commit(); err != nil { + return err + } -// writeLog routes a single log to the appropriate write function within an -// open transaction. -func (idx *Indexer) writeLog(tx *sql.Tx, client *ethclient.Client, vLog types.Log) error { - if len(vLog.Topics) == 0 { - return nil + // Merge chunk-local sets into global sets after commit. + idx.mu.Lock() + for addr := range local.baskets { + idx.basketAddrs[addr] = true } - switch vLog.Topics[0] { - case topicDeposited: - return idx.writeDeposited(tx, client, vLog) - case topicRedeemed: - return idx.writeRedeemed(tx, client, vLog) - case topicRebalanced: - return idx.writeRebalanced(tx, client, vLog) - case topicFeeSnapshoted: - return idx.writeFeeSnapshot(tx, client, vLog) + for ct, basket := range local.creatorTokens { + idx.creatorTokenToBasket[ct] = basket } + idx.mu.Unlock() + return nil } -// handleLog is used by the live subscription. Each event type gets its own -// transaction. -func (idx *Indexer) handleLog(client *ethclient.Client, vLog types.Log) { +// writeLog routes a single non-BasketCreated log to its handler after +// verifying the emitting address belongs to this protocol. +func (idx *Indexer) writeLog(tx *sql.Tx, client *ethclient.Client, vLog types.Log, local *chunkAddrs) error { if len(vLog.Topics) == 0 { - return + return nil } - switch vLog.Topics[0] { - case topicBasketCreated: - idx.handleBasketCreated(client, vLog) - case topicDeposited: - tx, err := idx.db.Begin() - if err != nil { - log.Printf("indexer: live Deposited begin tx: %v", err) - return - } - if err := idx.writeDeposited(tx, client, vLog); err != nil { - tx.Rollback() - log.Printf("indexer: live writeDeposited: %v", err) - return - } - if err := tx.Commit(); err != nil { - log.Printf("indexer: live Deposited commit: %v", err) - } + topic := vLog.Topics[0] - case topicRedeemed: - tx, err := idx.db.Begin() - if err != nil { - log.Printf("indexer: live Redeemed begin tx: %v", err) - return - } - if err := idx.writeRedeemed(tx, client, vLog); err != nil { - tx.Rollback() - log.Printf("indexer: live writeRedeemed: %v", err) - return + if topic == topicAssetAdded || topic == topicAssetDeact { + if vLog.Address != idx.registryAddr { + return nil } - if err := tx.Commit(); err != nil { - log.Printf("indexer: live Redeemed commit: %v", err) + if topic == topicAssetAdded { + idx.handleAssetAdded(vLog) + } else { + idx.handleAssetDeactivated(vLog) } + return nil + } - case topicRebalanced: - tx, err := idx.db.Begin() - if err != nil { - log.Printf("indexer: live Rebalanced begin tx: %v", err) - return - } - if err := idx.writeRebalanced(tx, client, vLog); err != nil { - tx.Rollback() - log.Printf("indexer: live writeRebalanced: %v", err) - return - } - if err := tx.Commit(); err != nil { - log.Printf("indexer: live Rebalanced commit: %v", err) + if topic == topicBasketSuspend { + if !idx.isKnownBasket(vLog.Address, local) { + return nil } + idx.handleBasketSuspended(vLog) + return nil + } - case topicFeeSnapshoted: - tx, err := idx.db.Begin() - if err != nil { - log.Printf("indexer: live FeeSnapshot begin tx: %v", err) - return + if topic == topicDeposited || topic == topicRedeemed || topic == topicRebalanced { + if !idx.isKnownBasket(vLog.Address, local) { + return nil } - if err := idx.writeFeeSnapshot(tx, client, vLog); err != nil { - tx.Rollback() - log.Printf("indexer: live writeFeeSnapshot: %v", err) - return + switch topic { + case topicDeposited: + return idx.writeDeposited(tx, client, vLog) + case topicRedeemed: + return idx.writeRedeemed(tx, client, vLog) + case topicRebalanced: + return idx.writeRebalanced(tx, client, vLog) } - if err := tx.Commit(); err != nil { - log.Printf("indexer: live FeeSnapshot commit: %v", err) + } + + if topic == topicFeeSnapshoted { + if !idx.isKnownCreatorToken(vLog.Address, local) { + return nil } + return idx.writeFeeSnapshot(tx, client, vLog, local) + } - case topicAssetAdded: - idx.handleAssetAdded(vLog) + return nil +} - case topicAssetDeact: - idx.handleAssetDeactivated(vLog) +// isKnownBasket returns true if addr is a known basket proxy, checking the +// global set then the chunk-local set. +func (idx *Indexer) isKnownBasket(addr common.Address, local *chunkAddrs) bool { + idx.mu.RLock() + known := idx.basketAddrs[addr] + idx.mu.RUnlock() + if known { + return true + } + return local.baskets[addr] +} - case topicBasketSuspend: - idx.handleBasketSuspended(vLog) +// isKnownCreatorToken returns true if addr is a known creator token contract, +// checking the global map then the chunk-local map. +func (idx *Indexer) isKnownCreatorToken(addr common.Address, local *chunkAddrs) bool { + idx.mu.RLock() + _, known := idx.creatorTokenToBasket[addr] + idx.mu.RUnlock() + if known { + return true } + _, localKnown := local.creatorTokens[addr] + return localKnown } -func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log) { +// writeBasketCreated writes the basket and constituent rows within the provided +// transaction and registers the new addresses in the chunk-local set immediately +// so subsequent logs in the same chunk that reference this basket are correctly +// handled before the global sets are updated after commit. +func (idx *Indexer) writeBasketCreated(tx *sql.Tx, client *ethclient.Client, vLog types.Log, local *chunkAddrs) error { if len(vLog.Topics) < 4 { - log.Printf("indexer: handleBasketCreated: expected 4 topics, got %d — tx=%s", len(vLog.Topics), vLog.TxHash.Hex()) - return + log.Printf("indexer: writeBasketCreated: expected 4 topics, got %d — tx=%s", len(vLog.Topics), vLog.TxHash.Hex()) + return nil } - basketCommon := common.HexToAddress(vLog.Topics[1].Hex()) + basketCommon := common.HexToAddress(vLog.Topics[1].Hex()) creatorTokenCommon := common.HexToAddress(vLog.Topics[2].Hex()) basket := strings.ToLower(basketCommon.Hex()) creatorToken := strings.ToLower(creatorTokenCommon.Hex()) @@ -920,8 +1034,8 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log decoded, err := basketCreatedABI.Unpack(vLog.Data) if err != nil || len(decoded) < 6 { - log.Printf("indexer: handleBasketCreated decode error tx=%s: %v", vLog.TxHash.Hex(), err) - return + log.Printf("indexer: writeBasketCreated decode error tx=%s: %v", vLog.TxHash.Hex(), err) + return nil } name, _ := decoded[0].(string) @@ -944,7 +1058,7 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log `SELECT symbol FROM supported_assets WHERE address = ?`, cAddr, ).Scan(&cSymbol) if err != nil && !errors.Is(err, sql.ErrNoRows) { - log.Printf("indexer: handleBasketCreated symbol lookup %s: %v", cAddr, err) + log.Printf("indexer: writeBasketCreated symbol lookup %s: %v", cAddr, err) } weight := int64(0) if i < len(targetWeights) && targetWeights[i] != nil { @@ -955,12 +1069,6 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log createdAt := idx.blockTimestamp(client, vLog.BlockNumber) - tx, err := idx.db.Begin() - if err != nil { - log.Printf("indexer: handleBasketCreated begin tx: %v", err) - return - } - _, err = tx.Exec(` INSERT INTO baskets (address, creator_token_address, creator_address, name, symbol, thesis, @@ -975,9 +1083,7 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log boolToInt(rebalancing), createdAt, vLog.TxHash.Hex(), ) if err != nil { - tx.Rollback() - log.Printf("indexer: handleBasketCreated insert basket: %v", err) - return + return fmt.Errorf("insert basket %s: %w", basket, err) } for i, row := range cRows { @@ -989,23 +1095,15 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log basket, row.addr, row.symbol, row.weight, i, ) if err != nil { - tx.Rollback() - log.Printf("indexer: handleBasketCreated insert constituent %s: %v", row.addr, err) - return + return fmt.Errorf("insert constituent %s for basket %s: %w", row.addr, basket, err) } } - if err := tx.Commit(); err != nil { - log.Printf("indexer: handleBasketCreated commit: %v", err) - return - } + local.baskets[basketCommon] = true + local.creatorTokens[creatorTokenCommon] = basketCommon - idx.mu.Lock() - idx.basketAddrs[basketCommon] = true - idx.creatorTokenToBasket[creatorTokenCommon] = basketCommon - idx.mu.Unlock() - - log.Printf("indexer: basket %s indexed with %d constituents (creatorToken=%s)", basket, len(cRows), creatorToken) + log.Printf("indexer: basket %s written with %d constituents (creatorToken=%s)", basket, len(cRows), creatorToken) + return nil } func (idx *Indexer) handleAssetAdded(vLog types.Log) { @@ -1121,40 +1219,30 @@ func (idx *Indexer) writeRebalanced(tx *sql.Tx, client *ethclient.Client, vLog t return err } -// writeFeeSnapshot handles RevenueSnapshoted events emitted by CreatorToken contracts. -func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog types.Log) error { +// writeFeeSnapshot handles RevenueSnapshoted events emitted by CreatorToken +// contracts. vLog.Address is the CreatorToken contract address. +func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog types.Log, local *chunkAddrs) error { if len(vLog.Topics) < 2 { return nil } - creatorTokenAddr := strings.ToLower(vLog.Address.Hex()) creatorTokenCommon := vLog.Address idx.mu.RLock() basketCommon, found := idx.creatorTokenToBasket[creatorTokenCommon] idx.mu.RUnlock() - var basketAddr string - if found { - basketAddr = strings.ToLower(basketCommon.Hex()) - } else { - err := idx.db.QueryRow( - `SELECT address FROM baskets WHERE creator_token_address = ?`, - creatorTokenAddr, - ).Scan(&basketAddr) - if errors.Is(err, sql.ErrNoRows) { - log.Printf("indexer: writeFeeSnapshot: no basket found for creatorToken %s — skipping", creatorTokenAddr) - return nil - } - if err != nil { - return fmt.Errorf("writeFeeSnapshot basket lookup for creatorToken %s: %w", creatorTokenAddr, err) - } - // Populate the in-memory map so subsequent events don't hit the DB. - idx.mu.Lock() - idx.creatorTokenToBasket[creatorTokenCommon] = common.HexToAddress(basketAddr) - idx.mu.Unlock() + if !found { + basketCommon, found = local.creatorTokens[creatorTokenCommon] + } + + if !found { + log.Printf("indexer: writeFeeSnapshot: no basket for creatorToken %s — skipping", + strings.ToLower(creatorTokenCommon.Hex())) + return nil } + basketAddr := strings.ToLower(basketCommon.Hex()) snapshotID := new(big.Int).SetBytes(vLog.Topics[1].Bytes()).Int64() decoded, err := feeSnapshotABI.Unpack(vLog.Data) @@ -1195,15 +1283,15 @@ func (idx *Indexer) handleBasketSuspended(vLog types.Log) { } } -// blockTimestamp returns the Unix timestamp for a block number from a bounded -// FIFO cache, fetching via RPC on a miss. +// blockTimestamp returns the Unix timestamp for a block from a bounded FIFO +// cache, fetching via RPC on a miss. func (idx *Indexer) blockTimestamp(client *ethclient.Client, blockNumber uint64) int64 { idx.blockTsMu.Lock() - if ts, ok := idx.blockTs[blockNumber]; ok { - idx.blockTsMu.Unlock() + ts, ok := idx.blockTs[blockNumber] + idx.blockTsMu.Unlock() + if ok { return int64(ts) } - idx.blockTsMu.Unlock() if client == nil { log.Printf("indexer: blockTimestamp(%d): nil client — storing 0", blockNumber) @@ -1219,7 +1307,7 @@ func (idx *Indexer) blockTimestamp(client *ethclient.Client, blockNumber uint64) return 0 } - ts := header.Time + fetched := header.Time idx.blockTsMu.Lock() defer idx.blockTsMu.Unlock() @@ -1229,70 +1317,10 @@ func (idx *Indexer) blockTimestamp(client *ethclient.Client, blockNumber uint64) idx.blockTsFIFO = idx.blockTsFIFO[1:] delete(idx.blockTs, oldest.blockNumber) } + idx.blockTs[blockNumber] = fetched + idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: blockNumber, timestamp: fetched}) - idx.blockTs[blockNumber] = ts - idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: blockNumber, timestamp: ts}) - - return int64(ts) -} - -// subscribe opens a WebSocket connection and processes live events until -// ctx is cancelled or the connection drops. -func (idx *Indexer) subscribe() error { - dialCtx, dialCancel := context.WithTimeout(idx.ctx, rpcTimeout) - defer dialCancel() - - client, err := ethclient.DialContext(dialCtx, idx.wsURL) - if err != nil { - return fmt.Errorf("dial: %w", err) - } - defer client.Close() - - query := ethereum.FilterQuery{Addresses: idx.filterAddresses()} - - logs := make(chan types.Log, 512) - sub, err := client.SubscribeFilterLogs(idx.ctx, query, logs) - if err != nil { - return fmt.Errorf("subscribe: %w", err) - } - defer sub.Unsubscribe() - - // Drain the log channel in a separate goroutine. - done := make(chan struct{}) - go func() { - defer close(done) - for vLog := range logs { - idx.handleLog(client, vLog) - } - }() - - log.Println("indexer: live subscription active") - - select { - case <-idx.ctx.Done(): - return nil - case err := <-sub.Err(): - return fmt.Errorf("subscription: %w", err) - } -} - -// filterAddresses returns all addresses to include in FilterQuery: -// registry, factory, all known basket proxies, and all known creator token contracts. -func (idx *Indexer) filterAddresses() []common.Address { - idx.mu.RLock() - defer idx.mu.RUnlock() - - addresses := make([]common.Address, 0, len(idx.basketAddrs)+len(idx.creatorTokenToBasket)+2) - addresses = append(addresses, idx.registryAddr) - addresses = append(addresses, idx.factoryAddr) - for addr := range idx.basketAddrs { - addresses = append(addresses, addr) - } - // Include all known creator token addresses so RevenueSnapshoted events are received. - for creatorToken := range idx.creatorTokenToBasket { - addresses = append(addresses, creatorToken) - } - return addresses + return int64(fetched) } func (idx *Indexer) newHTTPClient() (*ethclient.Client, error) { @@ -1312,7 +1340,6 @@ func boolToInt(b bool) int { return 0 } -// minDuration returns the smaller of two durations. func minDuration(a, b time.Duration) time.Duration { if a < b { return a @@ -1320,7 +1347,6 @@ func minDuration(a, b time.Duration) time.Duration { return b } -// getEnv returns the environment variable value or a fallback. func getEnv(key, fallback string) string { if v := strings.TrimSpace(os.Getenv(key)); v != "" { return v diff --git a/server/indexer/indexer_db_test.go b/server/indexer/indexer_db_test.go index f4f7740..6402476 100644 --- a/server/indexer/indexer_db_test.go +++ b/server/indexer/indexer_db_test.go @@ -14,8 +14,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// testSchema is the minimal schema required by the indexer tests. -// It mirrors schema.sql SQL scripts. const testSchema = ` CREATE TABLE IF NOT EXISTS baskets ( address TEXT PRIMARY KEY, @@ -133,8 +131,6 @@ CREATE TABLE IF NOT EXISTS creator_claimable_cache ( ); ` -// newTestDB opens a fresh in-memory SQLite database with the full production -// schema applied. Each test gets its own database; there is no shared state. func newTestDB(t *testing.T) *db.DB { t.Helper() dir := t.TempDir() @@ -146,7 +142,6 @@ func newTestDB(t *testing.T) *db.DB { return d } -// newTestIndexer constructs an Indexer wired to the given database. func newTestIndexer(t *testing.T, d *db.DB) *Indexer { t.Helper() os.Setenv("BASKET_FACTORY_ADDRESS", "0xE9854c4734cd4A9dbC5086398A11df3c11f40b21") @@ -166,7 +161,6 @@ func newTestIndexer(t *testing.T, d *db.DB) *Indexer { return idx } -// insertAsset is a test helper that seeds a supported_assets row directly. func insertAsset(t *testing.T, d *db.DB, address, symbol string) { t.Helper() _, err := d.Exec(` @@ -181,9 +175,6 @@ func insertAsset(t *testing.T, d *db.DB, address, symbol string) { // writeChunkAtomic tests -// TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced verifies that a -// Deposited event in a chunk is written to the deposits table and the cursor -// is advanced to endBlock in the same transaction. func TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) @@ -191,8 +182,6 @@ func TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced(t *testing.T) { if _, err := d.Exec(`INSERT INTO sync_cursors (key, block_num) VALUES ('events', 68391146)`); err != nil { t.Fatalf("seed cursor: %v", err) } - - // Seed basket so the deposit foreign-key-style lookup succeeds. if _, err := d.Exec(` INSERT INTO baskets (address, creator_token_address, creator_address, name, symbol, thesis, rebalancing_enabled, created_at, created_tx, suspended) @@ -200,17 +189,19 @@ func TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced(t *testing.T) { t.Fatalf("seed basket: %v", err) } - investor := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") - usdg := big.NewInt(10_000_000) + // Pre-populate the in-memory set so the deposit passes the address check. + idx.mu.Lock() + idx.basketAddrs[common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6")] = true + idx.mu.Unlock() + + investor := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") + usdg := big.NewInt(10_000_000) tokens, _ := new(big.Int).SetString("9950000000000000000", 10) - fee := big.NewInt(50_000) - data, _ := depositedABI.Pack(usdg, tokens, fee) + fee := big.NewInt(50_000) + data, _ := depositedABI.Pack(usdg, tokens, fee) vLog := types.Log{ - Topics: []common.Hash{ - topicDeposited, - common.BytesToHash(investor.Bytes()), - }, + Topics: []common.Hash{topicDeposited, common.BytesToHash(investor.Bytes())}, Data: data, BlockNumber: 68391200, TxHash: common.HexToHash("0xdeadbeef01"), @@ -218,14 +209,10 @@ func TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced(t *testing.T) { Address: common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6"), } - // writeChunkAtomic with a nil client — blockTimestamp will fail the RPC - // and store 0, which is acceptable for this test since we are verifying - // write correctness, not timestamp resolution. if err := idx.writeChunkAtomic(nil, []types.Log{vLog}, 68391300); err != nil { t.Fatalf("writeChunkAtomic: %v", err) } - // Verify deposit row exists. var count int if err := d.QueryRow( `SELECT COUNT(*) FROM deposits WHERE basket_address = ?`, @@ -237,7 +224,6 @@ func TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced(t *testing.T) { t.Errorf("expected 1 deposit row, got %d", count) } - // Verify cursor was advanced. var cursor int64 if err := d.QueryRow(`SELECT block_num FROM sync_cursors WHERE key = 'events'`).Scan(&cursor); err != nil { t.Fatalf("read cursor: %v", err) @@ -247,9 +233,6 @@ func TestWriteChunkAtomic_DepositWrittenAndCursorAdvanced(t *testing.T) { } } -// TestWriteChunkAtomic_IdempotentOnReplay verifies that replaying the same -// chunk twice (simulating a crash-restart) produces exactly one deposit row, -// not two. func TestWriteChunkAtomic_IdempotentOnReplay(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) @@ -265,6 +248,10 @@ func TestWriteChunkAtomic_IdempotentOnReplay(t *testing.T) { t.Fatalf("seed basket: %v", err) } + idx.mu.Lock() + idx.basketAddrs[common.HexToAddress(idempotBasket)] = true + idx.mu.Unlock() + investor := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") data, _ := depositedABI.Pack(big.NewInt(1_000_000), big.NewInt(990_000_000_000_000_000), big.NewInt(10_000)) @@ -279,21 +266,16 @@ func TestWriteChunkAtomic_IdempotentOnReplay(t *testing.T) { logs := []types.Log{vLog} - // First write. if err := idx.writeChunkAtomic(nil, logs, 100); err != nil { t.Fatalf("first writeChunkAtomic: %v", err) } - - // Replay the same chunk — simulates crash-restart where cursor was not yet - // advanced or was re-read from a previous checkpoint. if err := idx.writeChunkAtomic(nil, logs, 100); err != nil { t.Fatalf("second writeChunkAtomic (replay): %v", err) } var count int if err := d.QueryRow( - `SELECT COUNT(*) FROM deposits WHERE basket_address = ?`, - idempotBasket, + `SELECT COUNT(*) FROM deposits WHERE basket_address = ?`, idempotBasket, ).Scan(&count); err != nil { t.Fatalf("count: %v", err) } @@ -302,8 +284,6 @@ func TestWriteChunkAtomic_IdempotentOnReplay(t *testing.T) { } } -// TestWriteChunkAtomic_CursorNotAdvancedOnWriteError verifies that if a log -// write fails, the cursor stays at its original value. func TestWriteChunkAtomic_EmptyChunkAdvancesCursor(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) @@ -325,8 +305,6 @@ func TestWriteChunkAtomic_EmptyChunkAdvancesCursor(t *testing.T) { } } -// TestWriteChunkAtomic_MultipleEventTypes verifies that a chunk containing -// a deposit, a redemption, and a rebalance all write correctly in one transaction. func TestWriteChunkAtomic_MultipleEventTypes(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) @@ -342,7 +320,11 @@ func TestWriteChunkAtomic_MultipleEventTypes(t *testing.T) { t.Fatalf("seed basket: %v", err) } - investor := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") + idx.mu.Lock() + idx.basketAddrs[common.HexToAddress(basketAddr)] = true + idx.mu.Unlock() + + investor := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") basketCommon := common.HexToAddress(basketAddr) depositData, _ := depositedABI.Pack(big.NewInt(1_000_000), big.NewInt(990_000_000_000_000_000), big.NewInt(10_000)) @@ -380,9 +362,9 @@ func TestWriteChunkAtomic_MultipleEventTypes(t *testing.T) { } var deposits, redemptions, rebalances int - d.QueryRow(`SELECT COUNT(*) FROM deposits WHERE basket_address = ?`, "0x474835c4da0393bc87d4e85e36fdce3f56edeaa6").Scan(&deposits) - d.QueryRow(`SELECT COUNT(*) FROM redemptions WHERE basket_address = ?`, "0x474835c4da0393bc87d4e85e36fdce3f56edeaa6").Scan(&redemptions) - d.QueryRow(`SELECT COUNT(*) FROM rebalances WHERE basket_address = ?`, "0x474835c4da0393bc87d4e85e36fdce3f56edeaa6").Scan(&rebalances) + d.QueryRow(`SELECT COUNT(*) FROM deposits WHERE basket_address = ?`, basketAddr).Scan(&deposits) + d.QueryRow(`SELECT COUNT(*) FROM redemptions WHERE basket_address = ?`, basketAddr).Scan(&redemptions) + d.QueryRow(`SELECT COUNT(*) FROM rebalances WHERE basket_address = ?`, basketAddr).Scan(&rebalances) if deposits != 1 { t.Errorf("expected 1 deposit, got %d", deposits) @@ -395,15 +377,104 @@ func TestWriteChunkAtomic_MultipleEventTypes(t *testing.T) { } } -// handleBasketCreated database tests +// TestWriteChunkAtomic_BasketCreatedBeforeDeposited verifies the pre-pass +// correctly handles the factory emitting Deposited before BasketCreated +// in the same transaction, so the deposit is not dropped. +func TestWriteChunkAtomic_BasketCreatedBeforeDeposited(t *testing.T) { + d := newTestDB(t) + idx := newTestIndexer(t, d) + + if _, err := d.Exec(`INSERT INTO sync_cursors (key, block_num) VALUES ('events', 0)`); err != nil { + t.Fatalf("seed cursor: %v", err) + } + + insertAsset(t, d, "0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e", "TSLA") + insertAsset(t, d, "0x5884ad2f920c162cfbbacc88c9c51aa75ec09e02", "AMZN") + insertAsset(t, d, "0x71178bac73cbeb415514eb542a8995b82669778d", "AMD") + + basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + creatorToken := common.HexToAddress("0x29ba5c3470b3a6c06bd6cce2e43c019d846c01c0") + creator := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") + constituents := []common.Address{ + common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e"), + common.HexToAddress("0x5884ad2f920c162cfbbacc88c9c51aa75ec09e02"), + common.HexToAddress("0x71178bac73cbeb415514eb542a8995b82669778d"), + } + weights := []*big.Int{big.NewInt(5000), big.NewInt(3000), big.NewInt(2000)} + + basketCreatedData, _ := basketCreatedABI.Pack("AI Infra", "AIIB", "thesis", constituents, weights, false) + basketCreatedLog := types.Log{ + Topics: []common.Hash{ + topicBasketCreated, + common.BytesToHash(basket.Bytes()), + common.BytesToHash(creatorToken.Bytes()), + common.BytesToHash(creator.Bytes()), + }, + Data: basketCreatedData, + BlockNumber: 500, + TxHash: common.HexToHash("0xcreatetx01"), + Index: 2, // emitted last in the transaction + Address: common.HexToAddress("0xE9854c4734cd4A9dbC5086398A11df3c11f40b21"), + } + + // Deposited is emitted before BasketCreated by the factory contract. + usdg := big.NewInt(10_000_000) + tokens, _ := new(big.Int).SetString("9950000000000000000", 10) + fee := big.NewInt(50_000) + depositData, _ := depositedABI.Pack(usdg, tokens, fee) + depositedLog := types.Log{ + Topics: []common.Hash{topicDeposited, common.BytesToHash(creator.Bytes())}, + Data: depositData, + BlockNumber: 500, + TxHash: common.HexToHash("0xcreatetx01"), + Index: 1, // emitted before BasketCreated + Address: basket, + } + + // Logs arrive in emission order: Deposited first, BasketCreated second. + logs := []types.Log{depositedLog, basketCreatedLog} + + if err := idx.writeChunkAtomic(nil, logs, 500); err != nil { + t.Fatalf("writeChunkAtomic: %v", err) + } -// TestHandleBasketCreated_WritesBasketAndConstituents verifies that -// handleBasketCreated writes the basket row and all constituent rows atomically. -func TestHandleBasketCreated_WritesBasketAndConstituents(t *testing.T) { + // Basket row must exist. + var name string + if err := d.QueryRow(`SELECT name FROM baskets WHERE address = ?`, + strings.ToLower(basket.Hex())).Scan(&name); err != nil { + t.Fatalf("basket not written: %v", err) + } + if name != "AI Infra" { + t.Errorf("basket name: expected AI Infra, got %q", name) + } + + // Deposit must have been written despite arriving before BasketCreated. + var depositCount int + d.QueryRow(`SELECT COUNT(*) FROM deposits WHERE basket_address = ?`, + strings.ToLower(basket.Hex())).Scan(&depositCount) + if depositCount != 1 { + t.Errorf("expected 1 deposit row, got %d — pre-pass failed to make basket known before deposit", depositCount) + } + + // Address sets must be updated after commit. + idx.mu.RLock() + basketKnown := idx.basketAddrs[basket] + _, ctKnown := idx.creatorTokenToBasket[creatorToken] + idx.mu.RUnlock() + if !basketKnown { + t.Error("basket not in basketAddrs after commit") + } + if !ctKnown { + t.Error("creator token not in creatorTokenToBasket after commit") + } +} + +// writeBasketCreated tests + +func TestWriteBasketCreated_WritesBasketAndConstituents(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) - // Seed the constituent asset so symbol lookup succeeds. insertAsset(t, d, "0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e", "TSLA") insertAsset(t, d, "0x5884ad2f920c162cfbbacc88c9c51aa75ec09e02", "AMZN") @@ -430,13 +501,22 @@ func TestHandleBasketCreated_WritesBasketAndConstituents(t *testing.T) { Address: common.HexToAddress("0xE9854c4734cd4A9dbC5086398A11df3c11f40b21"), } - // nil client — blockTimestamp will return 0, which is acceptable here. - idx.handleBasketCreated(nil, vLog) + tx, err := d.Begin() + if err != nil { + t.Fatalf("begin: %v", err) + } + local := newChunkAddrs() + if err := idx.writeBasketCreated(tx, nil, vLog, local); err != nil { + tx.Rollback() + t.Fatalf("writeBasketCreated: %v", err) + } + if err := tx.Commit(); err != nil { + t.Fatalf("commit: %v", err) + } - // Basket row must exist. var name, symbol string var suspended int - err := d.QueryRow( + err = d.QueryRow( `SELECT name, symbol, suspended FROM baskets WHERE address = ?`, strings.ToLower(basket.Hex()), ).Scan(&name, &symbol, &suspended) @@ -453,7 +533,6 @@ func TestHandleBasketCreated_WritesBasketAndConstituents(t *testing.T) { t.Errorf("suspended: expected 0, got %d", suspended) } - // Both constituent rows must exist with correct weights. var constituentCount int d.QueryRow( `SELECT COUNT(*) FROM basket_constituents WHERE basket_address = ?`, @@ -473,26 +552,24 @@ func TestHandleBasketCreated_WritesBasketAndConstituents(t *testing.T) { t.Errorf("TSLA weight: expected 7000, got %d", tslaWeight) } - // basketAddrs map must be updated. - idx.mu.RLock() - _, present := idx.basketAddrs[basket] - idx.mu.RUnlock() - if !present { - t.Error("basket not added to basketAddrs map after handleBasketCreated") + // local set must be populated immediately after writeBasketCreated. + if !local.baskets[basket] { + t.Error("basket not in chunk-local set after writeBasketCreated") + } + if _, ok := local.creatorTokens[creatorToken]; !ok { + t.Error("creator token not in chunk-local set after writeBasketCreated") } } -// TestHandleBasketCreated_Idempotent verifies that calling handleBasketCreated -// twice for the same basket does not duplicate rows. -func TestHandleBasketCreated_Idempotent(t *testing.T) { +func TestWriteBasketCreated_Idempotent(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) insertAsset(t, d, "0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e", "TSLA") - basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") creatorToken := common.HexToAddress("0x29ba5c3470b3a6c06bd6cce2e43c019d846c01c0") - creator := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") + creator := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") constituents := []common.Address{common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e")} weights := []*big.Int{big.NewInt(10000)} @@ -504,13 +581,24 @@ func TestHandleBasketCreated_Idempotent(t *testing.T) { common.BytesToHash(creatorToken.Bytes()), common.BytesToHash(creator.Bytes()), }, - Data: data, - BlockNumber: 100, - TxHash: common.HexToHash("0xidem01"), + Data: data, + TxHash: common.HexToHash("0xidem01"), + Address: common.HexToAddress("0xE9854c4734cd4A9dbC5086398A11df3c11f40b21"), } - idx.handleBasketCreated(nil, vLog) - idx.handleBasketCreated(nil, vLog) // second call — must not duplicate + for i := 0; i < 2; i++ { + tx, err := d.Begin() + if err != nil { + t.Fatalf("begin: %v", err) + } + if err := idx.writeBasketCreated(tx, nil, vLog, newChunkAddrs()); err != nil { + tx.Rollback() + t.Fatalf("writeBasketCreated call %d: %v", i+1, err) + } + if err := tx.Commit(); err != nil { + t.Fatalf("commit call %d: %v", i+1, err) + } + } var basketCount, constituentCount int d.QueryRow(`SELECT COUNT(*) FROM baskets WHERE address = ?`, strings.ToLower(basket.Hex())).Scan(&basketCount) @@ -524,19 +612,25 @@ func TestHandleBasketCreated_Idempotent(t *testing.T) { } } -// TestHandleBasketCreated_InsufficientTopics verifies that a log with fewer -// than 4 topics is rejected without panicking or writing anything. -func TestHandleBasketCreated_InsufficientTopics(t *testing.T) { +func TestWriteBasketCreated_InsufficientTopics(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) vLog := types.Log{ - Topics: []common.Hash{topicBasketCreated, {}, {}}, // only 3 - Data: []byte{}, + Topics: []common.Hash{topicBasketCreated, {}, {}}, + Data: []byte{}, + Address: common.HexToAddress("0xE9854c4734cd4A9dbC5086398A11df3c11f40b21"), } - // Must not panic. - idx.handleBasketCreated(nil, vLog) + tx, err := d.Begin() + if err != nil { + t.Fatalf("begin: %v", err) + } + if err := idx.writeBasketCreated(tx, nil, vLog, newChunkAddrs()); err != nil { + tx.Rollback() + t.Fatalf("writeBasketCreated returned error on insufficient topics: %v", err) + } + tx.Commit() var count int d.QueryRow(`SELECT COUNT(*) FROM baskets`).Scan(&count) @@ -564,12 +658,12 @@ func TestHandleAssetAdded_WritesAssetRow(t *testing.T) { idx.handleAssetAdded(vLog) - var symbol, name, sector string + var symbol string var isActive int err := d.QueryRow( - `SELECT symbol, name, sector, is_active FROM supported_assets WHERE address = ?`, + `SELECT symbol, is_active FROM supported_assets WHERE address = ?`, strings.ToLower(token.Hex()), - ).Scan(&symbol, &name, §or, &isActive) + ).Scan(&symbol, &isActive) if err != nil { t.Fatalf("asset not written: %v", err) } @@ -637,14 +731,11 @@ func TestHandleBasketSuspended_SetsSuspendedFlag(t *testing.T) { // syncBaskets suspended-state propagation test -// TestSyncBaskets_SuspendedBasketWrittenCorrectly verifies that a basket whose -// active=false field in getAllBaskets is written with suspended=1. func TestSyncBaskets_SuspendedStateUpdate(t *testing.T) { d := newTestDB(t) basketAddr := "0x474835c4da0393bc87d4e85e36fdce3f56edeaa6" - // Insert basket as active (suspended=0), simulating a first syncBaskets run. _, err := d.Exec(` INSERT INTO baskets (address, creator_token_address, creator_address, name, symbol, thesis, rebalancing_enabled, created_at, created_tx, suspended) @@ -656,7 +747,6 @@ func TestSyncBaskets_SuspendedStateUpdate(t *testing.T) { t.Fatalf("initial insert: %v", err) } - // Simulate a subsequent syncBaskets run where the chain reports active=false. _, err = d.Exec(` INSERT INTO baskets (address, creator_token_address, creator_address, name, symbol, thesis, rebalancing_enabled, created_at, created_tx, suspended) @@ -710,9 +800,6 @@ func TestRecordSeedFailure_UpdatesLastAttempt(t *testing.T) { } } -// TestSeedMissingConstituents_SkipsBasketExceedingMaxAttempts verifies that a -// basket with attempts >= seedMaxAttempts is excluded from the seeding query -// and seedOneBasket is never called for it. func TestSeedMissingConstituents_SkipsBasketAtMaxAttempts(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) @@ -724,33 +811,23 @@ func TestSeedMissingConstituents_SkipsBasketAtMaxAttempts(t *testing.T) { VALUES (?, '0x0', '0x0', '', '', '', 0, 0, '', 0)`, basketAddr); err != nil { t.Fatalf("seed basket: %v", err) } - - // Record exactly seedMaxAttempts failures — this basket should be skipped. if _, err := d.Exec(` INSERT INTO basket_seed_failures (basket_address, attempts, last_attempt) VALUES (?, ?, ?)`, basketAddr, seedMaxAttempts, time.Now().Unix()); err != nil { t.Fatalf("seed failures: %v", err) } - // seedMissingConstituents with no RPC client, if it tries to seed the - // basket it will call newHTTPClient which will fail to dial and log an error - // but not panic. The basket should be skipped entirely (zero constituent rows). idx.seedMissingConstituents() var constituentCount int d.QueryRow( `SELECT COUNT(*) FROM basket_constituents WHERE basket_address = ?`, basketAddr, ).Scan(&constituentCount) - - // Constituent count must still be 0 — the basket was skipped, not attempted. if constituentCount != 0 { t.Errorf("expected 0 constituents (basket skipped), got %d", constituentCount) } } -// TestSeedMissingConstituents_NoOp_WhenAllSeeded verifies that -// seedMissingConstituents returns immediately with no work when all baskets -// already have constituents. func TestSeedMissingConstituents_NoOp_WhenAllSeeded(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) @@ -771,204 +848,76 @@ func TestSeedMissingConstituents_NoOp_WhenAllSeeded(t *testing.T) { idx.seedMissingConstituents() } -// filterAddresses tests +// isKnownBasket and isKnownCreatorToken tests -func TestFilterAddresses_AlwaysContainsRegistryAndFactory(t *testing.T) { +func TestIsKnownBasket_GlobalSet(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) - addrs := idx.filterAddresses() - if len(addrs) < 2 { - t.Fatalf("expected at least registry + factory, got %d addresses", len(addrs)) - } - - registry := strings.ToLower("0x19Ab3408af6503a7D4BeC255b064f8B02A345D04") - factory := strings.ToLower("0xE9854c4734cd4A9dbC5086398A11df3c11f40b21") - - found := make(map[string]bool) - for _, a := range addrs { - found[strings.ToLower(a.Hex())] = true - } - - if !found[registry] { - t.Errorf("registry address %s not in filterAddresses result", registry) - } - if !found[factory] { - t.Errorf("factory address %s not in filterAddresses result", factory) - } -} - -func TestFilterAddresses_IncludesNewlyIndexedBasket(t *testing.T) { - d := newTestDB(t) - idx := newTestIndexer(t, d) - - newBasket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") - + addr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") idx.mu.Lock() - idx.basketAddrs[newBasket] = true + idx.basketAddrs[addr] = true idx.mu.Unlock() - addrs := idx.filterAddresses() - found := false - for _, a := range addrs { - if strings.ToLower(a.Hex()) == strings.ToLower(newBasket.Hex()) { - found = true - break - } - } - if !found { - t.Error("newly indexed basket not included in filterAddresses result") - } -} - -// Migrate idempotency tests - -func TestMigrate_Idempotent(t *testing.T) { - d := newTestDB(t) - - // Run once — must succeed. - if err := d.Migrate(); err != nil { - t.Fatalf("first Migrate: %v", err) - } - - // Run again — must not error even though columns already exist. - if err := d.Migrate(); err != nil { - t.Fatalf("second Migrate (idempotent): %v", err) - } -} - -func TestMigrate_LogIndexColumnExists(t *testing.T) { - d := newTestDB(t) - - if err := d.Migrate(); err != nil { - t.Fatalf("Migrate: %v", err) - } - - // INSERT with an explicit log_index — proves the column exists after migration. - _, err := d.Exec(` - INSERT INTO deposits - (basket_address, investor_address, usdg_amount, basket_tokens_minted, fee_usdg, timestamp, tx_hash, log_index) - VALUES ('0xbasket', '0xinvestor', '1000', '990', '10', 1700000000, '0xtx01', 3)`) - if err != nil { - t.Errorf("log_index column missing after Migrate: %v", err) + if !idx.isKnownBasket(addr, newChunkAddrs()) { + t.Error("expected isKnownBasket to return true for address in global set") } } -func TestMigrate_DeduplicationConstraintEnforced(t *testing.T) { +func TestIsKnownBasket_LocalSet(t *testing.T) { d := newTestDB(t) + idx := newTestIndexer(t, d) - if err := d.Migrate(); err != nil { - t.Fatalf("Migrate: %v", err) - } - - // Insert a deposit. - _, err := d.Exec(` - INSERT INTO deposits - (basket_address, investor_address, usdg_amount, basket_tokens_minted, fee_usdg, timestamp, tx_hash, log_index) - VALUES ('0xbasket', '0xinvestor', '1000', '990', '10', 1700000000, '0xdup', 0)`) - if err != nil { - t.Fatalf("first insert: %v", err) - } - - // Insert the same (tx_hash, log_index) with ON CONFLICT DO NOTHING. - _, err = d.Exec(` - INSERT INTO deposits - (basket_address, investor_address, usdg_amount, basket_tokens_minted, fee_usdg, timestamp, tx_hash, log_index) - VALUES ('0xbasket', '0xinvestor', '1000', '990', '10', 1700000000, '0xdup', 0) - ON CONFLICT(tx_hash, log_index) DO NOTHING`) - if err != nil { - t.Fatalf("duplicate insert with ON CONFLICT DO NOTHING: %v", err) - } - - // Insert the same (tx_hash, log_index) without ON CONFLICT. - _, err = d.Exec(` - INSERT INTO deposits - (basket_address, investor_address, usdg_amount, basket_tokens_minted, fee_usdg, timestamp, tx_hash, log_index) - VALUES ('0xbasket', '0xinvestor', '1000', '990', '10', 1700000000, '0xdup', 0)`) - if err == nil { - t.Error("expected unique constraint error for duplicate (tx_hash, log_index), got nil") - } + addr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + local := newChunkAddrs() + local.baskets[addr] = true - var count int - d.QueryRow(`SELECT COUNT(*) FROM deposits WHERE tx_hash = '0xdup'`).Scan(&count) - if count != 1 { - t.Errorf("expected exactly 1 row after duplicate attempts, got %d", count) + if !idx.isKnownBasket(addr, local) { + t.Error("expected isKnownBasket to return true for address in chunk-local set") } } -func TestMigrate_BasketSeedFailuresTableExists(t *testing.T) { +func TestIsKnownBasket_Unknown(t *testing.T) { d := newTestDB(t) + idx := newTestIndexer(t, d) - if err := d.Migrate(); err != nil { - t.Fatalf("Migrate: %v", err) - } + addr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") - _, err := d.Exec(` - INSERT INTO basket_seed_failures (basket_address, attempts, last_attempt) - VALUES ('0xbasket', 1, 1700000000)`) - if err != nil { - t.Errorf("basket_seed_failures table missing after Migrate: %v", err) + if idx.isKnownBasket(addr, newChunkAddrs()) { + t.Error("expected isKnownBasket to return false for unknown address") } } -// blockTimestamp cache correctness - -// TestBlockTimestamp_CacheHit_NoPanic verifies that a pre-warmed cache entry -// is returned without reaching the RPC path. -func TestBlockTimestamp_CacheHit_NoPanic(t *testing.T) { +func TestIsKnownCreatorToken_GlobalMap(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) - const blockNum = uint64(68391200) - const expected = int64(1780525194) - - idx.blockTsMu.Lock() - idx.blockTs[blockNum] = uint64(expected) - idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: blockNum, timestamp: uint64(expected)}) - idx.blockTsMu.Unlock() + ct := common.HexToAddress("0x29ba5c3470b3a6c06bd6cce2e43c019d846c01c0") + basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + idx.mu.Lock() + idx.creatorTokenToBasket[ct] = basket + idx.mu.Unlock() - got := idx.blockTimestamp(nil, blockNum) // nil client — must not panic - if got != expected { - t.Errorf("blockTimestamp cache hit: expected %d, got %d", expected, got) + if !idx.isKnownCreatorToken(ct, newChunkAddrs()) { + t.Error("expected isKnownCreatorToken to return true for address in global map") } } -// TestBlockTimestamp_CacheEviction verifies the FIFO eviction at blockTsCacheMax -// keeps the map size bounded and evicts the oldest entry. -func TestBlockTimestamp_CacheEviction_BoundsMapSize(t *testing.T) { +func TestIsKnownCreatorToken_LocalMap(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) - // Populate to exactly blockTsCacheMax. - for i := uint64(0); i < blockTsCacheMax; i++ { - idx.blockTsMu.Lock() - idx.blockTs[i] = i * 1000 - idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: i, timestamp: i * 1000}) - idx.blockTsMu.Unlock() - } + ct := common.HexToAddress("0x29ba5c3470b3a6c06bd6cce2e43c019d846c01c0") + basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + local := newChunkAddrs() + local.creatorTokens[ct] = basket - // Trigger eviction by adding one more entry through the cache logic directly. - idx.blockTsMu.Lock() - if len(idx.blockTsFIFO) >= blockTsCacheMax { - oldest := idx.blockTsFIFO[0] - idx.blockTsFIFO = idx.blockTsFIFO[1:] - delete(idx.blockTs, oldest.blockNumber) - } - idx.blockTs[blockTsCacheMax] = 9999999 - idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: blockTsCacheMax, timestamp: 9999999}) - mapLen := len(idx.blockTs) - _, block0Present := idx.blockTs[0] - idx.blockTsMu.Unlock() - - if mapLen != blockTsCacheMax { - t.Errorf("cache size: expected %d after eviction, got %d", blockTsCacheMax, mapLen) - } - if block0Present { - t.Error("block 0 (oldest) should have been evicted") + if !idx.isKnownCreatorToken(ct, local) { + t.Error("expected isKnownCreatorToken to return true for address in chunk-local map") } } -// FeeSnapshot write test +// writeFeeSnapshot test func TestWriteFeeSnapshot_WritesCorrectSnapshotID(t *testing.T) { d := newTestDB(t) @@ -987,16 +936,18 @@ func TestWriteFeeSnapshot_WritesCorrectSnapshotID(t *testing.T) { t.Fatalf("seed basket: %v", err) } + // Pre-populate the global map so writeFeeSnapshot resolves the basket. + idx.mu.Lock() + idx.creatorTokenToBasket[creatorTokenAddr] = basketAddr + idx.mu.Unlock() + snapshotID := int64(7) usdgAmount := big.NewInt(80_000) totalSupply := new(big.Int).Mul(big.NewInt(1_000_000), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)) data, _ := feeSnapshotABI.Pack(usdgAmount, totalSupply) vLog := types.Log{ - Topics: []common.Hash{ - topicFeeSnapshoted, - common.BigToHash(big.NewInt(snapshotID)), - }, + Topics: []common.Hash{topicFeeSnapshoted, common.BigToHash(big.NewInt(snapshotID))}, Data: data, BlockNumber: 300, TxHash: common.HexToHash("0xsnap01"), @@ -1008,7 +959,7 @@ func TestWriteFeeSnapshot_WritesCorrectSnapshotID(t *testing.T) { if err != nil { t.Fatalf("begin: %v", err) } - if err := idx.writeFeeSnapshot(tx, nil, vLog); err != nil { + if err := idx.writeFeeSnapshot(tx, nil, vLog, newChunkAddrs()); err != nil { tx.Rollback() t.Fatalf("writeFeeSnapshot: %v", err) } @@ -1031,4 +982,55 @@ func TestWriteFeeSnapshot_WritesCorrectSnapshotID(t *testing.T) { if gotAmount != "80000" { t.Errorf("usdg_amount: expected 80000, got %s", gotAmount) } +} + +// blockTimestamp cache tests + +func TestBlockTimestamp_CacheHit_NoPanic(t *testing.T) { + d := newTestDB(t) + idx := newTestIndexer(t, d) + + const blockNum = uint64(68391200) + const expected = int64(1780525194) + + idx.blockTsMu.Lock() + idx.blockTs[blockNum] = uint64(expected) + idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: blockNum, timestamp: uint64(expected)}) + idx.blockTsMu.Unlock() + + got := idx.blockTimestamp(nil, blockNum) + if got != expected { + t.Errorf("blockTimestamp cache hit: expected %d, got %d", expected, got) + } +} + +func TestBlockTimestamp_CacheEviction_BoundsMapSize(t *testing.T) { + d := newTestDB(t) + idx := newTestIndexer(t, d) + + for i := uint64(0); i < blockTsCacheMax; i++ { + idx.blockTsMu.Lock() + idx.blockTs[i] = i * 1000 + idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: i, timestamp: i * 1000}) + idx.blockTsMu.Unlock() + } + + idx.blockTsMu.Lock() + if len(idx.blockTsFIFO) >= blockTsCacheMax { + oldest := idx.blockTsFIFO[0] + idx.blockTsFIFO = idx.blockTsFIFO[1:] + delete(idx.blockTs, oldest.blockNumber) + } + idx.blockTs[blockTsCacheMax] = 9999999 + idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: blockTsCacheMax, timestamp: 9999999}) + mapLen := len(idx.blockTs) + _, block0Present := idx.blockTs[0] + idx.blockTsMu.Unlock() + + if mapLen != blockTsCacheMax { + t.Errorf("cache size: expected %d after eviction, got %d", blockTsCacheMax, mapLen) + } + if block0Present { + t.Error("block 0 (oldest) should have been evicted") + } } \ No newline at end of file diff --git a/server/indexer/indexer_test.go b/server/indexer/indexer_test.go index b4bf826..5286c32 100644 --- a/server/indexer/indexer_test.go +++ b/server/indexer/indexer_test.go @@ -93,12 +93,8 @@ func TestABINewType_InvalidType_ReturnsError(t *testing.T) { func buildAssetAddedLog(token, oracle common.Address, symbol, name, sector string) types.Log { data, _ := assetAddedABI.Pack(symbol, name, sector, oracle) - return types.Log{ - Topics: []common.Hash{ - topicAssetAdded, - common.BytesToHash(token.Bytes()), - }, + Topics: []common.Hash{topicAssetAdded, common.BytesToHash(token.Bytes())}, Data: data, BlockNumber: 67344644, TxHash: common.HexToHash("0xabc"), @@ -109,8 +105,7 @@ func buildAssetAddedLog(token, oracle common.Address, symbol, name, sector strin func TestDecodeAssetAddedLog_AllFields(t *testing.T) { token := common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e") oracle := common.HexToAddress("0x26daf42381ced15760c5f47a5072a228370b100b") - - vLog := buildAssetAddedLog(token, oracle, "TSLA", "Tesla Inc", "Consumer Discretionary") + vLog := buildAssetAddedLog(token, oracle, "TSLA", "Tesla Inc", "Consumer Discretionary") decoded, err := assetAddedABI.Unpack(vLog.Data) if err != nil { @@ -147,7 +142,6 @@ func TestDecodeAssetAddedLog_TokenAddress(t *testing.T) { if len(vLog.Topics) < 2 { t.Fatal("expected at least 2 topics") } - got := common.HexToAddress(vLog.Topics[1].Hex()) if strings.ToLower(got.Hex()) != strings.ToLower(expected.Hex()) { t.Errorf("token address: expected %s, got %s", expected.Hex(), got.Hex()) @@ -155,10 +149,7 @@ func TestDecodeAssetAddedLog_TokenAddress(t *testing.T) { } func TestDecodeAssetAddedLog_InsufficientTopics(t *testing.T) { - vLog := types.Log{ - Topics: []common.Hash{topicAssetAdded}, - Data: []byte{}, - } + vLog := types.Log{Topics: []common.Hash{topicAssetAdded}, Data: []byte{}} if len(vLog.Topics) >= 2 { t.Error("test setup error: expected fewer than 2 topics") } @@ -167,20 +158,15 @@ func TestDecodeAssetAddedLog_InsufficientTopics(t *testing.T) { func TestDecodeAssetAddedLog_MalformedData(t *testing.T) { token := common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e") vLog := types.Log{ - Topics: []common.Hash{ - topicAssetAdded, - common.BytesToHash(token.Bytes()), - }, + Topics: []common.Hash{topicAssetAdded, common.BytesToHash(token.Bytes())}, Data: []byte("not valid abi encoded data at all"), BlockNumber: 1, } - defer func() { if r := recover(); r != nil { t.Errorf("assetAddedABI.Unpack panicked on malformed data: %v", r) } }() - _, err := assetAddedABI.Unpack(vLog.Data) if err == nil { t.Error("expected decode error for malformed data, got nil") @@ -197,7 +183,6 @@ func buildBasketCreatedLog( rebalancing bool, ) types.Log { data, _ := basketCreatedABI.Pack(name, symbol, thesis, constituents, weights, rebalancing) - return types.Log{ Topics: []common.Hash{ topicBasketCreated, @@ -215,20 +200,14 @@ func TestDecodeBasketCreatedLog_AllFields(t *testing.T) { basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") creatorToken := common.HexToAddress("0x29ba5c3470b3a6c06bd6cce2e43c019d846c01c0") creator := common.HexToAddress("0x4e4b989abe79381c1b8a4871d6af481b175f4865") - constituents := []common.Address{ common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e"), common.HexToAddress("0x5884ad2f920c162cfbbacc88c9c51aa75ec09e02"), common.HexToAddress("0x71178bac73cbeb415514eb542a8995b82669778d"), } - weights := []*big.Int{ - big.NewInt(5000), - big.NewInt(3000), - big.NewInt(2000), - } + weights := []*big.Int{big.NewInt(5000), big.NewInt(3000), big.NewInt(2000)} - vLog := buildBasketCreatedLog( - basket, creatorToken, creator, + vLog := buildBasketCreatedLog(basket, creatorToken, creator, "AI Infrastructure", "AIIB", "Companies building the physical infrastructure for AI", constituents, weights, false, @@ -282,7 +261,6 @@ func TestDecodeBasketCreatedLog_AllFields(t *testing.T) { if len(gotWeights) != 3 { t.Errorf("expected 3 weights, got %d", len(gotWeights)) } - totalWeight := int64(0) for _, w := range gotWeights { totalWeight += w.Int64() @@ -290,17 +268,13 @@ func TestDecodeBasketCreatedLog_AllFields(t *testing.T) { if totalWeight != 10000 { t.Errorf("weights must sum to 10000, got %d", totalWeight) } - if gotRebal != false { t.Errorf("rebalancing: expected false, got %v", gotRebal) } } func TestDecodeBasketCreatedLog_InsufficientTopics(t *testing.T) { - vLog := types.Log{ - Topics: []common.Hash{topicBasketCreated, {}, {}}, - Data: []byte{}, - } + vLog := types.Log{Topics: []common.Hash{topicBasketCreated, {}, {}}, Data: []byte{}} if len(vLog.Topics) >= 4 { t.Error("test setup error: expected fewer than 4 topics") } @@ -316,13 +290,11 @@ func TestDecodeBasketCreatedLog_MalformedData(t *testing.T) { }, Data: []byte("garbage"), } - defer func() { if r := recover(); r != nil { t.Errorf("basketCreatedABI.Unpack panicked on malformed data: %v", r) } }() - decoded, err := basketCreatedABI.Unpack(vLog.Data) if err == nil && len(decoded) >= 6 { t.Error("expected decode error or insufficient fields for garbage data") @@ -331,16 +303,10 @@ func TestDecodeBasketCreatedLog_MalformedData(t *testing.T) { // Deposited log tests -// buildDepositedLog constructs a Deposited log using the ABI packer — -// matching exactly what the chain emits. func buildDepositedLog(investor common.Address, usdgAmount, tokensMinted, feeUsdg *big.Int) types.Log { data, _ := depositedABI.Pack(usdgAmount, tokensMinted, feeUsdg) - return types.Log{ - Topics: []common.Hash{ - topicDeposited, - common.BytesToHash(investor.Bytes()), - }, + Topics: []common.Hash{topicDeposited, common.BytesToHash(investor.Bytes())}, Data: data, BlockNumber: 67369113, TxHash: common.HexToHash("0xdeposittx"), @@ -360,19 +326,14 @@ func TestDecodeDepositedLog_ViaABI(t *testing.T) { if err != nil || len(decoded) < 3 { t.Fatalf("depositedABI.Unpack failed: %v", err) } - - gotUsdg := decoded[0].(*big.Int) - gotTokens := decoded[1].(*big.Int) - gotFee := decoded[2].(*big.Int) - - if gotUsdg.Cmp(usdgAmount) != 0 { - t.Errorf("usdgAmount: expected %s, got %s", usdgAmount, gotUsdg) + if decoded[0].(*big.Int).Cmp(usdgAmount) != 0 { + t.Errorf("usdgAmount: expected %s, got %s", usdgAmount, decoded[0]) } - if gotTokens.Cmp(tokensMinted) != 0 { - t.Errorf("tokensMinted: expected %s, got %s", tokensMinted, gotTokens) + if decoded[1].(*big.Int).Cmp(tokensMinted) != 0 { + t.Errorf("tokensMinted: expected %s, got %s", tokensMinted, decoded[1]) } - if gotFee.Cmp(feeUsdg) != 0 { - t.Errorf("feeUsdg: expected %s, got %s", feeUsdg, gotFee) + if decoded[2].(*big.Int).Cmp(feeUsdg) != 0 { + t.Errorf("feeUsdg: expected %s, got %s", feeUsdg, decoded[2]) } } @@ -388,7 +349,6 @@ func TestDecodeDepositedLog_Amounts(t *testing.T) { if err != nil || len(decoded) < 3 { t.Fatalf("depositedABI.Unpack failed: %v", err) } - if decoded[0].(*big.Int).Cmp(usdgAmount) != 0 { t.Errorf("usdg: expected %s, got %s", usdgAmount, decoded[0]) } @@ -402,7 +362,7 @@ func TestDecodeDepositedLog_Amounts(t *testing.T) { func TestDecodeDepositedLog_InvestorAddress(t *testing.T) { expected := common.HexToAddress("0x4e4B989abE79381C1B8a4871d6aF481B175F4865") - vLog := buildDepositedLog(expected, big.NewInt(1), big.NewInt(1), big.NewInt(0)) + vLog := buildDepositedLog(expected, big.NewInt(1), big.NewInt(1), big.NewInt(0)) got := common.HexToAddress(vLog.Topics[1].Hex()) if strings.ToLower(got.Hex()) != strings.ToLower(expected.Hex()) { @@ -426,12 +386,8 @@ func TestDecodeDepositedLog_InsufficientTopics(t *testing.T) { func buildRedeemedLog(investor common.Address, tokensBurned, usdgReturned, feeUsdg *big.Int) types.Log { data, _ := redeemedABI.Pack(tokensBurned, usdgReturned, feeUsdg) - return types.Log{ - Topics: []common.Hash{ - topicRedeemed, - common.BytesToHash(investor.Bytes()), - }, + Topics: []common.Hash{topicRedeemed, common.BytesToHash(investor.Bytes())}, Data: data, BlockNumber: 67370000, TxHash: common.HexToHash("0xredeemtx"), @@ -451,19 +407,14 @@ func TestDecodeRedeemedLog_ViaABI(t *testing.T) { if err != nil || len(decoded) < 3 { t.Fatalf("redeemedABI.Unpack failed: %v", err) } - - gotBurned := decoded[0].(*big.Int) - gotReturned := decoded[1].(*big.Int) - gotFee := decoded[2].(*big.Int) - - if gotBurned.Cmp(tokensBurned) != 0 { - t.Errorf("tokensBurned: expected %s, got %s", tokensBurned, gotBurned) + if decoded[0].(*big.Int).Cmp(tokensBurned) != 0 { + t.Errorf("tokensBurned: expected %s, got %s", tokensBurned, decoded[0]) } - if gotReturned.Cmp(usdgReturned) != 0 { - t.Errorf("usdgReturned: expected %s, got %s", usdgReturned, gotReturned) + if decoded[1].(*big.Int).Cmp(usdgReturned) != 0 { + t.Errorf("usdgReturned: expected %s, got %s", usdgReturned, decoded[1]) } - if gotFee.Cmp(feeUsdg) != 0 { - t.Errorf("feeUsdg: expected %s, got %s", feeUsdg, gotFee) + if decoded[2].(*big.Int).Cmp(feeUsdg) != 0 { + t.Errorf("feeUsdg: expected %s, got %s", feeUsdg, decoded[2]) } } @@ -479,7 +430,6 @@ func TestDecodeRedeemedLog_Amounts(t *testing.T) { if err != nil || len(decoded) < 3 { t.Fatalf("redeemedABI.Unpack failed: %v", err) } - if decoded[0].(*big.Int).Cmp(tokensBurned) != 0 { t.Errorf("tokens burned: expected %s, got %s", tokensBurned, decoded[0]) } @@ -493,30 +443,24 @@ func TestDecodeRedeemedLog_Amounts(t *testing.T) { // FeeSnapshot log tests -func buildFeeSnapshotLog(basketAddr common.Address, snapshotID int64, usdgAmount *big.Int) types.Log { +func buildFeeSnapshotLog(addr common.Address, snapshotID int64, usdgAmount *big.Int) types.Log { totalSupply := new(big.Int).Mul(big.NewInt(1_000_000), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)) data, _ := feeSnapshotABI.Pack(usdgAmount, totalSupply) - - snapIDHash := common.BigToHash(big.NewInt(snapshotID)) - return types.Log{ - Topics: []common.Hash{ - topicFeeSnapshoted, - snapIDHash, - }, + Topics: []common.Hash{topicFeeSnapshoted, common.BigToHash(big.NewInt(snapshotID))}, Data: data, BlockNumber: 67369113, TxHash: common.HexToHash("0xfeesnaptx"), - Address: basketAddr, + Address: addr, } } func TestDecodeFeeSnapshotLog_ViaABI(t *testing.T) { - basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + addr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") snapshotID := int64(3) usdgAmount := big.NewInt(40_000) - vLog := buildFeeSnapshotLog(basket, snapshotID, usdgAmount) + vLog := buildFeeSnapshotLog(addr, snapshotID, usdgAmount) gotSnapshotID := new(big.Int).SetBytes(vLog.Topics[1].Bytes()).Int64() if gotSnapshotID != snapshotID { @@ -527,19 +471,17 @@ func TestDecodeFeeSnapshotLog_ViaABI(t *testing.T) { if err != nil || len(decoded) < 1 { t.Fatalf("feeSnapshotABI.Unpack failed: %v", err) } - - gotAmount := decoded[0].(*big.Int) - if gotAmount.Cmp(usdgAmount) != 0 { - t.Errorf("usdgAmount: expected %s, got %s", usdgAmount, gotAmount) + if decoded[0].(*big.Int).Cmp(usdgAmount) != 0 { + t.Errorf("usdgAmount: expected %s, got %s", usdgAmount, decoded[0]) } } func TestDecodeFeeSnapshotLog(t *testing.T) { - basket := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + addr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") snapshotID := int64(1) usdgAmount := big.NewInt(40_000) - vLog := buildFeeSnapshotLog(basket, snapshotID, usdgAmount) + vLog := buildFeeSnapshotLog(addr, snapshotID, usdgAmount) if len(vLog.Topics) < 2 { t.Fatal("expected at least 2 topics") @@ -554,10 +496,8 @@ func TestDecodeFeeSnapshotLog(t *testing.T) { if err != nil || len(decoded) < 1 { t.Fatalf("feeSnapshotABI.Unpack failed: %v", err) } - - gotAmount := decoded[0].(*big.Int) - if gotAmount.Cmp(usdgAmount) != 0 { - t.Errorf("usdgAmount: expected %s, got %s", usdgAmount, gotAmount) + if decoded[0].(*big.Int).Cmp(usdgAmount) != 0 { + t.Errorf("usdgAmount: expected %s, got %s", usdgAmount, decoded[0]) } } @@ -647,7 +587,6 @@ func TestBlockTsCache_Eviction(t *testing.T) { idx, _ := New(nil, "", "", "0x1", 0, nil) - // Fill the cache to exactly blockTsCacheMax entries. for i := uint64(0); i < blockTsCacheMax; i++ { idx.blockTsMu.Lock() if len(idx.blockTsFIFO) >= blockTsCacheMax { @@ -664,7 +603,6 @@ func TestBlockTsCache_Eviction(t *testing.T) { t.Fatalf("cache should have exactly %d entries, got %d", blockTsCacheMax, len(idx.blockTs)) } - // Adding one more should evict the oldest (block 0). newBlock := uint64(blockTsCacheMax) idx.blockTsMu.Lock() if len(idx.blockTsFIFO) >= blockTsCacheMax { @@ -677,7 +615,7 @@ func TestBlockTsCache_Eviction(t *testing.T) { idx.blockTsMu.Unlock() idx.blockTsMu.Lock() - _, block0Present := idx.blockTs[0] + _, block0Present := idx.blockTs[0] _, newBlockPresent := idx.blockTs[newBlock] cacheLen := len(idx.blockTs) idx.blockTsMu.Unlock() @@ -704,8 +642,6 @@ func TestBlockTsCache_HitReturnsCachedValue(t *testing.T) { idx.blockTsFIFO = append(idx.blockTsFIFO, blockTsEntry{blockNumber: 12345, timestamp: 1780525194}) idx.blockTsMu.Unlock() - // blockTimestamp with a nil client — should return the cached value without - // making any RPC call, so no panic despite the nil client. ts := idx.blockTimestamp(nil, 12345) if ts != 1780525194 { t.Errorf("cache hit: expected 1780525194, got %d", ts) @@ -734,15 +670,9 @@ func FuzzDecodeAssetAddedData(f *testing.F) { } func FuzzDecodeBasketCreatedData(f *testing.F) { - constituents := []common.Address{ - common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e"), - } - weights := []*big.Int{big.NewInt(10000)} - validData, _ := basketCreatedABI.Pack( - "AI Infrastructure", "AIIB", - "Companies building AI infrastructure", - constituents, weights, false, - ) + constituents := []common.Address{common.HexToAddress("0xc9f9c86933092bbbfff3ccb4b105a4a94bf3bd4e")} + weights := []*big.Int{big.NewInt(10000)} + validData, _ := basketCreatedABI.Pack("AI Infrastructure", "AIIB", "Companies building AI infrastructure", constituents, weights, false) f.Add(validData) f.Add([]byte{}) f.Add([]byte("garbage")) diff --git a/server/main.go b/server/main.go index 020b2d8..f75b500 100644 --- a/server/main.go +++ b/server/main.go @@ -40,11 +40,6 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = database.Migrate() - if err != nil { - log.Fatalf("failed to migrate database: %v", err) - } - idx, err := indexer.New(ctx, wsURL, rpcURL, registryAddr, deployBlock, database) if err != nil { log.Fatalf("failed to start indexer: %v", err) From 765d1da783697b5ec17eb1fd0bc7a21a7b026a3f Mon Sep 17 00:00:00 2001 From: 0xull Date: Fri, 12 Jun 2026 07:12:38 +0100 Subject: [PATCH 2/5] [server]: rewrite the indexer to poll live event logs in sized chunks instead Signed-off-by: 0xull --- server/indexer/indexer.go | 164 +++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 90 deletions(-) diff --git a/server/indexer/indexer.go b/server/indexer/indexer.go index a34bb3f..f95f0c8 100644 --- a/server/indexer/indexer.go +++ b/server/indexer/indexer.go @@ -26,8 +26,8 @@ const rpcTimeout = 15 * time.Second const chunkSize = int64(2000) const seedMaxAttempts = 5 const blockTsCacheMax = 4096 -const reconnectBaseDelay = 1 * time.Second -const reconnectMaxDelay = 5 * time.Minute +const liveChunkSize = int64(50) +const livePollInterval = 2 * time.Second var ( topicBasketCreated = eventTopic("BasketCreated(address,address,address,string,string,string,address[],uint256[],bool)") @@ -183,6 +183,7 @@ func init() { } // RevenueSnapshoted(uint256 indexed snapshotId, uint256 usdgAmount, uint256 totalSupply) + // snapshotId is Topics[1]; usdgAmount and totalSupply are in Data. feeSnapshotABI = abi.Arguments{ {Name: "usdgAmount", Type: uint256Type}, {Name: "totalSupply", Type: uint256Type}, @@ -212,11 +213,10 @@ func newChunkAddrs() *chunkAddrs { } } -// Indexer subscribes to new block headers via WebSocket and fetches event logs -// per block via HTTP using a topic-only filter. Address verification is +// Indexer polls for new blocks on a fixed interval and fetches event logs as +// a batched range query using a topic-only filter. Address verification is // performed against in-memory sets populated from the DB at startup and kept -// current as new baskets are indexed. The topic-only filter means the -// subscription never needs to be rebuilt when new contracts are deployed. +// current as new baskets are indexed. type Indexer struct { ctx context.Context wsURL string @@ -265,45 +265,13 @@ func New( }, nil } -// Run starts the indexer. Chain state is synced before any live processing -// begins so the address sets are fully populated. +// Run starts the indexer. Chain state is synced first so the address sets are +// fully populated before any event processing begins. The historical scan and +// live poller then run sequentially on the same cursor. func (idx *Indexer) Run() { idx.syncFromChain() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - idx.scanHistoricalEvents() - }() - - delay := reconnectBaseDelay - for { - select { - case <-idx.ctx.Done(): - wg.Wait() - return - default: - } - - if err := idx.subscribeBlocks(); err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - wg.Wait() - return - } - log.Printf("indexer: block subscription error: %v — reconnecting in %s", err, delay) - select { - case <-idx.ctx.Done(): - wg.Wait() - return - case <-time.After(delay): - } - delay = minDuration(delay*2, reconnectMaxDelay) - } else { - wg.Wait() - return - } - } + idx.scanHistoricalEvents() + idx.pollLiveBlocks() } func (idx *Indexer) syncFromChain() { @@ -814,71 +782,85 @@ func (idx *Indexer) scanHistoricalEvents() { log.Printf("indexer: historical scan complete through block %d", toBlock) } -// subscribeBlocks subscribes to new block headers via WebSocket. For each -// header, event logs for that block are fetched via HTTP using a topic-only -// filter. -func (idx *Indexer) subscribeBlocks() error { - dialCtx, dialCancel := context.WithTimeout(idx.ctx, rpcTimeout) - defer dialCancel() - - wsClient, err := ethclient.DialContext(dialCtx, idx.wsURL) +// pollLiveBlocks runs after the historical scan completes and polls for new +// blocks on a fixed interval. Each tick fetches at most liveChunkSize blocks +// as a single eth_getLogs call, advancing the cursor on every successful +// write. +func (idx *Indexer) pollLiveBlocks() { + client, err := idx.newHTTPClient() if err != nil { - return fmt.Errorf("ws dial: %w", err) + log.Printf("indexer: pollLiveBlocks dial error: %v", err) + return } - defer wsClient.Close() + defer client.Close() - headers := make(chan *types.Header, 16) - sub, err := wsClient.SubscribeNewHead(idx.ctx, headers) - if err != nil { - return fmt.Errorf("subscribe new head: %w", err) - } - defer sub.Unsubscribe() + log.Printf("indexer: live polling active (interval=%s, chunkSize=%d)", livePollInterval, liveChunkSize) - httpClient, err := idx.newHTTPClient() - if err != nil { - return fmt.Errorf("http dial: %w", err) - } - defer httpClient.Close() - - log.Println("indexer: live block subscription active") + ticker := time.NewTicker(livePollInterval) + defer ticker.Stop() for { select { case <-idx.ctx.Done(): - return nil + return + case <-ticker.C: + if err := idx.pollOnce(client); err != nil { + log.Printf("indexer: pollOnce error: %v", err) + } + } + } +} - case err := <-sub.Err(): - return fmt.Errorf("subscription: %w", err) +// pollOnce fetches the next batch of blocks from the cursor to the current +// chain tip, capped at liveChunkSize, and writes any matching logs atomically. +func (idx *Indexer) pollOnce(client *ethclient.Client) error { + var fromBlock int64 + if err := idx.db.QueryRow( + `SELECT block_num FROM sync_cursors WHERE key = 'events'`, + ).Scan(&fromBlock); err != nil { + return fmt.Errorf("read cursor: %w", err) + } - case header := <-headers: - blockNum := header.Number.Int64() + hctx, hcancel := context.WithTimeout(idx.ctx, rpcTimeout) + latestHeader, err := client.HeaderByNumber(hctx, nil) + hcancel() + if err != nil { + return fmt.Errorf("latest block: %w", err) + } + toBlock := latestHeader.Number.Int64() - fetchCtx, fetchCancel := context.WithTimeout(idx.ctx, rpcTimeout) - logs, err := httpClient.FilterLogs(fetchCtx, ethereum.FilterQuery{ - FromBlock: header.Number, - ToBlock: header.Number, - Topics: knownTopics, - }) - fetchCancel() + if fromBlock >= toBlock { + return nil + } - if err != nil { - log.Printf("indexer: FilterLogs block %d: %v — skipping block", blockNum, err) - continue - } + end := fromBlock + liveChunkSize + if end > toBlock { + end = toBlock + } - if len(logs) == 0 { - continue - } + filterCtx, filterCancel := context.WithTimeout(idx.ctx, rpcTimeout) + logs, err := client.FilterLogs(filterCtx, ethereum.FilterQuery{ + FromBlock: big.NewInt(fromBlock + 1), + ToBlock: big.NewInt(end), + Topics: knownTopics, + }) + filterCancel() - if err := idx.writeChunkAtomic(httpClient, logs, blockNum); err != nil { - log.Printf("indexer: writeChunkAtomic block %d: %v", blockNum, err) - } - } + if err != nil { + return fmt.Errorf("FilterLogs [%d-%d]: %w", fromBlock+1, end, err) } + + return idx.writeChunkAtomic(client, logs, end) } // writeChunkAtomic writes all logs for a block or scan chunk atomically and // advances the event cursor to endBlock. +// +// BasketFactory emits Deposited before BasketCreated in the same transaction, +// so both appear in the same block and therefore the same chunk. A pre-pass +// processes all BasketCreated logs first and populates the chunk-local address +// set so that Deposited events from the same transaction are correctly +// attributed before the global sets are updated after commit. func (idx *Indexer) writeChunkAtomic(client *ethclient.Client, logs []types.Log, endBlock int64) error { tx, err := idx.db.Begin() if err != nil { @@ -1220,7 +1202,9 @@ func (idx *Indexer) writeRebalanced(tx *sql.Tx, client *ethclient.Client, vLog t } // writeFeeSnapshot handles RevenueSnapshoted events emitted by CreatorToken -// contracts. vLog.Address is the CreatorToken contract address. +// contracts. vLog.Address is the CreatorToken contract address, not the basket +// proxy. The basket address is resolved from creatorTokenToBasket then the +// chunk-local map. func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog types.Log, local *chunkAddrs) error { if len(vLog.Topics) < 2 { return nil From 12382b6c8b024f6501cb02088382b9a315eaf361 Mon Sep 17 00:00:00 2001 From: 0xull Date: Fri, 12 Jun 2026 07:52:24 +0100 Subject: [PATCH 3/5] =?UTF-8?q?[backend]:=20fix=20api=20=E2=80=94=20shared?= =?UTF-8?q?=20RPC=20client,=20package-level=20ABI=20parsing,=20correct=20n?= =?UTF-8?q?eedsRebalancing,=20default=20nav=20to=200?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 0xull --- server/api/api.go | 194 ++++++++++++++++++++++++++-------------------- 1 file changed, 109 insertions(+), 85 deletions(-) diff --git a/server/api/api.go b/server/api/api.go index 2a48ea5..57dfbeb 100644 --- a/server/api/api.go +++ b/server/api/api.go @@ -25,6 +25,52 @@ import ( // made by the API handlers. const rpcTimeout = 15 * time.Second +// basketStateABI is parsed once at startup and reused across all handler +// invocations. +var basketStateABI abi.ABI + +// claimableRevenueABI is parsed once at startup and reused by getClaimableSnapshots. +var claimableRevenueABI abi.ABI + +func init() { + var err error + + basketStateABI, err = abi.JSON(strings.NewReader(`[{ + "inputs": [], + "name": "basketState", + "outputs": [ + {"internalType":"address[]","name":"constituents", "type":"address[]"}, + {"internalType":"uint256[]","name":"targetWeights", "type":"uint256[]"}, + {"internalType":"uint256[]","name":"currentWeights", "type":"uint256[]"}, + {"internalType":"uint256[]","name":"balances", "type":"uint256[]"}, + {"internalType":"uint256", "name":"totalValue", "type":"uint256"}, + {"internalType":"uint256", "name":"nav", "type":"uint256"}, + {"internalType":"bool", "name":"rebalancingEnabled", "type":"bool"}, + {"internalType":"uint256", "name":"driftThresholdBps", "type":"uint256"}, + {"internalType":"uint256", "name":"maxDrift", "type":"uint256"} + ], + "stateMutability":"view", + "type":"function" + }]`)) + if err != nil { + panic(fmt.Sprintf("api: parse basketStateABI: %v", err)) + } + + claimableRevenueABI, err = abi.JSON(strings.NewReader(`[{ + "inputs":[ + {"internalType":"address","name":"account", "type":"address"}, + {"internalType":"uint256","name":"snapshotId", "type":"uint256"} + ], + "name":"claimableRevenue", + "outputs":[{"internalType":"uint256","name":"","type":"uint256"}], + "stateMutability":"view", + "type":"function" + }]`)) + if err != nil { + panic(fmt.Sprintf("api: parse claimableRevenueABI: %v", err)) + } +} + // snapshotEntry is shared between getCreatorDashboard and getClaimableSnapshots. type snapshotEntry struct { SnapshotID int64 `json:"snapshotId"` @@ -34,14 +80,31 @@ type snapshotEntry struct { ClaimableUsdg string `json:"claimableByWallet"` } -// NewRouter wires all HTTP routes. +// NewRouter wires all HTTP routes. The RPC client is initialised once here +// and shared across all handlers that need it, avoiding per-request dial +// overhead on cache misses. func NewRouter(database *db.DB, openAIKey string, openAIModel string) http.Handler { mux := http.NewServeMux() + rpcURL := os.Getenv("RPC_URL") + if rpcURL == "" { + rpcURL = "https://rpc.testnet.chain.robinhood.com" + } + + dialCtx, dialCancel := context.WithTimeout(context.Background(), rpcTimeout) + defer dialCancel() + + rpcClient, err := ethclient.DialContext(dialCtx, rpcURL) + if err != nil { + log.Printf("api: RPC dial warning: %v — RPC-dependent handlers will degrade gracefully", err) + rpcClient = nil + } + h := &handler{ db: database, openAIKey: openAIKey, openAIModel: openAIModel, + rpcClient: rpcClient, } mux.HandleFunc("GET /baskets", h.listBaskets) @@ -65,6 +128,7 @@ type handler struct { db *db.DB openAIKey string openAIModel string + rpcClient *ethclient.Client } // Marketplace @@ -320,7 +384,8 @@ func (h *handler) getBasket(w http.ResponseWriter, r *http.Request) { } } - var navPerToken, totalValueUsdg string + navPerToken := "0" + totalValueUsdg := "0" var maxDriftBps int64 var needsRebalancing bool @@ -330,10 +395,14 @@ func (h *handler) getBasket(w http.ResponseWriter, r *http.Request) { maxDriftBps = state.MaxDriftBps needsRebalancing = state.NeedsRebalancing } else { - h.db.QueryRow(` + var dbNav, dbTv string + if h.db.QueryRow(` SELECT nav_per_token, total_value_usdg FROM nav_history WHERE basket_address = ? ORDER BY timestamp DESC LIMIT 1`, addr, - ).Scan(&navPerToken, &totalValueUsdg) + ).Scan(&dbNav, &dbTv) == nil { + navPerToken = dbNav + totalValueUsdg = dbTv + } } navChange24h := h.navChangePct(addr, 86400) @@ -441,44 +510,16 @@ func (h *handler) getBasketStateFromCache(ctx context.Context, basketAddr string } func (h *handler) fetchBasketStateRPC(ctx context.Context, basketAddr string) (*basketStateCache, error) { - basketStateABI, _ := abi.JSON(strings.NewReader(`[{ - "inputs": [], - "name": "basketState", - "outputs": [ - {"internalType":"address[]","name":"constituents", "type":"address[]"}, - {"internalType":"uint256[]","name":"targetWeights", "type":"uint256[]"}, - {"internalType":"uint256[]","name":"currentWeights", "type":"uint256[]"}, - {"internalType":"uint256[]","name":"balances", "type":"uint256[]"}, - {"internalType":"uint256", "name":"totalValue", "type":"uint256"}, - {"internalType":"uint256", "name":"nav", "type":"uint256"}, - {"internalType":"bool", "name":"rebalancingEnabled", "type":"bool"}, - {"internalType":"uint256", "name":"driftThresholdBps", "type":"uint256"}, - {"internalType":"uint256", "name":"maxDrift", "type":"uint256"} - ], - "stateMutability":"view", - "type":"function" - }]`)) - - rpcURL := os.Getenv("RPC_URL") - if rpcURL == "" { - rpcURL = "https://rpc.testnet.chain.robinhood.com" - } - - dialCtx, dialCancel := context.WithTimeout(ctx, rpcTimeout) - defer dialCancel() - - client, err := ethclient.DialContext(dialCtx, rpcURL) - if err != nil { - return nil, fmt.Errorf("dial: %w", err) + if h.rpcClient == nil { + return nil, fmt.Errorf("RPC client not available") } - defer client.Close() addr := common.HexToAddress(basketAddr) callCtx, callCancel := context.WithTimeout(ctx, rpcTimeout) defer callCancel() - data, err := client.CallContract(callCtx, ethereum.CallMsg{ + data, err := h.rpcClient.CallContract(callCtx, ethereum.CallMsg{ To: &addr, Data: basketStateABI.Methods["basketState"].ID, }, nil) @@ -497,9 +538,17 @@ func (h *handler) fetchBasketStateRPC(ctx context.Context, basketAddr string) (* balances, _ := unpacked[3].([]*big.Int) totalValue, _ := unpacked[4].(*big.Int) nav, _ := unpacked[5].(*big.Int) - needsRebal, _ := unpacked[6].(bool) + rebalancingEnabled, _ := unpacked[6].(bool) + driftThresholdBps, _ := unpacked[7].(*big.Int) maxDrift, _ := unpacked[8].(*big.Int) + // needsRebalancing is true when rebalancing is enabled and the maximum + // drift across all constituents meets or exceeds the basket's threshold. + needsRebal := false + if rebalancingEnabled && maxDrift != nil && driftThresholdBps != nil { + needsRebal = maxDrift.Cmp(driftThresholdBps) >= 0 + } + constituentAddrs := make([]string, len(constituents)) for i, c := range constituents { constituentAddrs[i] = strings.ToLower(c.Hex()) @@ -1329,23 +1378,6 @@ func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { } } - rpcURL := os.Getenv("RPC_URL") - if rpcURL == "" { - rpcURL = "https://rpc.testnet.chain.robinhood.com" - } - - rpcCtx, rpcCancel := context.WithTimeout(r.Context(), rpcTimeout) - defer rpcCancel() - - rpcClient, rpcErr := ethclient.DialContext(rpcCtx, rpcURL) - if rpcErr != nil { - log.Printf("api: getCreatorDashboard dial: %v", rpcErr) - rpcClient = nil - } - if rpcClient != nil { - defer rpcClient.Close() - } - totalClaimable := new(big.Int) result := make([]BasketEntry, 0, len(basketOrder)) @@ -1356,7 +1388,7 @@ func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { snaps = []snapshotEntry{} } - unclaimed := h.getClaimableSnapshots(rpcCtx, rpcClient, wallet, addr, m.creatorToken, snaps) + unclaimed := h.getClaimableSnapshots(r.Context(), wallet, addr, m.creatorToken, snaps) basketClaimable := new(big.Int) for _, s := range unclaimed { @@ -1387,10 +1419,9 @@ func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { // getClaimableSnapshots returns claimable amounts per snapshot, reading from // creator_claimable_cache if fresh (< 60s), otherwise calling claimableRevenue() -// on the contract. +// on the contract via the shared RPC client. func (h *handler) getClaimableSnapshots( ctx context.Context, - client *ethclient.Client, wallet, basketAddr, creatorTokenAddr string, snapshots []snapshotEntry, ) []snapshotEntry { @@ -1398,17 +1429,6 @@ func (h *handler) getClaimableSnapshots( return []snapshotEntry{} } - claimableABI, _ := abi.JSON(strings.NewReader(`[{ - "inputs":[ - {"internalType":"address","name":"account", "type":"address"}, - {"internalType":"uint256","name":"snapshotId", "type":"uint256"} - ], - "name":"claimableRevenue", - "outputs":[{"internalType":"uint256","name":"","type":"uint256"}], - "stateMutability":"view", - "type":"function" - }]`)) - now := time.Now().Unix() type cachedEntry struct { @@ -1446,15 +1466,16 @@ func (h *handler) getClaimableSnapshots( } claimable := "0" - if client != nil { - input, err := claimableABI.Pack("claimableRevenue", + rpcSucceeded := false + + if h.rpcClient != nil { + input, err := claimableRevenueABI.Pack("claimableRevenue", walletAddr, new(big.Int).SetInt64(snap.SnapshotID), ) if err == nil { - // Per-call timeout within the overall request context. callCtx, callCancel := context.WithTimeout(ctx, rpcTimeout) - data, err := client.CallContract(callCtx, ethereum.CallMsg{ + data, err := h.rpcClient.CallContract(callCtx, ethereum.CallMsg{ To: &ctAddr, Data: input, }, nil) @@ -1463,26 +1484,31 @@ func (h *handler) getClaimableSnapshots( if err != nil { log.Printf("api: claimableRevenue RPC snapshot=%d wallet=%s: %v", snap.SnapshotID, wallet, err) } else { - unpacked, err := claimableABI.Methods["claimableRevenue"].Outputs.Unpack(data) + unpacked, err := claimableRevenueABI.Methods["claimableRevenue"].Outputs.Unpack(data) if err == nil && len(unpacked) > 0 { if amount, ok := unpacked[0].(*big.Int); ok && amount != nil { claimable = amount.String() + rpcSucceeded = true } } - - h.db.Exec(` - INSERT INTO creator_claimable_cache - (wallet_address, snapshot_id, basket_address, claimable_usdg, cached_at) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(wallet_address, snapshot_id, basket_address) DO UPDATE SET - claimable_usdg = excluded.claimable_usdg, - cached_at = excluded.cached_at`, - wallet, snap.SnapshotID, basketAddr, claimable, now, - ) } } } + // Only write to cache on RPC success. Caching a zero on RPC failure + // would poison the cache and hide real claimable amounts. + if rpcSucceeded { + h.db.Exec(` + INSERT INTO creator_claimable_cache + (wallet_address, snapshot_id, basket_address, claimable_usdg, cached_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(wallet_address, snapshot_id, basket_address) DO UPDATE SET + claimable_usdg = excluded.claimable_usdg, + cached_at = excluded.cached_at`, + wallet, snap.SnapshotID, basketAddr, claimable, now, + ) + } + snap.ClaimableUsdg = claimable result = append(result, snap) } @@ -1544,7 +1570,6 @@ func (h *handler) aiCompose(w http.ResponseWriter, r *http.Request) { var req struct { Thesis string `json:"thesis"` } - // 32KB limit if err := json.NewDecoder(io.LimitReader(r.Body, 32768)).Decode(&req); err != nil || len(req.Thesis) < 20 { jsonError(w, "thesis must be at least 20 characters", http.StatusBadRequest) return @@ -1601,7 +1626,6 @@ func jsonOK(w http.ResponseWriter, v any) { json.NewEncoder(w).Encode(v) } -// jsonError returns a JSON error body containing only the message string. func jsonError(w http.ResponseWriter, msg string, code int) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) @@ -1662,4 +1686,4 @@ func (h *handler) serveDocs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") w.WriteHeader(http.StatusOK) w.Write([]byte(swaggerHTML)) -} +} \ No newline at end of file From 3397a0a65a370b121c646d6d670d9763dacd380f Mon Sep 17 00:00:00 2001 From: 0xull Date: Fri, 12 Jun 2026 20:54:45 +0100 Subject: [PATCH 4/5] [backend]: add retry with exponential backoff on transient RPC errors in pollOnce Signed-off-by: 0xull --- server/indexer/indexer.go | 55 ++++++++++++++++++++-------- server/indexer/indexer_test.go | 65 +++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 15 deletions(-) diff --git a/server/indexer/indexer.go b/server/indexer/indexer.go index f95f0c8..9933ad6 100644 --- a/server/indexer/indexer.go +++ b/server/indexer/indexer.go @@ -821,14 +821,18 @@ func (idx *Indexer) pollOnce(client *ethclient.Client) error { return fmt.Errorf("read cursor: %w", err) } - hctx, hcancel := context.WithTimeout(idx.ctx, rpcTimeout) - latestHeader, err := client.HeaderByNumber(hctx, nil) - hcancel() - if err != nil { + var latestHeader *types.Header + if err := retryRPC(idx.ctx, 3, time.Second, func() error { + hctx, hcancel := context.WithTimeout(idx.ctx, rpcTimeout) + defer hcancel() + var rerr error + latestHeader, rerr = client.HeaderByNumber(hctx, nil) + return rerr + }); err != nil { return fmt.Errorf("latest block: %w", err) } - toBlock := latestHeader.Number.Int64() + toBlock := latestHeader.Number.Int64() if fromBlock >= toBlock { return nil } @@ -838,15 +842,18 @@ func (idx *Indexer) pollOnce(client *ethclient.Client) error { end = toBlock } - filterCtx, filterCancel := context.WithTimeout(idx.ctx, rpcTimeout) - logs, err := client.FilterLogs(filterCtx, ethereum.FilterQuery{ - FromBlock: big.NewInt(fromBlock + 1), - ToBlock: big.NewInt(end), - Topics: knownTopics, - }) - filterCancel() - - if err != nil { + var logs []types.Log + if err := retryRPC(idx.ctx, 3, time.Second, func() error { + filterCtx, filterCancel := context.WithTimeout(idx.ctx, rpcTimeout) + defer filterCancel() + var rerr error + logs, rerr = client.FilterLogs(filterCtx, ethereum.FilterQuery{ + FromBlock: big.NewInt(fromBlock + 1), + ToBlock: big.NewInt(end), + Topics: knownTopics, + }) + return rerr + }); err != nil { return fmt.Errorf("FilterLogs [%d-%d]: %w", fromBlock+1, end, err) } @@ -1331,6 +1338,26 @@ func minDuration(a, b time.Duration) time.Duration { return b } +func retryRPC(ctx context.Context, attempts int, base time.Duration, fn func() error) error { + var err error + for i := range attempts { + err = fn() + if err == nil { + return nil + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err + } + delay := base * (1 << i) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + return err +} + func getEnv(key, fallback string) string { if v := strings.TrimSpace(os.Getenv(key)); v != "" { return v diff --git a/server/indexer/indexer_test.go b/server/indexer/indexer_test.go index 5286c32..4677953 100644 --- a/server/indexer/indexer_test.go +++ b/server/indexer/indexer_test.go @@ -1,7 +1,9 @@ package indexer import ( + "context" "encoding/hex" + "fmt" "math/big" "os" "strings" @@ -773,4 +775,65 @@ func FuzzDecodeBigIntFromLogData(f *testing.F) { new(big.Int).SetBytes(data[32:64]) new(big.Int).SetBytes(data[64:96]) }) -} \ No newline at end of file +} + +func TestRetryRPC_SucceedsOnFirstAttempt(t *testing.T) { + calls := 0 + err := retryRPC(context.Background(), 3, time.Millisecond, func() error { + calls++ + return nil + }) + if err != nil { + t.Errorf("expected nil error, got %v", err) + } + if calls != 1 { + t.Errorf("expected 1 call, got %d", calls) + } +} + +func TestRetryRPC_RetriesOnTransientError(t *testing.T) { + calls := 0 + err := retryRPC(context.Background(), 3, time.Millisecond, func() error { + calls++ + if calls < 3 { + return fmt.Errorf("transient error") + } + return nil + }) + if err != nil { + t.Errorf("expected nil after retries, got %v", err) + } + if calls != 3 { + t.Errorf("expected 3 calls, got %d", calls) + } +} + +func TestRetryRPC_StopsOnContextCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + calls := 0 + err := retryRPC(ctx, 3, time.Millisecond, func() error { + calls++ + return fmt.Errorf("some error") + }) + if err == nil { + t.Error("expected error on cancelled context") + } + if calls > 1 { + t.Errorf("expected at most 1 call with cancelled context, got %d", calls) + } +} + +func TestRetryRPC_ExhaustsAttemptsAndReturnsLastError(t *testing.T) { + calls := 0 + err := retryRPC(context.Background(), 3, time.Millisecond, func() error { + calls++ + return fmt.Errorf("persistent error %d", calls) + }) + if err == nil { + t.Error("expected error after exhausting attempts") + } + if calls != 3 { + t.Errorf("expected 3 calls, got %d", calls) + } +} From d1a0ed8cdbf2af94fd84945e2aa5fccbceb22f98 Mon Sep 17 00:00:00 2001 From: 0xull Date: Sat, 13 Jun 2026 05:56:53 +0100 Subject: [PATCH 5/5] [backend]: index RevenueClaimed events, add redemption and claim history to API responses Signed-off-by: 0xull --- server/api/api.go | 178 +++++++++++++++++++++++------------ server/db/schema.sql | 19 +++- server/indexer/indexer.go | 190 ++++++++++++++++++++++++-------------- 3 files changed, 260 insertions(+), 127 deletions(-) diff --git a/server/api/api.go b/server/api/api.go index 57dfbeb..8116b75 100644 --- a/server/api/api.go +++ b/server/api/api.go @@ -128,7 +128,7 @@ type handler struct { db *db.DB openAIKey string openAIModel string - rpcClient *ethclient.Client + rpcClient *ethclient.Client } // Marketplace @@ -316,72 +316,71 @@ func (h *handler) getBasket(w http.ResponseWriter, r *http.Request) { log.Printf("api: getBasketStateFromCache(%s): %v", addr, err) } - // Performance history - var perf []struct { - NavPerToken string `json:"navPerToken"` - TotalValueUsdg string `json:"totalValueUsdg"` - Timestamp int64 `json:"timestamp"` - } - perf = []struct { - NavPerToken string `json:"navPerToken"` - TotalValueUsdg string `json:"totalValueUsdg"` - Timestamp int64 `json:"timestamp"` - }{} - if perfRows, err := h.db.Query(` - SELECT nav_per_token, total_value_usdg, timestamp - FROM nav_history WHERE basket_address = ? - ORDER BY timestamp ASC`, addr); err == nil { - defer perfRows.Close() - for perfRows.Next() { - var p struct { - NavPerToken string `json:"navPerToken"` - TotalValueUsdg string `json:"totalValueUsdg"` - Timestamp int64 `json:"timestamp"` + // Performance history, rebalance history, deposit history, and redemption history. + perf := []PerfEntry{} + rebalHistory := []RebalanceEntry{} + depHistory := []DepositEntry{} + redemptHistory := []RedemptionEntry{} + + readTx, err := h.db.Begin() + if err != nil { + log.Printf("api: getBasket begin read tx: %v", err) + } else { + // Performance history. + if perfRows, err := readTx.Query(` + SELECT nav_per_token, total_value_usdg, timestamp + FROM nav_history WHERE basket_address = ? + ORDER BY timestamp ASC`, addr); err == nil { + defer perfRows.Close() + for perfRows.Next() { + var p PerfEntry + if perfRows.Scan(&p.NavPerToken, &p.TotalValueUsdg, &p.Timestamp) == nil { + perf = append(perf, p) + } } - if perfRows.Scan(&p.NavPerToken, &p.TotalValueUsdg, &p.Timestamp) == nil { - perf = append(perf, p) + } + + // Rebalance history. + if rebRows, err := readTx.Query(` + SELECT timestamp, tx_hash, triggered_by FROM rebalances + WHERE basket_address = ? ORDER BY timestamp DESC LIMIT 50`, addr); err == nil { + defer rebRows.Close() + for rebRows.Next() { + var e RebalanceEntry + if rebRows.Scan(&e.Timestamp, &e.TxHash, &e.TriggeredBy) == nil { + rebalHistory = append(rebalHistory, e) + } } } - } - // Rebalance history. - type RebalanceEntry struct { - Timestamp int64 `json:"timestamp"` - TxHash string `json:"txHash"` - TriggeredBy string `json:"triggeredBy"` - } - rebalHistory := []RebalanceEntry{} - if rebRows, err := h.db.Query(` - SELECT timestamp, tx_hash, triggered_by FROM rebalances - WHERE basket_address = ? ORDER BY timestamp DESC LIMIT 50`, addr); err == nil { - defer rebRows.Close() - for rebRows.Next() { - var e RebalanceEntry - if rebRows.Scan(&e.Timestamp, &e.TxHash, &e.TriggeredBy) == nil { - rebalHistory = append(rebalHistory, e) + // Deposit history. + if depRows, err := readTx.Query(` + SELECT investor_address, usdg_amount, basket_tokens_minted, timestamp, tx_hash + FROM deposits WHERE basket_address = ? ORDER BY timestamp DESC LIMIT 50`, addr); err == nil { + defer depRows.Close() + for depRows.Next() { + var e DepositEntry + if depRows.Scan(&e.Investor, &e.UsdgAmount, &e.BasketTokensMinted, &e.Timestamp, &e.TxHash) == nil { + depHistory = append(depHistory, e) + } } } - } - // Deposit history. - type DepositEntry struct { - Investor string `json:"investor"` - UsdgAmount string `json:"usdgAmount"` - BasketTokensMinted string `json:"basketTokensMinted"` - Timestamp int64 `json:"timestamp"` - TxHash string `json:"txHash"` - } - depHistory := []DepositEntry{} - if depRows, err := h.db.Query(` - SELECT investor_address, usdg_amount, basket_tokens_minted, timestamp, tx_hash - FROM deposits WHERE basket_address = ? ORDER BY timestamp DESC LIMIT 50`, addr); err == nil { - defer depRows.Close() - for depRows.Next() { - var e DepositEntry - if depRows.Scan(&e.Investor, &e.UsdgAmount, &e.BasketTokensMinted, &e.Timestamp, &e.TxHash) == nil { - depHistory = append(depHistory, e) + // Redemption history. + if redRows, err := readTx.Query(` + SELECT investor_address, usdg_returned, basket_tokens_burned, timestamp, tx_hash + FROM redemptions WHERE basket_address = ? ORDER BY timestamp DESC LIMIT 50`, addr); err == nil { + defer redRows.Close() + for redRows.Next() { + var e RedemptionEntry + if redRows.Scan(&e.Investor, &e.UsdgReturned, &e.BasketTokensBurned, &e.Timestamp, &e.TxHash) == nil { + redemptHistory = append(redemptHistory, e) + } } } + + // Read-only transaction — rollback is a no-op but correct. + readTx.Rollback() } navPerToken := "0" @@ -437,9 +436,35 @@ func (h *handler) getBasket(w http.ResponseWriter, r *http.Request) { PerformanceHistory: perf, RebalanceHistory: rebalHistory, DepositHistory: depHistory, + RedemptionHistory: redemptHistory, }) } +type PerfEntry struct { + NavPerToken string `json:"navPerToken"` + TotalValueUsdg string `json:"totalValueUsdg"` + Timestamp int64 `json:"timestamp"` +} +type RebalanceEntry struct { + Timestamp int64 `json:"timestamp"` + TxHash string `json:"txHash"` + TriggeredBy string `json:"triggeredBy"` +} +type DepositEntry struct { + Investor string `json:"investor"` + UsdgAmount string `json:"usdgAmount"` + BasketTokensMinted string `json:"basketTokensMinted"` + Timestamp int64 `json:"timestamp"` + TxHash string `json:"txHash"` +} +type RedemptionEntry struct { + Investor string `json:"investor"` + UsdgReturned string `json:"usdgReturned"` + BasketTokensBurned string `json:"basketTokensBurned"` + Timestamp int64 `json:"timestamp"` + TxHash string `json:"txHash"` +} + type BasketDetailResponse struct { Address string `json:"address"` CreatorToken string `json:"creatorToken"` @@ -462,6 +487,7 @@ type BasketDetailResponse struct { PerformanceHistory any `json:"performanceHistory"` RebalanceHistory any `json:"rebalanceHistory"` DepositHistory any `json:"depositHistory"` + RedemptionHistory any `json:"redemptionHistory"` } // basketStateCache is the shape stored in and read from basket_state_cache. @@ -1298,6 +1324,15 @@ type BasketEntry struct { TotalClaimableUsdg string `json:"totalClaimableUsdg"` UnclaimedSnapshots []snapshotEntry `json:"unclaimedSnapshots"` RevenueHistory []snapshotEntry `json:"revenueHistory"` + ClaimHistory []ClaimEntry `json:"claimHistory"` +} + +type ClaimEntry struct { + Claimer string `json:"claimer"` + SnapshotID int64 `json:"snapshotId"` + UsdgAmount string `json:"usdgAmount"` + Timestamp int64 `json:"timestamp"` + TxHash string `json:"txHash"` } func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { @@ -1378,6 +1413,25 @@ func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { } } + claimsByBasket := make(map[string][]ClaimEntry) + claimRows, err := h.db.Query( + `SELECT basket_address, claimer_address, snapshot_id, usdg_amount, timestamp, tx_hash + FROM revenue_claims + WHERE basket_address IN (`+strings.Join(placeholders, ",")+`) + ORDER BY basket_address, timestamp DESC`, + args..., + ) + if err == nil { + defer claimRows.Close() + for claimRows.Next() { + var bAddr string + var c ClaimEntry + if claimRows.Scan(&bAddr, &c.Claimer, &c.SnapshotID, &c.UsdgAmount, &c.Timestamp, &c.TxHash) == nil { + claimsByBasket[bAddr] = append(claimsByBasket[bAddr], c) + } + } + } + totalClaimable := new(big.Int) result := make([]BasketEntry, 0, len(basketOrder)) @@ -1398,6 +1452,11 @@ func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { } totalClaimable.Add(totalClaimable, basketClaimable) + claims := claimsByBasket[addr] + if claims == nil { + claims = []ClaimEntry{} + } + result = append(result, BasketEntry{ BasketAddress: addr, BasketName: m.name, @@ -1407,6 +1466,7 @@ func (h *handler) getCreatorDashboard(w http.ResponseWriter, r *http.Request) { TotalClaimableUsdg: basketClaimable.String(), UnclaimedSnapshots: unclaimed, RevenueHistory: snaps, + ClaimHistory: claims, }) } @@ -1686,4 +1746,4 @@ func (h *handler) serveDocs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") w.WriteHeader(http.StatusOK) w.Write([]byte(swaggerHTML)) -} \ No newline at end of file +} diff --git a/server/db/schema.sql b/server/db/schema.sql index 670ba64..1849981 100644 --- a/server/db/schema.sql +++ b/server/db/schema.sql @@ -138,4 +138,21 @@ CREATE INDEX IF NOT EXISTS idx_price_history_addr ON price_history(stock_addre CREATE INDEX IF NOT EXISTS idx_nav_history_basket ON nav_history(basket_address, timestamp); CREATE INDEX IF NOT EXISTS idx_fee_snapshots_basket ON fee_snapshots(basket_address); CREATE INDEX IF NOT EXISTS idx_basket_state_cache ON basket_state_cache(cached_at); -CREATE INDEX IF NOT EXISTS idx_creator_claimable ON creator_claimable_cache(wallet_address, cached_at); \ No newline at end of file +CREATE INDEX IF NOT EXISTS idx_creator_claimable ON creator_claimable_cache(wallet_address, cached_at); + +CREATE TABLE IF NOT EXISTS revenue_claims ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + creator_token_address TEXT NOT NULL, + basket_address TEXT NOT NULL, + claimer_address TEXT NOT NULL, + snapshot_id INTEGER NOT NULL, + usdg_amount TEXT NOT NULL, + timestamp INTEGER NOT NULL, + tx_hash TEXT NOT NULL, + log_index INTEGER NOT NULL, + UNIQUE(tx_hash, log_index) +); + +CREATE INDEX IF NOT EXISTS idx_revenue_claims_basket ON revenue_claims(basket_address); +CREATE INDEX IF NOT EXISTS idx_revenue_claims_claimer ON revenue_claims(claimer_address); +CREATE INDEX IF NOT EXISTS idx_revenue_claims_ct ON revenue_claims(creator_token_address); \ No newline at end of file diff --git a/server/indexer/indexer.go b/server/indexer/indexer.go index 9933ad6..4164431 100644 --- a/server/indexer/indexer.go +++ b/server/indexer/indexer.go @@ -30,14 +30,15 @@ const liveChunkSize = int64(50) const livePollInterval = 2 * time.Second var ( - topicBasketCreated = eventTopic("BasketCreated(address,address,address,string,string,string,address[],uint256[],bool)") - topicDeposited = eventTopic("Deposited(address,uint256,uint256,uint256)") - topicRedeemed = eventTopic("Redeemed(address,uint256,uint256,uint256)") - topicRebalanced = eventTopic("Rebalanced(address)") - topicFeeSnapshoted = eventTopic("RevenueSnapshoted(uint256,uint256,uint256)") - topicAssetAdded = eventTopic("AssetAdded(address,string,string,string,address)") - topicAssetDeact = eventTopic("AssetDeactivated(address)") - topicBasketSuspend = eventTopic("Suspended()") + topicBasketCreated = eventTopic("BasketCreated(address,address,address,string,string,string,address[],uint256[],bool)") + topicDeposited = eventTopic("Deposited(address,uint256,uint256,uint256)") + topicRedeemed = eventTopic("Redeemed(address,uint256,uint256,uint256)") + topicRebalanced = eventTopic("Rebalanced(address)") + topicFeeSnapshoted = eventTopic("RevenueSnapshoted(uint256,uint256,uint256)") + topicAssetAdded = eventTopic("AssetAdded(address,string,string,string,address)") + topicAssetDeact = eventTopic("AssetDeactivated(address)") + topicBasketSuspend = eventTopic("Suspended()") + topicRevenueClaimed = eventTopic("RevenueClaimed(address,uint256,uint256)") ) // knownTopics is the complete set of event signatures this indexer handles. @@ -52,19 +53,21 @@ var knownTopics = [][]common.Hash{{ topicAssetAdded, topicAssetDeact, topicBasketSuspend, + topicRevenueClaimed, }} var ( - depositedABI abi.Arguments - redeemedABI abi.Arguments - feeSnapshotABI abi.Arguments - basketCreatedABI abi.Arguments - assetAddedABI abi.Arguments + depositedABI abi.Arguments + redeemedABI abi.Arguments + feeSnapshotABI abi.Arguments + basketCreatedABI abi.Arguments + assetAddedABI abi.Arguments + revenueClaimedABI abi.Arguments ) var ( getSupportedAssetsABI abi.ABI - getAllBasketsABI abi.ABI + getAllBasketsABI abi.ABI basketStateABI abi.ABI basketMetaABI abi.ABI ) @@ -147,47 +150,51 @@ func init() { panic(fmt.Sprintf("indexer: parse basketMetaABI: %v", err)) } - addrType, _ := abi.NewType("address", "", nil) - stringType, _ := abi.NewType("string", "", nil) - boolType, _ := abi.NewType("bool", "", nil) - uint256Type, _ := abi.NewType("uint256", "", nil) - addrSlice, _ := abi.NewType("address[]", "", nil) + addrType, _ := abi.NewType("address", "", nil) + stringType, _ := abi.NewType("string", "", nil) + boolType, _ := abi.NewType("bool", "", nil) + uint256Type, _ := abi.NewType("uint256", "", nil) + addrSlice, _ := abi.NewType("address[]", "", nil) uint256Slice, _ := abi.NewType("uint256[]", "", nil) basketCreatedABI = abi.Arguments{ - {Name: "name", Type: stringType}, - {Name: "symbol", Type: stringType}, - {Name: "thesis", Type: stringType}, - {Name: "constituents", Type: addrSlice}, - {Name: "targetWeightsBps", Type: uint256Slice}, + {Name: "name", Type: stringType}, + {Name: "symbol", Type: stringType}, + {Name: "thesis", Type: stringType}, + {Name: "constituents", Type: addrSlice}, + {Name: "targetWeightsBps", Type: uint256Slice}, {Name: "rebalancingEnabled", Type: boolType}, } assetAddedABI = abi.Arguments{ {Name: "symbol", Type: stringType}, - {Name: "name", Type: stringType}, + {Name: "name", Type: stringType}, {Name: "sector", Type: stringType}, {Name: "oracle", Type: addrType}, } depositedABI = abi.Arguments{ - {Name: "usdgAmount", Type: uint256Type}, + {Name: "usdgAmount", Type: uint256Type}, {Name: "basketTokensMinted", Type: uint256Type}, - {Name: "feeUsdg", Type: uint256Type}, + {Name: "feeUsdg", Type: uint256Type}, } redeemedABI = abi.Arguments{ {Name: "basketTokensBurned", Type: uint256Type}, - {Name: "usdgReturned", Type: uint256Type}, - {Name: "feeUsdg", Type: uint256Type}, + {Name: "usdgReturned", Type: uint256Type}, + {Name: "feeUsdg", Type: uint256Type}, } // RevenueSnapshoted(uint256 indexed snapshotId, uint256 usdgAmount, uint256 totalSupply) // snapshotId is Topics[1]; usdgAmount and totalSupply are in Data. feeSnapshotABI = abi.Arguments{ - {Name: "usdgAmount", Type: uint256Type}, + {Name: "usdgAmount", Type: uint256Type}, {Name: "totalSupply", Type: uint256Type}, } + + revenueClaimedABI = abi.Arguments{ + {Name: "usdgAmount", Type: uint256Type}, + } } type blockTsEntry struct { @@ -332,10 +339,10 @@ func (idx *Indexer) syncAssets(client *ethclient.Client) { elem = elem.Elem() } - tokenField := elem.FieldByName("TokenAddress") + tokenField := elem.FieldByName("TokenAddress") oracleField := elem.FieldByName("Oracle") symbolField := elem.FieldByName("Symbol") - nameField := elem.FieldByName("Name") + nameField := elem.FieldByName("Name") sectorField := elem.FieldByName("Sector") activeField := elem.FieldByName("Active") @@ -344,10 +351,10 @@ func (idx *Indexer) syncAssets(client *ethclient.Client) { continue } - token := strings.ToLower(tokenField.Interface().(common.Address).Hex()) + token := strings.ToLower(tokenField.Interface().(common.Address).Hex()) oracle := strings.ToLower(oracleField.Interface().(common.Address).Hex()) symbol := symbolField.String() - name := nameField.String() + name := nameField.String() sector := sectorField.String() active := boolToInt(activeField.Bool()) @@ -421,18 +428,18 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { elem = elem.Elem() } - basketField := elem.FieldByName("Basket") + basketField := elem.FieldByName("Basket") creatorTokenField := elem.FieldByName("CreatorToken") - creatorField := elem.FieldByName("Creator") - activeField := elem.FieldByName("Active") - createdAtField := elem.FieldByName("CreatedAt") + creatorField := elem.FieldByName("Creator") + activeField := elem.FieldByName("Active") + createdAtField := elem.FieldByName("CreatedAt") if !basketField.IsValid() || !creatorField.IsValid() || !activeField.IsValid() { log.Printf("indexer: syncBaskets: element %d missing expected fields, skipping", i) continue } - basketCommon := basketField.Interface().(common.Address) + basketCommon := basketField.Interface().(common.Address) creatorTokenCommon := creatorTokenField.Interface().(common.Address) suspended := 0 if !activeField.Bool() { @@ -592,10 +599,10 @@ func (idx *Indexer) seedOneBasket(client *ethclient.Client, basketAddr string) e return fmt.Errorf("basketState unpack: %w", err) } - constituents, ok1 := unpacked[0].([]common.Address) - targetWeights, ok2 := unpacked[1].([]*big.Int) + constituents, ok1 := unpacked[0].([]common.Address) + targetWeights, ok2 := unpacked[1].([]*big.Int) rebalancingEnabled, _ := unpacked[6].(bool) - driftThresholdBps, _ := unpacked[7].(*big.Int) + driftThresholdBps, _ := unpacked[7].(*big.Int) if !ok1 || !ok2 || len(constituents) == 0 { return fmt.Errorf("basketState: unexpected type or empty constituents") @@ -970,11 +977,14 @@ func (idx *Indexer) writeLog(tx *sql.Tx, client *ethclient.Client, vLog types.Lo } } - if topic == topicFeeSnapshoted { + if topic == topicFeeSnapshoted || topic == topicRevenueClaimed { if !idx.isKnownCreatorToken(vLog.Address, local) { return nil } - return idx.writeFeeSnapshot(tx, client, vLog, local) + if topic == topicFeeSnapshoted { + return idx.writeFeeSnapshot(tx, client, vLog, local) + } + return idx.writeRevenueClaimed(tx, client, vLog, local) } return nil @@ -1015,11 +1025,11 @@ func (idx *Indexer) writeBasketCreated(tx *sql.Tx, client *ethclient.Client, vLo return nil } - basketCommon := common.HexToAddress(vLog.Topics[1].Hex()) + basketCommon := common.HexToAddress(vLog.Topics[1].Hex()) creatorTokenCommon := common.HexToAddress(vLog.Topics[2].Hex()) - basket := strings.ToLower(basketCommon.Hex()) - creatorToken := strings.ToLower(creatorTokenCommon.Hex()) - creator := strings.ToLower(common.HexToAddress(vLog.Topics[3].Hex()).Hex()) + basket := strings.ToLower(basketCommon.Hex()) + creatorToken := strings.ToLower(creatorTokenCommon.Hex()) + creator := strings.ToLower(common.HexToAddress(vLog.Topics[3].Hex()).Hex()) decoded, err := basketCreatedABI.Unpack(vLog.Data) if err != nil || len(decoded) < 6 { @@ -1027,12 +1037,12 @@ func (idx *Indexer) writeBasketCreated(tx *sql.Tx, client *ethclient.Client, vLo return nil } - name, _ := decoded[0].(string) - symbol, _ := decoded[1].(string) - thesis, _ := decoded[2].(string) - constituents, _ := decoded[3].([]common.Address) + name, _ := decoded[0].(string) + symbol, _ := decoded[1].(string) + thesis, _ := decoded[2].(string) + constituents, _ := decoded[3].([]common.Address) targetWeights, _ := decoded[4].([]*big.Int) - rebalancing, _ := decoded[5].(bool) + rebalancing, _ := decoded[5].(bool) type constituentRow struct { addr string @@ -1109,7 +1119,7 @@ func (idx *Indexer) handleAssetAdded(vLog types.Log) { } symbol, _ := decoded[0].(string) - name, _ := decoded[1].(string) + name, _ := decoded[1].(string) sector, _ := decoded[2].(string) oracle, _ := decoded[3].(common.Address) @@ -1142,12 +1152,12 @@ func (idx *Indexer) writeDeposited(tx *sql.Tx, client *ethclient.Client, vLog ty return fmt.Errorf("Deposited unpack tx=%s: %w", vLog.TxHash.Hex(), err) } - investor := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) - basket := strings.ToLower(vLog.Address.Hex()) - usdgAmount := decoded[0].(*big.Int) + investor := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) + basket := strings.ToLower(vLog.Address.Hex()) + usdgAmount := decoded[0].(*big.Int) tokensMinted := decoded[1].(*big.Int) - feeUsdg := decoded[2].(*big.Int) - ts := idx.blockTimestamp(client, vLog.BlockNumber) + feeUsdg := decoded[2].(*big.Int) + ts := idx.blockTimestamp(client, vLog.BlockNumber) _, err = tx.Exec(` INSERT INTO deposits @@ -1171,12 +1181,12 @@ func (idx *Indexer) writeRedeemed(tx *sql.Tx, client *ethclient.Client, vLog typ return fmt.Errorf("Redeemed unpack tx=%s: %w", vLog.TxHash.Hex(), err) } - investor := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) - basket := strings.ToLower(vLog.Address.Hex()) + investor := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) + basket := strings.ToLower(vLog.Address.Hex()) tokensBurned := decoded[0].(*big.Int) usdgReturned := decoded[1].(*big.Int) - feeUsdg := decoded[2].(*big.Int) - ts := idx.blockTimestamp(client, vLog.BlockNumber) + feeUsdg := decoded[2].(*big.Int) + ts := idx.blockTimestamp(client, vLog.BlockNumber) _, err = tx.Exec(` INSERT INTO redemptions @@ -1196,8 +1206,8 @@ func (idx *Indexer) writeRebalanced(tx *sql.Tx, client *ethclient.Client, vLog t } triggeredBy := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) - basket := strings.ToLower(vLog.Address.Hex()) - ts := idx.blockTimestamp(client, vLog.BlockNumber) + basket := strings.ToLower(vLog.Address.Hex()) + ts := idx.blockTimestamp(client, vLog.BlockNumber) _, err := tx.Exec(` INSERT INTO rebalances (basket_address, triggered_by, timestamp, tx_hash, log_index) @@ -1242,7 +1252,7 @@ func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog } usdgAmount := decoded[0].(*big.Int) - ts := idx.blockTimestamp(client, vLog.BlockNumber) + ts := idx.blockTimestamp(client, vLog.BlockNumber) _, err = tx.Exec(` INSERT INTO fee_snapshots (basket_address, snapshot_id, usdg_amount, timestamp, tx_hash, log_index) @@ -1253,6 +1263,52 @@ func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog return err } +// writeRevenueClaimed handles RevenueClaimed events emitted by CreatorToken +// contracts. +func (idx *Indexer) writeRevenueClaimed(tx *sql.Tx, client *ethclient.Client, vLog types.Log, local *chunkAddrs) error { + if len(vLog.Topics) < 3 { + return nil + } + + creatorTokenCommon := vLog.Address + claimer := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) + snapshotID := new(big.Int).SetBytes(vLog.Topics[2].Bytes()).Int64() + + idx.mu.RLock() + basketCommon, found := idx.creatorTokenToBasket[creatorTokenCommon] + idx.mu.RUnlock() + if !found { + basketCommon, found = local.creatorTokens[creatorTokenCommon] + } + if !found { + log.Printf("indexer: writeRevenueClaimed: no basket for creatorToken %s — skipping", + strings.ToLower(creatorTokenCommon.Hex())) + return nil + } + + basketAddr := strings.ToLower(basketCommon.Hex()) + creatorTokenAddr := strings.ToLower(creatorTokenCommon.Hex()) + + decoded, err := revenueClaimedABI.Unpack(vLog.Data) + if err != nil || len(decoded) < 1 { + return fmt.Errorf("RevenueClaimed unpack tx=%s: %w", vLog.TxHash.Hex(), err) + } + + usdgAmount := decoded[0].(*big.Int) + ts := idx.blockTimestamp(client, vLog.BlockNumber) + + _, err = tx.Exec(` + INSERT INTO revenue_claims + (creator_token_address, basket_address, claimer_address, snapshot_id, + usdg_amount, timestamp, tx_hash, log_index) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(tx_hash, log_index) DO NOTHING`, + creatorTokenAddr, basketAddr, claimer, snapshotID, + usdgAmount.String(), ts, vLog.TxHash.Hex(), vLog.Index, + ) + return err +} + func (idx *Indexer) handleAssetDeactivated(vLog types.Log) { if len(vLog.Topics) < 2 { return @@ -1363,4 +1419,4 @@ func getEnv(key, fallback string) string { return v } return fallback -} \ No newline at end of file +}