diff --git a/go.mod b/go.mod index 961fc9007..1714ffb53 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20251022154021-25ca3428bace - github.com/trufnetwork/kwil-db/core v0.4.3-0.20251022154021-25ca3428bace + github.com/trufnetwork/kwil-db v0.10.3-0.20251204134443-d43bfe5b400b + github.com/trufnetwork/kwil-db/core v0.4.3-0.20251204134443-d43bfe5b400b github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa diff --git a/go.sum b/go.sum index 6e6b1c4f3..b1fccbd9f 100644 --- a/go.sum +++ b/go.sum @@ -1212,10 +1212,10 @@ github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZ github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= -github.com/trufnetwork/kwil-db v0.10.3-0.20251022154021-25ca3428bace h1:Cikf1fQSgTDPHU4yXhJgAKQ4qjn4PDF0znXPmVLK1b8= -github.com/trufnetwork/kwil-db v0.10.3-0.20251022154021-25ca3428bace/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20251022154021-25ca3428bace h1:CyU76505WNmFFZSB9w3007qAA8et3WxKqxEJcSh+Rfo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20251022154021-25ca3428bace/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db v0.10.3-0.20251204134443-d43bfe5b400b h1:IAYxPaxlsn7+63cvZjrCcQKoWeKOXNPqp9NGifecKqw= +github.com/trufnetwork/kwil-db v0.10.3-0.20251204134443-d43bfe5b400b/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20251204134443-d43bfe5b400b h1:CGtjMi1JqG/z1g+hnLVpYNS0yHz4Yif7Zflj6tzyTM8= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20251204134443-d43bfe5b400b/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 h1:d9EqPXIjbq/atzEncK5dM3Z9oStx1BxCGuL/sjefeCw= diff --git a/internal/migrations/032-order-book-actions.sql b/internal/migrations/032-order-book-actions.sql index 63638b7e2..61e9fb019 100644 --- a/internal/migrations/032-order-book-actions.sql +++ b/internal/migrations/032-order-book-actions.sql @@ -2265,8 +2265,7 @@ CREATE OR REPLACE ACTION settle_market( settled_at = @block_timestamp WHERE id = $query_id; - -- Success: Market settled - -- - All trading is now blocked - -- - winning_outcome is recorded (TRUE = YES wins, FALSE = NO wins) - -- - Users can now call claim_payout() to redeem winning shares (Issue 8) + -- Issue 8: Automatic atomic settlement processing + -- Process all payouts, refunds, and fee collection atomically + process_settlement($query_id, $winning_outcome); }; diff --git a/internal/migrations/033-order-book-settlement.sql b/internal/migrations/033-order-book-settlement.sql new file mode 100644 index 000000000..74621f9df --- /dev/null +++ b/internal/migrations/033-order-book-settlement.sql @@ -0,0 +1,164 @@ +/** + * MIGRATION 033: ORDER BOOK SETTLEMENT + * + * Automatic atomic settlement processing: + * - Bulk delete losing positions (efficient) + * - Pay winners (shares × $1.00 - 2% redemption fee) + * - Refund open buy orders (no fee) + * - Delete all positions atomically + * - Collect fees in vault (Issue 9 will distribute them) + * + * Implementation Note: + * Uses CTE + ARRAY_AGG to collect all payout data in a single query, then + * processes payouts via batch unlock. This avoids nested queries in the main + * settlement action (Kuneiform limitation: cannot call external functions + * like ethereum_bridge.unlock() inside FOR loops in the same action). + * + * The batch unlock helper (ob_batch_unlock_collateral) CAN loop with function + * calls because it's a separate action called ONCE with all aggregated data. + * + * Transaction Atomicity: + * All Kwil actions execute in a single database transaction. If ANY operation + * fails (including ethereum_bridge.unlock()), the ENTIRE action rolls back: + * - Database changes (position deletions, settled flag) are reverted + * - Blockchain state changes are NOT committed (Kwil's 2-phase approach) + * - The settled flag remains false, allowing the settlement extension to retry + * + * Retry Mechanism: + * The tn_settlement extension retries failed settlements (3 attempts with backoff). + * After exhaustion, the market remains unsettled and requires manual intervention + * or extension restart to resume retries. This is safe because: + * 1. The settled flag prevents duplicate settlement attempts within a transaction + * 2. Rollback ensures partial state never persists + * 3. Position data remains intact for retry attempts + */ + +-- Batch unlock collateral for multiple wallets +-- This helper processes all unlocks in a single call, avoiding nested queries in settlement +CREATE OR REPLACE ACTION ob_batch_unlock_collateral( + $wallet_addresses TEXT[], + $amounts NUMERIC(78, 0)[] +) PRIVATE { + -- Validate input arrays have same length + if COALESCE(array_length($wallet_addresses), 0) != COALESCE(array_length($amounts), 0) { + ERROR('wallet_addresses and amounts arrays must have the same length'); + } + + -- Process each unlock (this is the ONLY place we loop with function calls) + -- This is safe because the settlement action calls THIS function once with all data + for $payout in + SELECT wallet, amount + FROM UNNEST($wallet_addresses, $amounts) AS u(wallet, amount) + { + ethereum_bridge.unlock($payout.wallet, $payout.amount); + } +}; + +-- Process settlement: Pay winners, refund open buys, collect fees +CREATE OR REPLACE ACTION process_settlement( + $query_id INT, + $winning_outcome BOOL +) PRIVATE { + $redemption_fee_bps INT := 200; -- 2% (200 basis points) + $total_fees_collected NUMERIC(78, 0) := '0'::NUMERIC(78, 0); + $one_token NUMERIC(78, 0) := '1000000000000000000'::NUMERIC(78, 0); + + -- Step 1: Bulk delete all losing positions (efficient single operation) + -- Price semantics: price=0 (holdings), price>0 (open sells), price<0 (open buys) + -- Deletes losing outcome holdings and sells, which have zero value after settlement + -- This removes ~50% of positions upfront + DELETE FROM ob_positions + WHERE query_id = $query_id + AND outcome = NOT $winning_outcome + AND price >= 0; -- Holdings (price=0) and open sells (price>0) only + + -- Step 2: Collect ALL payout data using CTE + ARRAY_AGG (digest pattern!) + -- Calculate payouts and aggregate into arrays in a SINGLE query + $wallet_addresses TEXT[]; + $amounts NUMERIC(78, 0)[]; + + for $result in + WITH remaining_positions AS ( + SELECT + p.participant_id, + p.outcome, + p.price, + p.amount, + '0x' || encode(part.wallet_address, 'hex') as wallet_address + FROM ob_positions p + JOIN ob_participants part ON p.participant_id = part.id + WHERE p.query_id = $query_id + ), + calculated_values AS ( + SELECT + wallet_address, + price, + -- Pre-calculate all monetary values to avoid CASE type issues + -- All amounts cast to NUMERIC(78, 0) to match ethereum_bridge.unlock() API + (amount::NUMERIC(78, 0) * $one_token)::NUMERIC(78, 0) as gross_winner_payout, + ((amount::NUMERIC(78, 0) * $one_token * $redemption_fee_bps::NUMERIC(78, 0)) / 10000::NUMERIC(78, 0))::NUMERIC(78, 0) as winner_fee, + ((amount::NUMERIC(78, 0) * abs(price)::NUMERIC(78, 0) * $one_token) / 100::NUMERIC(78, 0))::NUMERIC(78, 0) as refund_amount + FROM remaining_positions + ), + payouts AS ( + SELECT + wallet_address, + -- Remaining positions after Step 1 are: + -- 1. Winning holdings/sells (price >= 0): Pay shares × $1 - 2% fee + -- 2. Open buy orders (price < 0): Refund locked collateral, no fee + CASE + WHEN price >= 0 THEN + gross_winner_payout - winner_fee + ELSE + refund_amount + END as payout_amount, + CASE + WHEN price >= 0 THEN + winner_fee + ELSE + '0'::NUMERIC(78, 0) + END as fee_amount + FROM calculated_values + ), + wallet_totals AS ( + -- Group by wallet to handle multiple positions per user + SELECT + wallet_address, + SUM(payout_amount) as total_payout, + SUM(fee_amount) as total_fees + FROM payouts + GROUP BY wallet_address + ), + aggregated AS ( + SELECT + ARRAY_AGG(wallet_address ORDER BY wallet_address) as wallets, + ARRAY_AGG(total_payout::NUMERIC(78, 0) ORDER BY wallet_address) as amounts, + SUM(total_fees)::NUMERIC(78, 0) as total_fees + FROM wallet_totals + ) + SELECT wallets, amounts, COALESCE(total_fees, 0::NUMERIC(78, 0)) as total_fees + FROM aggregated + { + $wallet_addresses := $result.wallets; + $amounts := $result.amounts; + $total_fees_collected := $result.total_fees; + } + + -- Step 3: Delete all processed positions (set-based, no loop!) + DELETE FROM ob_positions WHERE query_id = $query_id; + + -- Step 4: Process ALL payouts in a SINGLE batch call (no nested queries!) + if $wallet_addresses IS NOT NULL AND COALESCE(array_length($wallet_addresses), 0) > 0 { + ob_batch_unlock_collateral($wallet_addresses, $amounts); + } + + -- Step 5: Fee distribution (Issue 9 will implement this) + -- Fees are automatically kept in the vault by deducting from unlocked amounts. + -- Winners receive (shares × $1 - 2% fee), so 2% remains locked in vault. + -- + -- $total_fees_collected tracks the amount for future distribution: + -- TODO (Issue 9): Uncomment when distribute_fees() is implemented + -- distribute_fees($query_id, $total_fees_collected); + -- + -- Verification: Check vault balance via ethereum_bridge queries +} diff --git a/tests/streams/order_book/settlement_payout_test.go b/tests/streams/order_book/settlement_payout_test.go new file mode 100644 index 000000000..42993eca5 --- /dev/null +++ b/tests/streams/order_book/settlement_payout_test.go @@ -0,0 +1,270 @@ +//go:build kwiltest + +package order_book + +import ( + "context" + "math/big" + "testing" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + kwilTypes "github.com/trufnetwork/kwil-db/core/types" + erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/extensions/tn_utils" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/node/tests/streams/utils/setup" + "github.com/trufnetwork/sdk-go/core/util" + + attestationTests "github.com/trufnetwork/node/tests/streams/attestation" +) + +// TestSettlementPayouts tests that process_settlement() correctly pays winners +func TestSettlementPayouts(t *testing.T) { + owner := util.Unsafe_NewEthereumAddressFromString("0x1111111111111111111111111111111111111111") + + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "ORDER_BOOK_08_SettlementPayouts", + SeedStatements: migrations.GetSeedScriptStatements(), + Owner: owner.Address(), + FunctionTests: []kwilTesting.TestFunc{ + testWinnerReceives98PercentPayout(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +// testWinnerReceives98PercentPayout verifies that a winner receives 98% payout (2% fee deducted) +// Scenario: User creates 100 YES shares, market settles as YES, user receives 98 USDC +func testWinnerReceives98PercentPayout(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset balance point tracker for this test + lastBalancePoint = nil + + // CRITICAL: Initialize ERC20 extension singleton FIRST (like matching_engine_test.go) + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + // Use valid Ethereum address as user + userAddr := util.Unsafe_NewEthereumAddressFromString("0x2222222222222222222222222222222222222222") + platform.Deployer = userAddr.Bytes() + + // Setup attestation helper (for signing) + helper := attestationTests.NewAttestationTestHelper(t, ctx, platform) + + // Create data provider + err = setup.CreateDataProvider(ctx, platform, userAddr.Address()) + require.NoError(t, err) + + // Give user initial balance: 500 USDC + err = giveBalanceChained(ctx, platform, userAddr.Address(), "500000000000000000000") + require.NoError(t, err) + + // Get balance before market operations + balanceBefore, err := getBalance(ctx, platform, userAddr.Address()) + require.NoError(t, err) + t.Logf("User balance before: %s USDC", balanceBefore.String()) + + // Use simple stream ID (exactly 32 characters) + streamID := "stpayouttest00000000000000000000" // Exactly 32 chars + dataProvider := userAddr.Address() + + // CRITICAL: create_stream + insert_records + get_record must share the SAME + // engine context to ensure inserted data is visible within the transaction + engineCtx := helper.NewEngineContext() + + // Create primitive stream + createRes, err := platform.Engine.Call(engineCtx, platform.DB, "", "create_stream", + []any{streamID, "primitive"}, + nil) + require.NoError(t, err) + if createRes.Error != nil { + t.Fatalf("create_stream error: %v", createRes.Error) + } + t.Logf("✓ Created stream: %s", streamID) + + // Insert outcome data directly using the SAME engineCtx + eventTime := int64(1000) + + // Create decimal value (1.0 = YES outcome) + valueStr := "1.000000000000000000" // 1.0 with 18 decimal places + valueDecimal, err := kwilTypes.ParseDecimalExplicit(valueStr, 36, 18) + require.NoError(t, err) + t.Logf("✓ Parsed decimal value: %v", valueDecimal) + + // Insert using the same engineCtx so data is visible to subsequent calls + insertRes, err := platform.Engine.Call(engineCtx, platform.DB, "", "insert_records", + []any{ + []string{dataProvider}, + []string{streamID}, + []int64{eventTime}, + []*kwilTypes.Decimal{valueDecimal}, + }, + nil) + require.NoError(t, err) + if insertRes.Error != nil { + t.Fatalf("insert_records error: %v", insertRes.Error) + } + t.Logf("✓ Inserted record: provider=%s, stream=%s, time=%d, value=%s", + dataProvider, streamID, eventTime, valueStr) + + // Verify data was inserted by querying directly (reuse same context) + var foundData bool + getRes, err := platform.Engine.Call(engineCtx, platform.DB, "", "get_record", + []any{ + dataProvider, + streamID, + int64(500), + int64(1500), + nil, + false, + }, + func(row *common.Row) error { + t.Logf("Found data: event_time=%v, value=%v", row.Values[0], row.Values[1]) + foundData = true + return nil + }) + require.NoError(t, err) + if getRes.Error != nil { + t.Fatalf("get_record error: %v", getRes.Error) + } + require.True(t, foundData, "Data should be found in stream") + + // Request attestation for get_record + argsBytes, err := tn_utils.EncodeActionArgs([]any{ + dataProvider, + streamID, + int64(500), + int64(1500), + nil, + false, + }) + require.NoError(t, err) + + var requestTxID string + var attestationHash []byte + engineCtx = helper.NewEngineContext() + res, err := platform.Engine.Call(engineCtx, platform.DB, "", "request_attestation", + []any{ + dataProvider, + streamID, + "get_record", + argsBytes, + false, + nil, + }, + func(row *common.Row) error { + requestTxID = row.Values[0].(string) + attestationHash = append([]byte(nil), row.Values[1].([]byte)...) + return nil + }) + require.NoError(t, err) + if res.Error != nil { + t.Logf("request_attestation error: %v", res.Error) + require.NoError(t, res.Error, "request_attestation failed") + } + require.NotEmpty(t, requestTxID) + require.NotEmpty(t, attestationHash) + t.Logf("Created attestation: txID=%s, hash=%x", requestTxID, attestationHash) + + // Sign the attestation + helper.SignAttestation(requestTxID) + + // Create market using the attestation hash + settleTime := int64(100) // Future timestamp + maxSpread := int64(5) + minOrderSize := int64(1) + var queryID int + + // Use timestamp 50 for market creation (before settleTime) + engineCtx = helper.NewEngineContext() + engineCtx.TxContext.BlockContext.Timestamp = 50 + createMarketRes, err := platform.Engine.Call(engineCtx, platform.DB, "", "create_market", + []any{attestationHash, settleTime, maxSpread, minOrderSize}, + func(row *common.Row) error { + queryID = int(row.Values[0].(int64)) + return nil + }) + require.NoError(t, err) + if createMarketRes.Error != nil { + t.Logf("create_market error: %v", createMarketRes.Error) + require.NoError(t, createMarketRes.Error) + } + require.Greater(t, queryID, 0, "queryID should be positive") + t.Logf("Created market ID: %d", queryID) + + // User places split limit order to create 100 YES shares + // Cost: 100 USDC locked in vault + err = callPlaceSplitLimitOrder(ctx, platform, &userAddr, queryID, 60, 100) + require.NoError(t, err) + + // Verify user has 100 YES holdings + positions, err := getPositions(ctx, platform, queryID) + require.NoError(t, err) + var yesHoldings *Position + for i := range positions { + if positions[i].Outcome && positions[i].Price == 0 { + yesHoldings = &positions[i] + break + } + } + require.NotNil(t, yesHoldings, "User should have YES holdings") + require.Equal(t, int64(100), yesHoldings.Amount) + t.Logf("User has %d YES holdings before settlement", yesHoldings.Amount) + + // Settle the market with timestamp 200 (after settleTime) + engineCtx = helper.NewEngineContext() + engineCtx.TxContext.BlockContext.Timestamp = 200 + settleRes, err := platform.Engine.Call(engineCtx, platform.DB, "", "settle_market", + []any{queryID}, + nil) + require.NoError(t, err) + if settleRes.Error != nil { + t.Logf("settle_market error: %v", settleRes.Error) + } + require.Nil(t, settleRes.Error, "settle_market should succeed") + + // Verify market is settled + engineCtx = helper.NewEngineContext() + var settled bool + var winningOutcome *bool + err = platform.Engine.Execute(engineCtx, platform.DB, + `SELECT settled, winning_outcome FROM ob_queries WHERE id = $id`, + map[string]any{"id": queryID}, + func(row *common.Row) error { + settled = row.Values[0].(bool) + if row.Values[1] != nil { + outcome := row.Values[1].(bool) + winningOutcome = &outcome + } + return nil + }) + require.NoError(t, err) + require.True(t, settled, "market should be marked as settled") + require.NotNil(t, winningOutcome, "winning_outcome should be set") + require.True(t, *winningOutcome, "outcome should be TRUE (YES) since value was 1.0") + + // NEW: Verify all positions are deleted (payout processing) + positionsAfter, err := getPositions(ctx, platform, queryID) + require.NoError(t, err) + require.Empty(t, positionsAfter, "All positions should be deleted after settlement") + t.Logf("✓ All positions cleared after settlement") + + // NEW: Verify user received 98% payout + // Expected: 100 shares × $1.00 - 2% fee = 98 USDC + balanceAfter, err := getBalance(ctx, platform, userAddr.Address()) + require.NoError(t, err) + t.Logf("User balance after: %s USDC", balanceAfter.String()) + + // Net change should be: -2 USDC (market creation fee) -100 USDC (locked) + 98 USDC (payout) = -4 USDC + // (2 USDC market creation fee + 2 USDC settlement fee = 4 USDC total fees) + netChange := new(big.Int).Sub(balanceAfter, balanceBefore) + expectedNetChange := new(big.Int).Mul(big.NewInt(-4), big.NewInt(1e18)) // -4 USDC + require.Equal(t, expectedNetChange.String(), netChange.String(), + "Net change should be -4 USDC (2 market creation + 2 settlement fee)") + t.Logf("✓ Net balance change: %s (4 USDC total fees: 2 creation + 2 settlement)", netChange.String()) + + return nil + } +}