diff --git a/internal/migrations/032-order-book-actions.sql b/internal/migrations/032-order-book-actions.sql index 3cd04ef83..8e3d47f8b 100644 --- a/internal/migrations/032-order-book-actions.sql +++ b/internal/migrations/032-order-book-actions.sql @@ -311,7 +311,7 @@ PUBLIC VIEW RETURNS (market_exists BOOLEAN) { -- ============================================================================= -- ============================================================================= --- MATCHING ENGINE (Issue 6) +-- MATCHING ENGINE -- ============================================================================= /** * The matching engine automatically executes trades by matching compatible orders. @@ -1064,7 +1064,6 @@ CREATE OR REPLACE ACTION place_buy_order( -- ========================================================================== -- Attempt to match this buy order with existing sell orders - -- Note: This is a stub in Issue 2, full implementation in Issue 6 match_orders($query_id, $outcome, $price); -- Success: Order placed (may be partially or fully matched by future matching engine) @@ -1239,7 +1238,6 @@ CREATE OR REPLACE ACTION place_sell_order( -- ========================================================================== -- Attempt to match this sell order with existing buy orders - -- Note: This is a stub in Issue 3, full implementation in Issue 6 match_orders($query_id, $outcome, $price); -- Success: Order placed (may be partially or fully matched by future matching engine) @@ -1273,7 +1271,7 @@ CREATE OR REPLACE ACTION place_sell_order( * - NO price: 100 - $true_price (e.g., 100 - 56 = 44 cents = $0.44) * - Total: Always $1.00 per share pair * - * LP Reward Eligibility (Issue 9 - TO BE IMPLEMENTED): + * LP Reward Eligibility: * * To qualify for LP rewards, ALL of the following must be true: * 1. BOTH buy and sell prices must be within max_spread of market midpoint (50): @@ -1290,8 +1288,6 @@ CREATE OR REPLACE ACTION place_sell_order( * - Split @ $0.56/$0.44: Distance = 6¢ → OUTSIDE spread → NOT qualified ❌ * - Split @ $0.52/$0.48: Distance = 2¢ → WITHIN spread → QUALIFIED ✅ * - * Implementation: LP tracking deferred to Issue 9 (LP rewards and fee distribution) - * * Examples: * place_split_limit_order(1, 56, 100) -- Mint 100 pairs: hold YES, sell NO @ $0.44 * place_split_limit_order(1, 35, 50) -- Mint 50 pairs: hold YES, sell NO @ $0.65 @@ -1453,7 +1449,6 @@ CREATE OR REPLACE ACTION place_split_limit_order( -- ========================================================================== -- Attempt to match the NO sell order with existing buy orders - -- Note: This is a stub in Issue 4, full implementation in Issue 6 -- Match is attempted on the FALSE (NO) outcome at the false_price match_orders($query_id, FALSE, $false_price); @@ -1870,7 +1865,7 @@ CREATE OR REPLACE ACTION change_bid( -- SECTION 9: TRIGGER MATCHING ENGINE -- ========================================================================== - -- Try to match new order immediately (stub in Issue 5B, full implementation in Issue 6) + -- Try to match new order immediately -- Note: match_orders expects positive price (1-99), so use $new_abs_price not $new_price match_orders($query_id, $outcome, $new_abs_price); @@ -2119,7 +2114,7 @@ CREATE OR REPLACE ACTION change_ask( -- SECTION 8: TRIGGER MATCHING ENGINE -- ========================================================================== - -- Try to match new order immediately (stub in Issue 5B, full implementation in Issue 6) + -- Try to match new order immediately match_orders($query_id, $outcome, $new_price); -- Success: Sell order price modified atomically @@ -2145,7 +2140,7 @@ CREATE OR REPLACE ACTION change_ask( * * After settlement: * - All trading is permanently blocked (buy, sell, split, cancel, change orders) - * - Users must call claim_payout() to redeem winning shares (Issue 8) + * - Users must call claim_payout() to redeem winning shares * - Market state is frozen and cannot be changed * * Parameters: @@ -2163,9 +2158,6 @@ CREATE OR REPLACE ACTION change_ask( * * Examples: * settle_market(1) -- Settle market ID 1 using its attestation - * - * Note: This action is part of Issue 7 (Manual Settlement). Automatic settlement - * via extension (Issue 7B) will call this action on a schedule. */ CREATE OR REPLACE ACTION settle_market( $query_id INT @@ -2264,7 +2256,6 @@ CREATE OR REPLACE ACTION settle_market( settled_at = @block_timestamp WHERE id = $query_id; - -- Issue 8: Automatic atomic settlement processing -- Process all payouts, refunds, and fee collection atomically process_settlement($query_id, $winning_outcome); }; diff --git a/internal/migrations/033-order-book-settlement.sql b/internal/migrations/033-order-book-settlement.sql index 61a765eb0..82a6a332d 100644 --- a/internal/migrations/033-order-book-settlement.sql +++ b/internal/migrations/033-order-book-settlement.sql @@ -6,7 +6,7 @@ * - Pay winners (shares × $1.00 - 2% redemption fee) * - Refund open buy orders (no fee) * - Delete all positions atomically - * - Collect fees in vault (Issue 9 will distribute them) + * - Collect fees in vault * * Implementation Note: * Uses CTE + ARRAY_AGG to collect all payout data in a single query, then @@ -55,7 +55,7 @@ CREATE OR REPLACE ACTION ob_batch_unlock_collateral( }; -- ============================================================================ --- Fee Distribution to Liquidity Providers (Issue 9B) +-- Fee Distribution to Liquidity Providers -- ============================================================================ /** @@ -64,7 +64,7 @@ CREATE OR REPLACE ACTION ob_batch_unlock_collateral( * Distributes settlement fees to liquidity providers based on sampled rewards. * Called automatically after winner payouts in process_settlement(). * - * DYNAMIC REWARDS MODEL (Issue 9 Refactor): + * DYNAMIC REWARDS MODEL: * Uses the ob_rewards table populated by periodic sample_lp_rewards() calls. * Fees are distributed proportionally across all sampled blocks. * @@ -77,18 +77,27 @@ CREATE OR REPLACE ACTION ob_batch_unlock_collateral( * This approach minimizes truncation (single point vs per-block) and ensures * zero fee loss by giving the remainder to the first participant. * + * AUDIT TRAIL: + * Before deleting ob_rewards, creates immutable records in: + * - ob_fee_distributions: Summary (query_id, total_fees, LP count, timestamp) + * - ob_fee_distribution_details: Per-LP rewards (participant_id, amount, percent) + * + * This ensures full traceability for compliance and user verification. + * * Parameters: * - $query_id: Settled market ID * - $total_fees: Total fees collected (2% of redemptions), in wei * * Behavior: - * - No samples → fees remain in vault (safe accumulation) + * - No samples → fees remain in vault (safe accumulation), NO audit record * - Distributes proportionally across sampled blocks with zero loss + * - Creates audit records in ob_fee_distributions tables * - Deletes processed rewards from ob_rewards table * * Dependencies: - * - ob_rewards table (created in migration 034-order-book-rewards.sql) - * - ob_batch_unlock_collateral() helper (defined above in this migration) + * - ob_rewards table (created in migration 034) + * - ob_fee_distributions tables (created in migration 036) + * - ob_batch_unlock_collateral() helper (defined above) * - ethereum_bridge.unlock() (from Migration 031) */ CREATE OR REPLACE ACTION distribute_fees( @@ -175,7 +184,100 @@ CREATE OR REPLACE ACTION distribute_fees( ob_batch_unlock_collateral($wallet_addresses, $amounts); } + -- Step 5.5: CREATE AUDIT RECORDS + -- Insert distribution summary and per-LP details BEFORE deleting ob_rewards. + -- This ensures full traceability for compliance and user verification. + + -- Only create audit if distribution actually occurred + if $wallet_addresses IS NOT NULL AND COALESCE(array_length($wallet_addresses), 0) > 0 { + -- Generate distribution ID (MAX+1 pattern, safe in Kwil sequential execution) + $distribution_id INT; + for $row in SELECT COALESCE(MAX(id), 0) + 1 as next_id FROM ob_fee_distributions { + $distribution_id := $row.next_id; + } + + -- Insert distribution summary + INSERT INTO ob_fee_distributions ( + id, + query_id, + total_fees_distributed, + total_lp_count, + block_count, + distributed_at + ) VALUES ( + $distribution_id, + $query_id, + $total_fees, + COALESCE(array_length($wallet_addresses), 0), + $block_count, + @block_timestamp + ); + + -- Insert per-LP details + -- Match the distributed amounts (from arrays) with participant data from ob_rewards + -- This creates audit records showing exactly who got what + $idx INT := 1; + for $w_row in SELECT wallet FROM UNNEST($wallet_addresses) AS w(wallet) { + $wallet_hex TEXT := $w_row.wallet; + + -- Get corresponding amount (arrays are same length, ordered by participant_id) + $reward_amount NUMERIC(78, 0); + for $a_row in + SELECT amount + FROM UNNEST($amounts) AS a(amount) + LIMIT 1 OFFSET ($idx - 1) + { + $reward_amount := $a_row.amount; + } + + -- Get participant info by matching wallet address + $pid INT; + $wallet_bytes BYTEA; + $total_reward_pct NUMERIC(10, 2); + + for $p_data in + SELECT + p.id, + p.wallet_address + FROM ob_participants p + WHERE '0x' || encode(p.wallet_address, 'hex') = $wallet_hex + { + $pid := $p_data.id; + $wallet_bytes := $p_data.wallet_address; + + -- Calculate total_reward_percent in separate query to avoid type inference issues + $total_reward_pct := 0::NUMERIC(10,2); + for $pct_row in + SELECT SUM(reward_percent::NUMERIC(10,2))::NUMERIC(10,2) as sum_pct + FROM ob_rewards + WHERE query_id = $query_id AND participant_id = $pid + { + if $pct_row.sum_pct IS NOT NULL { + $total_reward_pct := $pct_row.sum_pct; + } + } + } + + INSERT INTO ob_fee_distribution_details ( + distribution_id, + participant_id, + wallet_address, + reward_amount, + total_reward_percent + ) VALUES ( + $distribution_id, + $pid, + $wallet_bytes, + $reward_amount, + $total_reward_pct + ); + + $idx := $idx + 1; + } + } + -- Step 6: Cleanup - delete processed rewards to save storage + -- NOW SAFE: Audit records created above preserve distribution history DELETE FROM ob_rewards WHERE query_id = $query_id; }; diff --git a/internal/migrations/034-order-book-rewards.sql b/internal/migrations/034-order-book-rewards.sql index c27e42d59..2de688ca2 100644 --- a/internal/migrations/034-order-book-rewards.sql +++ b/internal/migrations/034-order-book-rewards.sql @@ -1,5 +1,5 @@ /** - * MIGRATION 034: DYNAMIC LP REWARDS SAMPLING (ISSUE 9 REFACTOR) + * MIGRATION 034: DYNAMIC LP REWARDS SAMPLING * * Purpose: * Store periodic snapshots of liquidity provider reward eligibility. @@ -14,13 +14,6 @@ * * References: * - Polymarket Rewards: https://docs.polymarket.com/developers/rewards/overview - * - TRUF Spec: /PredictionMarketTasks/OrderbookSetupGoal/orderBookLiquidityRewardsProgram/0mainDoc.md - * - Reference SQL: /PredictionMarketTasks/OrderbookSetupGoal/orderBookLiquidityRewardsProgram/2DynamicRewardsCalculation.md - * - * Related Issues: - * - Task 9R1: Schema Migration + Cleanup (THIS MIGRATION) - * - Task 9R2: Rewards Sampling Implementation (sample_lp_rewards logic) - * - Task 9R3: Settlement Fee Distribution (uses this table) */ -- ============================================================================ diff --git a/internal/migrations/036-order-book-audit.sql b/internal/migrations/036-order-book-audit.sql new file mode 100644 index 000000000..7952b99af --- /dev/null +++ b/internal/migrations/036-order-book-audit.sql @@ -0,0 +1,261 @@ +/** + * MIGRATION 036: FEE DISTRIBUTION AUDIT TRAIL (ISSUE 9C PHASE 1) + * + * Purpose: + * Create immutable audit tables to track fee distribution history. + * Ensures compliance, user transparency, and data preservation. + * + * Problem: + * Currently, distribute_fees() deletes ob_rewards after distribution, + * leaving NO record of who received how much. This migration creates + * permanent audit tables that capture distribution history before deletion. + * + * Tables: + * - ob_fee_distributions: Summary of each distribution event + * - ob_fee_distribution_details: Per-participant reward details + * + * Related Issues: + * - Issue 9C Phase 1: Audit Trail (THIS MIGRATION) + * - Issue 9B: Fee Distribution Engine (modified to insert audit records) + * - Issue 9R3: Settlement Fee Distribution (uses zero-loss algorithm) + * + * Dependencies: + * - Migration 030: ob_queries, ob_participants tables + * - Migration 033: distribute_fees() action (will be modified) + * - Migration 034: ob_rewards table + */ + +-- ============================================================================ +-- SCHEMA: Fee Distribution Audit Tables +-- ============================================================================ + +/** + * ob_fee_distributions + * + * Stores one record per market settlement fee distribution. + * Acts as the parent table for ob_fee_distribution_details. + * + * ID Generation: MAX(id) + 1 pattern (safe in Kwil sequential execution) + * + * Lifecycle: + * - Created by distribute_fees() after successful distribution + * - CASCADE deleted when market (ob_queries) is deleted + * - Never updated (immutable audit log) + * + * Example row: + * id=1, query_id=5, total_fees=10000000000000000000 (10 TRUF), + * total_lp_count=2, block_count=1, distributed_at=1733850000 + */ +CREATE TABLE IF NOT EXISTS ob_fee_distributions ( + id INT PRIMARY KEY, + query_id INT NOT NULL, + total_fees_distributed NUMERIC(78, 0) NOT NULL, + total_lp_count INT NOT NULL, + block_count INT NOT NULL, + distributed_at INT8 NOT NULL, + FOREIGN KEY (query_id) REFERENCES ob_queries(id) ON DELETE CASCADE, + CHECK (total_fees_distributed >= 0), + CHECK (total_lp_count > 0), + CHECK (block_count > 0) +); + +/** + * ob_fee_distribution_details + * + * Stores individual LP rewards for each distribution. + * Multiple rows per distribution (one per LP who received fees). + * + * Composite PK: (distribution_id, participant_id) + * - Prevents duplicate LP records in same distribution + * - Efficient joins on distribution_id + * + * Fields: + * - wallet_address: Snapshot at distribution time (de-normalized for queries) + * - reward_amount: Actual wei received (includes dust if first LP) + * - total_reward_percent: SUM(reward_percent) across sampled blocks + * + * Example rows for distribution_id=1: + * | distribution_id | participant_id | wallet_address | reward_amount | total_reward_percent | + * |-----------------|----------------|----------------|----------------------|----------------------| + * | 1 | 10 | 0x1111... | 6400000000000000000 | 64.00 | + * | 1 | 20 | 0x2222... | 3600000000000000000 | 36.00 | + */ +CREATE TABLE IF NOT EXISTS ob_fee_distribution_details ( + distribution_id INT NOT NULL, + participant_id INT NOT NULL, + wallet_address BYTEA NOT NULL, + reward_amount NUMERIC(78, 0) NOT NULL, + total_reward_percent NUMERIC(10, 2) NOT NULL, + PRIMARY KEY (distribution_id, participant_id), + FOREIGN KEY (distribution_id) REFERENCES ob_fee_distributions(id) ON DELETE CASCADE, + FOREIGN KEY (participant_id) REFERENCES ob_participants(id), + CHECK (reward_amount > 0), + CHECK (total_reward_percent > 0) +); + +-- ============================================================================ +-- Indexes for Query Performance +-- ============================================================================ + +/** + * Query pattern: "Show all distributions for Market #123" + * Used by: Analytics queries, market detail pages + */ +CREATE INDEX IF NOT EXISTS idx_fee_dist_query + ON ob_fee_distributions(query_id); + +/** + * Query pattern: "Show all rewards earned by Participant #456" + * Used by: User reward history, portfolio analytics + */ +CREATE INDEX IF NOT EXISTS idx_fee_dist_det_participant + ON ob_fee_distribution_details(participant_id); + +/** + * Query pattern: "Show recent distributions (last 7 days)" + * Used by: Analytics dashboards, monitoring + */ +CREATE INDEX IF NOT EXISTS idx_fee_dist_time + ON ob_fee_distributions(distributed_at); + +/** + * Composite index for efficient wallet-based queries + * Query pattern: "Find distributions for wallet 0xABCD..." + */ +CREATE INDEX IF NOT EXISTS idx_fee_dist_det_wallet + ON ob_fee_distribution_details(wallet_address); + +-- ============================================================================ +-- ACTIONS: Audit Query Helpers +-- ============================================================================ + +/** + * get_distribution_summary($query_id) + * + * Returns fee distribution summary for a specific market. + * + * Parameters: + * - $query_id: Market ID + * + * Returns: + * - distribution_id: Unique distribution identifier + * - total_fees_distributed: Total fees distributed (wei) + * - total_lp_count: Number of LPs who received fees + * - block_count: Number of blocks sampled + * - distributed_at: Unix timestamp of distribution + * + * Example usage: + * SELECT * FROM get_distribution_summary(1); + */ +CREATE OR REPLACE ACTION get_distribution_summary( + $query_id INT +) PUBLIC VIEW RETURNS TABLE( + distribution_id INT, + total_fees_distributed NUMERIC(78, 0), + total_lp_count INT, + block_count INT, + distributed_at INT8 +) { + for $row in + SELECT + id as distribution_id, + total_fees_distributed, + total_lp_count, + block_count, + distributed_at + FROM ob_fee_distributions + WHERE query_id = $query_id + ORDER BY distributed_at DESC + { + RETURN NEXT $row.distribution_id, $row.total_fees_distributed, $row.total_lp_count, $row.block_count, $row.distributed_at; + } +}; + +/** + * get_distribution_details($distribution_id) + * + * Returns per-LP reward details for a specific distribution. + * + * Parameters: + * - $distribution_id: Distribution identifier + * + * Returns: + * - participant_id: Internal participant ID + * - wallet_address: Hex wallet address (0x prefix) + * - reward_amount: Amount received in wei + * - total_reward_percent: Sum of reward % across blocks + * + * Example usage: + * SELECT * FROM get_distribution_details(1); + */ +CREATE OR REPLACE ACTION get_distribution_details( + $distribution_id INT +) PUBLIC VIEW RETURNS TABLE( + participant_id INT, + wallet_address TEXT, + reward_amount NUMERIC(78, 0), + total_reward_percent NUMERIC(10, 2) +) { + for $row in + SELECT + participant_id, + '0x' || encode(wallet_address, 'hex') as wallet_hex, + reward_amount, + total_reward_percent + FROM ob_fee_distribution_details + WHERE distribution_id = $distribution_id + ORDER BY reward_amount DESC + { + RETURN NEXT $row.participant_id, $row.wallet_hex, $row.reward_amount, $row.total_reward_percent; + } +}; + +/** + * get_participant_reward_history($wallet_hex) + * + * Returns fee distributions for a wallet. + * Useful for "Your LP Rewards" history pages. + * + * Parameters: + * - $wallet_hex: Wallet address hex (with/out 0x) + * + * Returns: + * - query_id: Market that distributed fees + * - reward_amount: Amount received in wei + * - total_reward_percent: Sum of reward % + * - distributed_at: Unix timestamp + * - distribution_id: Distribution reference + * + * Example usage: + * get_participant_reward_history('0x1111...'); + */ +CREATE OR REPLACE ACTION get_participant_reward_history( + $wallet_hex TEXT +) PUBLIC VIEW RETURNS TABLE( + query_id INT, + reward_amount NUMERIC(78, 0), + total_reward_percent NUMERIC(10, 2), + distributed_at INT8, + distribution_id INT +) { + -- Remove 0x prefix if present + $clean_hex TEXT := $wallet_hex; + if substring($wallet_hex, 1, 2) = '0x' { + $clean_hex := substring($wallet_hex, 3); + } + + for $row in + SELECT + d.query_id, + dd.reward_amount, + dd.total_reward_percent, + d.distributed_at, + dd.distribution_id + FROM ob_fee_distribution_details dd + JOIN ob_fee_distributions d ON dd.distribution_id = d.id + WHERE LOWER(encode(dd.wallet_address, 'hex')) = LOWER($clean_hex) + ORDER BY d.distributed_at DESC + { + RETURN NEXT $row.query_id, $row.reward_amount, $row.total_reward_percent, $row.distributed_at, $row.distribution_id; + } +}; diff --git a/tests/streams/order_book/fee_distribution_audit_test.go b/tests/streams/order_book/fee_distribution_audit_test.go new file mode 100644 index 000000000..a73ec5d7e --- /dev/null +++ b/tests/streams/order_book/fee_distribution_audit_test.go @@ -0,0 +1,551 @@ +//go:build kwiltest + +package order_book + +import ( + "context" + "crypto/sha256" + "fmt" + "math/big" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwilTypes "github.com/trufnetwork/kwil-db/core/types" + erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/sdk-go/core/util" +) + +// TestFeeDistributionAudit tests the audit trail for fee distribution (Issue 9C Phase 1) +func TestFeeDistributionAudit(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "ORDER_BOOK_09C_FeeDistributionAudit", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + // Test 1: Verify audit record creation (basic flow) + testAuditRecordCreation(t), + // Test 2: Verify multi-block audit correctness + testAuditMultiBlock(t), + // Test 3: Verify no audit when no LPs + testAuditNoLPs(t), + // Test 4: Verify no audit when zero fees + testAuditZeroFees(t), + // Test 5: Verify audit data integrity (matches actual transfers) + testAuditDataIntegrity(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +// testAuditRecordCreation verifies that audit records are created correctly after distribution +func testAuditRecordCreation(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset balance point tracker + lastBalancePoint = nil + + // Initialize ERC20 extension + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + user1 := util.Unsafe_NewEthereumAddressFromString("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + user2 := util.Unsafe_NewEthereumAddressFromString("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB") + + // Give both users balance + err = giveBalanceChained(ctx, platform, user1.Address(), "500000000000000000000") + require.NoError(t, err) + err = giveBalanceChained(ctx, platform, user2.Address(), "500000000000000000000") + require.NoError(t, err) + + // Create market + queryHash := sha256.Sum256([]byte("test_audit_record_creation")) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user1, queryHash[:], settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + t.Logf("Created market ID: %d", marketID) + + // Setup LP scenario (same as fee_distribution_test.go) + setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID)) + + // Sample LP rewards + err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + require.NoError(t, err) + + // Fund vault and call distribute_fees + totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)) // 10 TRUF + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees) + require.NoError(t, err) + + // Verify audit summary record exists + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: user1.Bytes(), + Caller: user1.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + // Query distribution summary using callback pattern + var distributionID int + var totalFeesStr string + var lpCount int + var blockCount int + var summaryRowCount int + + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_summary", []any{int(marketID)}, + func(row *common.Row) error { + distributionID = int(row.Values[0].(int64)) + // total_fees_distributed is NUMERIC(78, 0) which comes as *types.Decimal + totalFeesDecimal := row.Values[1].(*kwilTypes.Decimal) + totalFeesStr = totalFeesDecimal.String() + lpCount = int(row.Values[2].(int64)) + blockCount = int(row.Values[3].(int64)) + summaryRowCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 1, summaryRowCount, "Should have 1 distribution summary record") + + t.Logf("✅ Audit summary: distribution_id=%d, total_fees=%s, lp_count=%d, block_count=%d", + distributionID, totalFeesStr, lpCount, blockCount) + + require.Equal(t, 2, lpCount, "LP count should be 2") + require.Equal(t, 1, blockCount, "Block count should be 1") + + // Verify per-LP detail records using callback pattern with slice collection + type detailRow struct { + participantID int + wallet string + rewardAmount string + rewardPercent string + } + detailRows := make([]detailRow, 0, 2) + + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_details", []any{distributionID}, + func(row *common.Row) error { + // NUMERIC fields come as *types.Decimal + rewardAmtDecimal := row.Values[2].(*kwilTypes.Decimal) + rewardPctDecimal := row.Values[3].(*kwilTypes.Decimal) + + detailRows = append(detailRows, detailRow{ + participantID: int(row.Values[0].(int64)), + wallet: row.Values[1].(string), + rewardAmount: rewardAmtDecimal.String(), + rewardPercent: rewardPctDecimal.String(), + }) + return nil + }) + require.NoError(t, err) + require.Len(t, detailRows, 2, "Should have 2 LP detail records") + + // Verify zero-loss: SUM(reward_amount) = total_fees_distributed + totalFeesFromAudit, _ := new(big.Int).SetString(totalFeesStr, 10) + var totalDistributed big.Int + for _, detail := range detailRows { + amt, _ := new(big.Int).SetString(detail.rewardAmount, 10) + totalDistributed.Add(&totalDistributed, amt) + } + + require.Equal(t, totalFeesFromAudit.String(), totalDistributed.String(), + "Zero-loss audit: SUM(reward_amount) should equal total_fees_distributed") + + t.Logf("✅ Audit record creation verified: %s wei distributed across %d LPs", totalFeesFromAudit.String(), lpCount) + + return nil + } +} + +// testAuditMultiBlock verifies audit correctness with multiple block samples +func testAuditMultiBlock(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + lastBalancePoint = nil + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + user1 := util.Unsafe_NewEthereumAddressFromString("0xCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC") + user2 := util.Unsafe_NewEthereumAddressFromString("0xDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD") + + err = giveBalanceChained(ctx, platform, user1.Address(), "500000000000000000000") + require.NoError(t, err) + err = giveBalanceChained(ctx, platform, user2.Address(), "500000000000000000000") + require.NoError(t, err) + + queryHash := sha256.Sum256([]byte("test_audit_multiblock")) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user1, queryHash[:], settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + + // Setup LP scenario + setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID)) + + // Sample 3 blocks + err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + require.NoError(t, err) + err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 2000, nil) + require.NoError(t, err) + err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 3000, nil) + require.NoError(t, err) + + // Distribute fees + totalFees := new(big.Int).Mul(big.NewInt(30), big.NewInt(1e18)) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees) + require.NoError(t, err) + + // Verify audit summary + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{Height: 1, Timestamp: time.Now().Unix()}, + Signer: user1.Bytes(), + Caller: user1.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + // Query distribution summary using callback pattern + var blockCount int + var rowCount int + + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_summary", []any{int(marketID)}, + func(row *common.Row) error { + blockCount = int(row.Values[3].(int64)) + rowCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 1, rowCount, "Should have 1 distribution summary record") + require.Equal(t, 3, blockCount, "Block count should be 3 (3 samples)") + + t.Logf("✅ Multi-block audit verified: %d blocks sampled", blockCount) + + return nil + } +} + +// testAuditNoLPs verifies no audit record when no LPs (fees stay in vault) +func testAuditNoLPs(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + lastBalancePoint = nil + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + user1 := util.Unsafe_NewEthereumAddressFromString("0xEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE") + + err = giveBalanceChained(ctx, platform, user1.Address(), "500000000000000000000") + require.NoError(t, err) + + queryHash := sha256.Sum256([]byte("test_audit_no_lps")) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user1, queryHash[:], settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + + // Don't sample LP rewards (no LP samples) + totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees) + require.NoError(t, err) + + // Verify NO audit summary record + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{Height: 1, Timestamp: time.Now().Unix()}, + Signer: user1.Bytes(), + Caller: user1.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + // Query distribution summary - should have 0 rows + var rowCount int + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_summary", []any{int(marketID)}, + func(row *common.Row) error { + rowCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 0, rowCount, "Should have 0 distribution records (no LPs)") + + t.Logf("✅ No audit record created when no LPs (fees stayed in vault)") + + return nil + } +} + +// testAuditZeroFees verifies no audit when zero fees collected +func testAuditZeroFees(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + lastBalancePoint = nil + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + user1 := util.Unsafe_NewEthereumAddressFromString("0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF") + + err = giveBalanceChained(ctx, platform, user1.Address(), "500000000000000000000") + require.NoError(t, err) + + queryHash := sha256.Sum256([]byte("test_audit_zero_fees")) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user1, queryHash[:], settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + + // Call distribute_fees with $0 fees (early return) + zeroFees := big.NewInt(0) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), zeroFees) + require.NoError(t, err) + + // Verify NO audit record (zero fees early return) + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{Height: 1, Timestamp: time.Now().Unix()}, + Signer: user1.Bytes(), + Caller: user1.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + // Query distribution summary - should have 0 rows + var rowCount int + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_summary", []any{int(marketID)}, + func(row *common.Row) error { + rowCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 0, rowCount, "Should have 0 distribution records (zero fees)") + + t.Logf("✅ No audit record created when zero fees collected") + + return nil + } +} + +// testAuditDataIntegrity verifies audit data matches actual balance transfers +func testAuditDataIntegrity(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + lastBalancePoint = nil + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + user1 := util.Unsafe_NewEthereumAddressFromString("0x9999999999999999999999999999999999999999") + user2 := util.Unsafe_NewEthereumAddressFromString("0x8888888888888888888888888888888888888888") + + err = giveBalanceChained(ctx, platform, user1.Address(), "500000000000000000000") + require.NoError(t, err) + err = giveBalanceChained(ctx, platform, user2.Address(), "500000000000000000000") + require.NoError(t, err) + + queryHash := sha256.Sum256([]byte("test_audit_data_integrity")) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user1, queryHash[:], settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + + // Setup LP scenario (places orders which spend collateral) + setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID)) + + // Get initial balances AFTER order placement (to measure only fee distribution increase) + bal1Before, err := getBalance(ctx, platform, user1.Address()) + require.NoError(t, err) + bal2Before, err := getBalance(ctx, platform, user2.Address()) + require.NoError(t, err) + + // Sample + err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + require.NoError(t, err) + + // Distribute + totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees) + require.NoError(t, err) + + // Get final balances + bal1After, err := getBalance(ctx, platform, user1.Address()) + require.NoError(t, err) + bal2After, err := getBalance(ctx, platform, user2.Address()) + require.NoError(t, err) + + // Calculate actual balance increases + increase1 := new(big.Int).Sub(bal1After, bal1Before) + increase2 := new(big.Int).Sub(bal2After, bal2Before) + + // Get audit records + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{Height: 1, Timestamp: time.Now().Unix()}, + Signer: user1.Bytes(), + Caller: user1.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + // Query distribution summary using callback pattern + var distributionID int + var totalFeesStr string + var summaryRowCount int + + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_summary", []any{int(marketID)}, + func(row *common.Row) error { + distributionID = int(row.Values[0].(int64)) + // total_fees_distributed is NUMERIC(78, 0) which comes as *types.Decimal + totalFeesDecimal := row.Values[1].(*kwilTypes.Decimal) + totalFeesStr = totalFeesDecimal.String() + summaryRowCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 1, summaryRowCount, "Should have 1 distribution summary record") + + // Query distribution details using callback pattern with map collection + auditRewards := make(map[string]*big.Int) + var detailRowCount int + + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_distribution_details", []any{distributionID}, + func(row *common.Row) error { + walletHex := row.Values[1].(string) + // reward_amount is NUMERIC(78, 0) which comes as *types.Decimal + rewardDecimal := row.Values[2].(*kwilTypes.Decimal) + rewardStr := rewardDecimal.String() + reward, _ := new(big.Int).SetString(rewardStr, 10) + // Normalize to lowercase for consistent map lookup + key := strings.ToLower(walletHex) + auditRewards[key] = reward + detailRowCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, detailRowCount, "Should have 2 LP detail records") + + // Verify audit rewards match actual balance increases + // Note: user.Address() already includes "0x" prefix + user1Hex := strings.ToLower(user1.Address()) + user2Hex := strings.ToLower(user2.Address()) + + auditReward1 := auditRewards[user1Hex] + auditReward2 := auditRewards[user2Hex] + + require.NotNil(t, auditReward1, "User1 should have audit reward") + require.NotNil(t, auditReward2, "User2 should have audit reward") + require.Equal(t, increase1.String(), auditReward1.String(), "Audit reward1 should match balance increase") + require.Equal(t, increase2.String(), auditReward2.String(), "Audit reward2 should match balance increase") + + // Verify zero-loss in audit + totalFeesFromAudit, _ := new(big.Int).SetString(totalFeesStr, 10) + auditSum := new(big.Int).Add(auditReward1, auditReward2) + require.Equal(t, totalFeesFromAudit.String(), auditSum.String(), "Audit rewards should sum to total fees") + + t.Logf("✅ Audit data integrity verified:") + t.Logf(" - Audit User1 reward: %s wei (balance increase: %s)", auditReward1.String(), increase1.String()) + t.Logf(" - Audit User2 reward: %s wei (balance increase: %s)", auditReward2.String(), increase2.String()) + t.Logf(" - Audit total: %s wei (zero-loss verified)", auditSum.String()) + + return nil + } +} + +// setupLPScenario creates paired orders for both users to qualify as LPs +func setupLPScenario(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, + user1, user2 *util.EthereumAddress, marketID int) { + + err := callPlaceSplitLimitOrder(ctx, platform, user1, marketID, 50, 300) + require.NoError(t, err) + + err = callPlaceBuyOrder(ctx, platform, user1, marketID, true, 46, 50) + require.NoError(t, err) + err = callPlaceSellOrder(ctx, platform, user1, marketID, true, 52, 200) + require.NoError(t, err) + + err = callPlaceSellOrder(ctx, platform, user1, marketID, true, 48, 100) + require.NoError(t, err) + err = callPlaceBuyOrder(ctx, platform, user1, marketID, false, 52, 100) + require.NoError(t, err) + + err = callPlaceSplitLimitOrder(ctx, platform, user2, marketID, 50, 100) + require.NoError(t, err) + err = callPlaceSellOrder(ctx, platform, user2, marketID, true, 49, 100) + require.NoError(t, err) + err = callPlaceBuyOrder(ctx, platform, user2, marketID, false, 51, 100) + require.NoError(t, err) +} + +// fundVaultAndDistributeFees funds the vault and calls distribute_fees +func fundVaultAndDistributeFees(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, + user *util.EthereumAddress, marketID int, totalFees *big.Int) error { + + // Fund vault if fees > 0 + if totalFees.Sign() > 0 { + err := giveBalanceChained(ctx, platform, testEscrow, totalFees.String()) + if err != nil { + return err + } + + _, err = erc20bridge.ForTestingForceSyncInstance(ctx, platform, testChain, testEscrow, testERC20, 18) + if err != nil { + return err + } + } + + // Convert to NUMERIC(78, 0) + totalFeesDecimal, err := kwilTypes.ParseDecimalExplicit(totalFees.String(), 78, 0) + if err != nil { + return err + } + + // Call distribute_fees + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: user.Bytes(), + Caller: user.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + res, err := platform.Engine.Call(engineCtx, platform.DB, "", "distribute_fees", []any{marketID, totalFeesDecimal}, nil) + if err != nil { + return err + } + if res.Error != nil { + return fmt.Errorf("distribute_fees error: %v", res.Error) + } + + return nil +}