Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions .github/workflows/update-prices.yml

This file was deleted.

24 changes: 13 additions & 11 deletions server/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,25 +1460,27 @@ func (h *handler) getClaimableSnapshots(
}, nil)
callCancel()

if err == nil {
if err != nil {
log.Printf("api: claimableRevenue RPC snapshot=%d wallet=%s: %v", snap.SnapshotID, wallet, err)
} else {
unpacked, err := claimableABI.Methods["claimableRevenue"].Outputs.Unpack(data)
if err == nil && len(unpacked) > 0 {
if amount, ok := unpacked[0].(*big.Int); ok && amount != nil {
claimable = amount.String()
}
}

h.db.Exec(`
INSERT INTO creator_claimable_cache
(wallet_address, snapshot_id, basket_address, claimable_usdg, cached_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(wallet_address, snapshot_id, basket_address) DO UPDATE SET
claimable_usdg = excluded.claimable_usdg,
cached_at = excluded.cached_at`,
wallet, snap.SnapshotID, basketAddr, claimable, now,
)
}
}

h.db.Exec(`
INSERT INTO creator_claimable_cache
(wallet_address, snapshot_id, basket_address, claimable_usdg, cached_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(wallet_address, snapshot_id, basket_address) DO UPDATE SET
claimable_usdg = excluded.claimable_usdg,
cached_at = excluded.cached_at`,
wallet, snap.SnapshotID, basketAddr, claimable, now,
)
}

snap.ClaimableUsdg = claimable
Expand Down
88 changes: 60 additions & 28 deletions server/db/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@ import (
)

// Migrate applies schema migrations that cannot be expressed as CREATE TABLE IF NOT EXISTS
// in schema.sql because they modify existing tables on a live volume.
// in schema.sql because they modify existing tables on a live Railway volume.
//
// Each migration is idempotent:
// - ALTER TABLE statements are skipped if the column already exists.
// - CREATE UNIQUE INDEX IF NOT EXISTS is safe to re-run.
// - CREATE TABLE IF NOT EXISTS is safe to re-run.
// - UPDATE/DELETE statements whose effects are already applied are no-ops.
//
// Call Migrate() in main.go immediately after db.Open() and before starting
// the indexer. A migration failure is fatal.
func (d *DB) Migrate() error {
migrations := []struct {
name string
sql string
}{
// Phase 1 — add log_index to event tables for idempotent re-scan inserts.
{
name: "deposits.log_index",
sql: `ALTER TABLE deposits ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`,
Expand All @@ -28,49 +38,24 @@ func (d *DB) Migrate() error {
name: "fee_snapshots.log_index",
sql: `ALTER TABLE fee_snapshots ADD COLUMN log_index INTEGER NOT NULL DEFAULT 0`,
},
// Deduplicate before creating unique indexes — production volume may
// contain duplicate rows written by the old indexer before this fix.
// Keep the row with the lowest rowid (earliest write) and discard the rest.
{
name: "deduplicate_deposits",
sql: `DELETE FROM deposits WHERE rowid NOT IN (
SELECT MIN(rowid) FROM deposits GROUP BY tx_hash, log_index
)`,
},
// Unique deduplication indexes.
{
name: "idx_deposits_dedup",
sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_deposits_dedup ON deposits(tx_hash, log_index)`,
},
{
name: "deduplicate_redemptions",
sql: `DELETE FROM redemptions WHERE rowid NOT IN (
SELECT MIN(rowid) FROM redemptions GROUP BY tx_hash, log_index
)`,
},
{
name: "idx_redemptions_dedup",
sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_redemptions_dedup ON redemptions(tx_hash, log_index)`,
},
{
name: "deduplicate_rebalances",
sql: `DELETE FROM rebalances WHERE rowid NOT IN (
SELECT MIN(rowid) FROM rebalances GROUP BY tx_hash, log_index
)`,
},
{
name: "idx_rebalances_dedup",
sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_rebalances_dedup ON rebalances(tx_hash, log_index)`,
},
{
name: "deduplicate_fee_snapshots",
sql: `DELETE FROM fee_snapshots WHERE rowid NOT IN (
SELECT MIN(rowid) FROM fee_snapshots GROUP BY tx_hash, log_index
)`,
},
{
name: "idx_fee_snapshots_dedup",
sql: `CREATE UNIQUE INDEX IF NOT EXISTS idx_fee_snapshots_dedup ON fee_snapshots(tx_hash, log_index)`,
},
// Seed failure tracker.
{
name: "basket_seed_failures",
sql: `CREATE TABLE IF NOT EXISTS basket_seed_failures (
Expand All @@ -79,6 +64,53 @@ func (d *DB) Migrate() error {
last_attempt INTEGER NOT NULL
)`,
},

// Phase 2 — fix RevenueSnapshoted indexing.
//
// Root cause: the indexer never included CreatorToken contract addresses in
// its FilterLogs/WebSocket subscription filter, so every RevenueSnapshoted
// event was silently dropped. Additionally, even if the event had been
// received, handleFeeSnapshot stored vLog.Address (the CreatorToken address)
// as basket_address — the wrong value.
//
// Fix applied in indexer.go:
// 1. creatorTokenToBasket map tracks CreatorToken → basket address.
// 2. filterAddresses() now includes all CreatorToken addresses.
// 3. writeFeeSnapshot resolves the basket from creatorTokenToBasket (DB fallback).
//
// To make the already-emitted RevenueSnapshoted events visible we must:
// a. Delete any fee_snapshot rows written with the wrong basket_address
// (i.e. rows whose basket_address is a creator token address, not a basket).
// These were stored under the creator token address due to the old bug.
// b. Reset the event cursor to the deploy block so the historical scan
// replays from the beginning and re-indexes all events including the
// missed RevenueSnapshoted events with the correct basket_address.
// c. Clear the creator_claimable_cache so stale zeros are not returned
// while the rescan is in progress.
//
// All three steps are idempotent: deleting already-absent rows is a no-op,
// setting the cursor to a value it already has is a no-op, and deleting
// cache rows that don't exist is a no-op.
{
name: "fix_fee_snapshots_wrong_basket_address",
sql: `DELETE FROM fee_snapshots
WHERE basket_address NOT IN (SELECT address FROM baskets)`,
},
{
name: "reset_event_cursor_for_creator_token_rescan",
sql: `UPDATE sync_cursors SET block_num = 68391146 WHERE key = 'events'`,
},
{
name: "clear_creator_claimable_cache",
sql: `DELETE FROM creator_claimable_cache`,
},
// Index creator_token_address so the writeFeeSnapshot DB fallback
// (SELECT address FROM baskets WHERE creator_token_address = ?) is
// an index seek rather than a full table scan during historical rescan.
{
name: "idx_baskets_creator_token",
sql: `CREATE INDEX IF NOT EXISTS idx_baskets_creator_token ON baskets(creator_token_address)`,
},
}

for _, m := range migrations {
Expand Down
Loading
Loading