From f9159da1df730c340f21305ad5f87cbcb21f293a Mon Sep 17 00:00:00 2001 From: 0xull Date: Tue, 9 Jun 2026 18:29:26 +0100 Subject: [PATCH 1/3] ops(ci): remove the unused CI file Signed-off-by: 0xull --- .github/workflows/update-prices.yml | 30 ----------------------------- 1 file changed, 30 deletions(-) delete mode 100644 .github/workflows/update-prices.yml diff --git a/.github/workflows/update-prices.yml b/.github/workflows/update-prices.yml deleted file mode 100644 index 81b743c..0000000 --- a/.github/workflows/update-prices.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: update-prices - -on: - schedule: - - cron: '*/5 * * * *' - workflow_dispatch: - -jobs: - update: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Install Foundry - uses: foundry-rs/foundry-toolchain@v1 - - - name: Run price update (first pass) - env: - PRIVATE_KEY: ${{ secrets.PRICE_UPDATER_PRIVATE_KEY }} - RPC_URL: https://rpc.testnet.chain.robinhood.com - run: bash scripts/update-prices.sh - - - name: Wait 2.5 minutes - run: sleep 150 - - - name: Run price update (second pass) - env: - PRIVATE_KEY: ${{ secrets.PRICE_UPDATER_PRIVATE_KEY }} - RPC_URL: https://rpc.testnet.chain.robinhood.com - run: bash scripts/update-prices.sh \ No newline at end of file From 48d001703b06b2425953ff27629358b3e7651143 Mon Sep 17 00:00:00 2001 From: 0xull Date: Thu, 11 Jun 2026 08:45:35 +0100 Subject: [PATCH 2/3] [server]: fix to index revenue from the snapshots on creator tokens Signed-off-by: 0xull --- server/db/migrate.go | 88 ++++++++++++++++++--------- server/indexer/indexer.go | 124 ++++++++++++++++++++++---------------- 2 files changed, 133 insertions(+), 79 deletions(-) diff --git a/server/db/migrate.go b/server/db/migrate.go index e4a09ec..8cfc5c2 100644 --- a/server/db/migrate.go +++ b/server/db/migrate.go @@ -6,12 +6,22 @@ import ( ) // Migrate applies schema migrations that cannot be expressed as CREATE TABLE IF NOT EXISTS -// in schema.sql because they modify existing tables on a live volume. +// in schema.sql because they modify existing tables on a live Railway volume. +// +// Each migration is idempotent: +// - ALTER TABLE statements are skipped if the column already exists. +// - CREATE UNIQUE INDEX IF NOT EXISTS is safe to re-run. +// - CREATE TABLE IF NOT EXISTS is safe to re-run. +// - UPDATE/DELETE statements whose effects are already applied are no-ops. +// +// Call Migrate() in main.go immediately after db.Open() and before starting +// the indexer. A migration failure is fatal. func (d *DB) Migrate() error { migrations := []struct { name string sql string }{ + // Phase 1 — add log_index to event tables for idempotent re-scan inserts. { name: "deposits.log_index", sql: `ALTER TABLE deposits ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`, @@ -28,49 +38,24 @@ func (d *DB) Migrate() error { name: "fee_snapshots.log_index", sql: `ALTER TABLE fee_snapshots ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`, }, - // Deduplicate before creating unique indexes — production volume may - // contain duplicate rows written by the old indexer before this fix. - // Keep the row with the lowest rowid (earliest write) and discard the rest. - { - name: "deduplicate_deposits", - sql: `DELETE FROM deposits WHERE rowid NOT IN ( - SELECT MIN(rowid) FROM deposits GROUP BY tx_hash, log_index - )`, - }, + // Unique deduplication indexes. { name: "idx_deposits_dedup", sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_deposits_dedup ON deposits(tx_hash, log_index)`, }, - { - name: "deduplicate_redemptions", - sql: `DELETE FROM redemptions WHERE rowid NOT IN ( - SELECT MIN(rowid) FROM redemptions GROUP BY tx_hash, log_index - )`, - }, { name: "idx_redemptions_dedup", sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_redemptions_dedup ON redemptions(tx_hash, log_index)`, }, - { - name: "deduplicate_rebalances", - sql: `DELETE FROM rebalances WHERE rowid NOT IN ( - SELECT MIN(rowid) FROM rebalances GROUP BY tx_hash, log_index - )`, - }, { name: "idx_rebalances_dedup", sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_rebalances_dedup ON rebalances(tx_hash, log_index)`, }, - { - name: "deduplicate_fee_snapshots", - sql: `DELETE FROM fee_snapshots WHERE rowid NOT IN ( - SELECT MIN(rowid) FROM fee_snapshots GROUP BY tx_hash, log_index - )`, - }, { name: "idx_fee_snapshots_dedup", sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_fee_snapshots_dedup ON fee_snapshots(tx_hash, log_index)`, }, + // Seed failure tracker. { name: "basket_seed_failures", sql: `CREATE TABLE IF NOT EXISTS basket_seed_failures ( @@ -79,6 +64,53 @@ func (d *DB) Migrate() error { last_attempt INTEGER NOT NULL )`, }, + + // Phase 2 — fix RevenueSnapshoted indexing. + // + // Root cause: the indexer never included CreatorToken contract addresses in + // its FilterLogs/WebSocket subscription filter, so every RevenueSnapshoted + // event was silently dropped. Additionally, even if the event had been + // received, handleFeeSnapshot stored vLog.Address (the CreatorToken address) + // as basket_address — the wrong value. + // + // Fix applied in indexer.go: + // 1. creatorTokenToBasket map tracks CreatorToken → basket address. + // 2. filterAddresses() now includes all CreatorToken addresses. + // 3. writeFeeSnapshot resolves the basket from creatorTokenToBasket (DB fallback). + // + // To make the already-emitted RevenueSnapshoted events visible we must: + // a. Delete any fee_snapshot rows written with the wrong basket_address + // (i.e. rows whose basket_address is a creator token address, not a basket). + // These were stored under the creator token address due to the old bug. + // b. Reset the event cursor to the deploy block so the historical scan + // replays from the beginning and re-indexes all events including the + // missed RevenueSnapshoted events with the correct basket_address. + // c. Clear the creator_claimable_cache so stale zeros are not returned + // while the rescan is in progress. + // + // All three steps are idempotent: deleting already-absent rows is a no-op, + // setting the cursor to a value it already has is a no-op, and deleting + // cache rows that don't exist is a no-op. + { + name: "fix_fee_snapshots_wrong_basket_address", + sql: `DELETE FROM fee_snapshots + WHERE basket_address NOT IN (SELECT address FROM baskets)`, + }, + { + name: "reset_event_cursor_for_creator_token_rescan", + sql: `UPDATE sync_cursors SET block_num = 68391146 WHERE key = 'events'`, + }, + { + name: "clear_creator_claimable_cache", + sql: `DELETE FROM creator_claimable_cache`, + }, + // Index creator_token_address so the writeFeeSnapshot DB fallback + // (SELECT address FROM baskets WHERE creator_token_address = ?) is + // an index seek rather than a full table scan during historical rescan. + { + name: "idx_baskets_creator_token", + sql: `CREATE INDEX IF NOT EXISTS idx_baskets_creator_token ON baskets(creator_token_address)`, + }, } for _, m := range migrations { diff --git a/server/indexer/indexer.go b/server/indexer/indexer.go index 92f55d9..5031714 100644 --- a/server/indexer/indexer.go +++ b/server/indexer/indexer.go @@ -22,24 +22,11 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) -// rpcTimeout is the per-call deadline applied to every RPC request. const rpcTimeout = 15 * time.Second - -// chunkSize is the number of blocks fetched per FilterLogs call. const chunkSize = int64(2000) - -// seedMaxAttempts is the maximum number of times seedOneBasket will be -// retried for a given basket across all startups before it is skipped permanently. const seedMaxAttempts = 5 - -// blockTsCacheMax is the maximum number of block timestamps held in the -// bounded FIFO cache. const blockTsCacheMax = 4096 - -// reconnectBaseDelay is the starting delay for WebSocket reconnection backoff. const reconnectBaseDelay = 1 * time.Second - -// reconnectMaxDelay is the ceiling for WebSocket reconnection backoff. const reconnectMaxDelay = 5 * time.Minute var ( @@ -169,14 +156,12 @@ func init() { {Name: "oracle", Type: addrType}, } - // Deposited(address indexed investor, uint256 usdgAmount, uint256 basketTokensMinted, uint256 feeUsdg) depositedABI = abi.Arguments{ {Name: "usdgAmount", Type: uint256Type}, {Name: "basketTokensMinted", Type: uint256Type}, {Name: "feeUsdg", Type: uint256Type}, } - // Redeemed(address indexed investor, uint256 basketTokensBurned, uint256 usdgReturned, uint256 feeUsdg) redeemedABI = abi.Arguments{ {Name: "basketTokensBurned", Type: uint256Type}, {Name: "usdgReturned", Type: uint256Type}, @@ -184,13 +169,13 @@ func init() { } // RevenueSnapshoted(uint256 indexed snapshotId, uint256 usdgAmount, uint256 totalSupply) + // snapshotId is Topics[1]; usdgAmount and totalSupply are in Data. feeSnapshotABI = abi.Arguments{ {Name: "usdgAmount", Type: uint256Type}, {Name: "totalSupply", Type: uint256Type}, } } -// blockTsEntry is a single slot in the bounded FIFO timestamp cache. type blockTsEntry struct { blockNumber uint64 timestamp uint64 @@ -206,11 +191,16 @@ type Indexer struct { deployBlock int64 db *db.DB + // basketAddrs is the set of known basket proxy addresses for filter construction. mu sync.RWMutex basketAddrs map[common.Address]bool - // blockTs is a bounded FIFO cache of block number → Unix timestamp. - // Protected by blockTsMu. + // creatorTokenAddrs maps each CreatorToken address → its basket address. + // RevenueSnapshoted is emitted by CreatorToken contracts, not basket proxies. + // These addresses must be included in the event filter, and the mapping lets + // handleFeeSnapshot store the correct basket_address in fee_snapshots. + creatorTokenToBasket map[common.Address]common.Address + blockTsMu sync.Mutex blockTs map[uint64]uint64 blockTsFIFO []blockTsEntry @@ -229,21 +219,21 @@ func New( } return &Indexer{ - ctx: ctx, - wsURL: wsURL, - rpcURL: rpcURL, - registryAddr: common.HexToAddress(registryAddr), - factoryAddr: common.HexToAddress(factoryAddrStr), - deployBlock: deployBlock, - db: database, - basketAddrs: make(map[common.Address]bool), - blockTs: make(map[uint64]uint64, blockTsCacheMax), - blockTsFIFO: make([]blockTsEntry, 0, blockTsCacheMax), + ctx: ctx, + wsURL: wsURL, + rpcURL: rpcURL, + registryAddr: common.HexToAddress(registryAddr), + factoryAddr: common.HexToAddress(factoryAddrStr), + deployBlock: deployBlock, + db: database, + basketAddrs: make(map[common.Address]bool), + creatorTokenToBasket: make(map[common.Address]common.Address), + blockTs: make(map[uint64]uint64, blockTsCacheMax), + blockTsFIFO: make([]blockTsEntry, 0, blockTsCacheMax), }, nil } -// Run starts the indexer. It syncs chain state, then launches the historical -// scan and live subscription concurrently so no events are missed during catchup. +// Run starts the indexer. func (idx *Indexer) Run() { idx.syncFromChain() @@ -432,8 +422,11 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { continue } - basket := strings.ToLower(basketField.Interface().(common.Address).Hex()) - creatorToken := strings.ToLower(creatorTokenField.Interface().(common.Address).Hex()) + basketCommon := basketField.Interface().(common.Address) + creatorTokenCommon := creatorTokenField.Interface().(common.Address) + + basket := strings.ToLower(basketCommon.Hex()) + creatorToken := strings.ToLower(creatorTokenCommon.Hex()) creator := strings.ToLower(creatorField.Interface().(common.Address).Hex()) createdAt := createdAtField.Interface().(*big.Int).Int64() suspended := 0 @@ -457,7 +450,8 @@ func (idx *Indexer) syncBaskets(client *ethclient.Client) { } idx.mu.Lock() - idx.basketAddrs[common.HexToAddress(basket)] = true + idx.basketAddrs[basketCommon] = true + idx.creatorTokenToBasket[creatorTokenCommon] = basketCommon idx.mu.Unlock() count++ } @@ -615,8 +609,6 @@ func (idx *Indexer) seedOneBasket(client *ethclient.Client, basketAddr string) e return err } - // Resolve constituent symbols before opening the write transaction to - // avoid holding a write lock during reads. type constituentRow struct { addr string symbol string @@ -787,10 +779,6 @@ func (idx *Indexer) writeChunkAtomic(client *ethclient.Client, logs []types.Log, } if vLog.Topics[0] == topicBasketCreated { - // BasketCreated writes to two tables and updates basketAddrs. - // It needs its own transaction. Commit the current chunk tx - // first so its writes are durable, handle the basket, then - // open a fresh chunk tx for any remaining logs. if err := tx.Commit(); err != nil { return fmt.Errorf("pre-basket commit: %w", err) } @@ -924,9 +912,11 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log return } - basket := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) - creatorToken := strings.ToLower(common.HexToAddress(vLog.Topics[2].Hex()).Hex()) - creator := strings.ToLower(common.HexToAddress(vLog.Topics[3].Hex()).Hex()) + basketCommon := common.HexToAddress(vLog.Topics[1].Hex()) + creatorTokenCommon := common.HexToAddress(vLog.Topics[2].Hex()) + basket := strings.ToLower(basketCommon.Hex()) + creatorToken := strings.ToLower(creatorTokenCommon.Hex()) + creator := strings.ToLower(common.HexToAddress(vLog.Topics[3].Hex()).Hex()) decoded, err := basketCreatedABI.Unpack(vLog.Data) if err != nil || len(decoded) < 6 { @@ -1011,10 +1001,11 @@ func (idx *Indexer) handleBasketCreated(client *ethclient.Client, vLog types.Log } idx.mu.Lock() - idx.basketAddrs[common.HexToAddress(basket)] = true + idx.basketAddrs[basketCommon] = true + idx.creatorTokenToBasket[creatorTokenCommon] = basketCommon idx.mu.Unlock() - log.Printf("indexer: basket %s indexed with %d constituents", basket, len(cRows)) + log.Printf("indexer: basket %s indexed with %d constituents (creatorToken=%s)", basket, len(cRows), creatorToken) } func (idx *Indexer) handleAssetAdded(vLog types.Log) { @@ -1130,11 +1121,40 @@ func (idx *Indexer) writeRebalanced(tx *sql.Tx, client *ethclient.Client, vLog t return err } +// writeFeeSnapshot handles RevenueSnapshoted events emitted by CreatorToken contracts. func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog types.Log) error { if len(vLog.Topics) < 2 { return nil } + creatorTokenAddr := strings.ToLower(vLog.Address.Hex()) + creatorTokenCommon := vLog.Address + + idx.mu.RLock() + basketCommon, found := idx.creatorTokenToBasket[creatorTokenCommon] + idx.mu.RUnlock() + + var basketAddr string + if found { + basketAddr = strings.ToLower(basketCommon.Hex()) + } else { + err := idx.db.QueryRow( + `SELECT address FROM baskets WHERE creator_token_address = ?`, + creatorTokenAddr, + ).Scan(&basketAddr) + if errors.Is(err, sql.ErrNoRows) { + log.Printf("indexer: writeFeeSnapshot: no basket found for creatorToken %s — skipping", creatorTokenAddr) + return nil + } + if err != nil { + return fmt.Errorf("writeFeeSnapshot basket lookup for creatorToken %s: %w", creatorTokenAddr, err) + } + // Populate the in-memory map so subsequent events don't hit the DB. + idx.mu.Lock() + idx.creatorTokenToBasket[creatorTokenCommon] = common.HexToAddress(basketAddr) + idx.mu.Unlock() + } + snapshotID := new(big.Int).SetBytes(vLog.Topics[1].Bytes()).Int64() decoded, err := feeSnapshotABI.Unpack(vLog.Data) @@ -1142,7 +1162,6 @@ func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog return fmt.Errorf("RevenueSnapshoted unpack tx=%s: %w", vLog.TxHash.Hex(), err) } - basket := strings.ToLower(vLog.Address.Hex()) usdgAmount := decoded[0].(*big.Int) ts := idx.blockTimestamp(client, vLog.BlockNumber) @@ -1150,7 +1169,7 @@ func (idx *Indexer) writeFeeSnapshot(tx *sql.Tx, client *ethclient.Client, vLog INSERT INTO fee_snapshots (basket_address, snapshot_id, usdg_amount, timestamp, tx_hash, log_index) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(tx_hash, log_index) DO NOTHING`, - basket, snapshotID, usdgAmount.String(), ts, vLog.TxHash.Hex(), vLog.Index, + basketAddr, snapshotID, usdgAmount.String(), ts, vLog.TxHash.Hex(), vLog.Index, ) return err } @@ -1218,8 +1237,7 @@ func (idx *Indexer) blockTimestamp(client *ethclient.Client, blockNumber uint64) } // subscribe opens a WebSocket connection and processes live events until -// ctx is cancelled or the connection drops. Events are drained in a separate -// goroutine so the receive loop never blocks on handler RPC calls. +// ctx is cancelled or the connection drops. func (idx *Indexer) subscribe() error { dialCtx, dialCancel := context.WithTimeout(idx.ctx, rpcTimeout) defer dialCancel() @@ -1258,18 +1276,22 @@ func (idx *Indexer) subscribe() error { } } -// filterAddresses returns the registry, factory, and all known basket proxy -// addresses to include in a FilterQuery. +// filterAddresses returns all addresses to include in FilterQuery: +// registry, factory, all known basket proxies, and all known creator token contracts. func (idx *Indexer) filterAddresses() []common.Address { idx.mu.RLock() defer idx.mu.RUnlock() - addresses := make([]common.Address, 0, len(idx.basketAddrs)+2) + addresses := make([]common.Address, 0, len(idx.basketAddrs)+len(idx.creatorTokenToBasket)+2) addresses = append(addresses, idx.registryAddr) addresses = append(addresses, idx.factoryAddr) for addr := range idx.basketAddrs { addresses = append(addresses, addr) } + // Include all known creator token addresses so RevenueSnapshoted events are received. + for creatorToken := range idx.creatorTokenToBasket { + addresses = append(addresses, creatorToken) + } return addresses } From a9643560343e8bbc22ad1bda439a6e75fa2435d9 Mon Sep 17 00:00:00 2001 From: 0xull Date: Thu, 11 Jun 2026 09:08:57 +0100 Subject: [PATCH 3/3] [server]: update the api handler for claiming snapshot revenues Signed-off-by: 0xull --- server/api/api.go | 24 +++++++++++++----------- server/indexer/indexer_db_test.go | 22 +++++++++++++++++----- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/server/api/api.go b/server/api/api.go index 808d717..2a48ea5 100644 --- a/server/api/api.go +++ b/server/api/api.go @@ -1460,25 +1460,27 @@ func (h *handler) getClaimableSnapshots( }, nil) callCancel() - if err == nil { + if err != nil { + log.Printf("api: claimableRevenue RPC snapshot=%d wallet=%s: %v", snap.SnapshotID, wallet, err) + } else { unpacked, err := claimableABI.Methods["claimableRevenue"].Outputs.Unpack(data) if err == nil && len(unpacked) > 0 { if amount, ok := unpacked[0].(*big.Int); ok && amount != nil { claimable = amount.String() } } + + h.db.Exec(` + INSERT INTO creator_claimable_cache + (wallet_address, snapshot_id, basket_address, claimable_usdg, cached_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(wallet_address, snapshot_id, basket_address) DO UPDATE SET + claimable_usdg = excluded.claimable_usdg, + cached_at = excluded.cached_at`, + wallet, snap.SnapshotID, basketAddr, claimable, now, + ) } } - - h.db.Exec(` - INSERT INTO creator_claimable_cache - (wallet_address, snapshot_id, basket_address, claimable_usdg, cached_at) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(wallet_address, snapshot_id, basket_address) DO UPDATE SET - claimable_usdg = excluded.claimable_usdg, - cached_at = excluded.cached_at`, - wallet, snap.SnapshotID, basketAddr, claimable, now, - ) } snap.ClaimableUsdg = claimable diff --git a/server/indexer/indexer_db_test.go b/server/indexer/indexer_db_test.go index 1bfbe02..f4f7740 100644 --- a/server/indexer/indexer_db_test.go +++ b/server/indexer/indexer_db_test.go @@ -974,11 +974,23 @@ func TestWriteFeeSnapshot_WritesCorrectSnapshotID(t *testing.T) { d := newTestDB(t) idx := newTestIndexer(t, d) - basketAddr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") - snapshotID := int64(7) - usdgAmount := big.NewInt(80_000) + basketAddr := common.HexToAddress("0x474835c4da0393bc87d4e85e36fdce3f56edeaa6") + creatorTokenAddr := common.HexToAddress("0x29ba5c3470b3a6c06bd6cce2e43c019d846c01c0") + + if _, err := d.Exec(` + INSERT INTO baskets (address, creator_token_address, creator_address, name, symbol, thesis, + rebalancing_enabled, created_at, created_tx, suspended) + VALUES (?, ?, '0x0', '', '', '', 0, 0, '', 0)`, + strings.ToLower(basketAddr.Hex()), + strings.ToLower(creatorTokenAddr.Hex()), + ); err != nil { + t.Fatalf("seed basket: %v", err) + } + + snapshotID := int64(7) + usdgAmount := big.NewInt(80_000) totalSupply := new(big.Int).Mul(big.NewInt(1_000_000), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)) - data, _ := feeSnapshotABI.Pack(usdgAmount, totalSupply) + data, _ := feeSnapshotABI.Pack(usdgAmount, totalSupply) vLog := types.Log{ Topics: []common.Hash{ @@ -989,7 +1001,7 @@ func TestWriteFeeSnapshot_WritesCorrectSnapshotID(t *testing.T) { BlockNumber: 300, TxHash: common.HexToHash("0xsnap01"), Index: 0, - Address: basketAddr, + Address: creatorTokenAddr, } tx, err := d.Begin()