From 6b02ca0af27db73979144809b857fff812bca266 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Fri, 27 Feb 2026 18:41:16 +0700 Subject: [PATCH 1/3] chore: increase max limit of LP rewards sampling and batch lp --- .../tn_lp_rewards/internal/engine_ops.go | 88 +----- extensions/tn_lp_rewards/tn_lp_rewards.go | 277 +++--------------- internal/benchmark/types.go | 4 +- .../migrations/034-order-book-rewards.sql | 111 ++++--- .../order_book/fee_distribution_audit_test.go | 10 +- .../order_book/fee_distribution_test.go | 12 +- tests/streams/order_book/rewards_test.go | 66 +++-- 7 files changed, 162 insertions(+), 406 deletions(-) diff --git a/extensions/tn_lp_rewards/internal/engine_ops.go b/extensions/tn_lp_rewards/internal/engine_ops.go index c1c25f135..bbd8e9aa4 100644 --- a/extensions/tn_lp_rewards/internal/engine_ops.go +++ b/extensions/tn_lp_rewards/internal/engine_ops.go @@ -6,9 +6,7 @@ import ( "strings" "github.com/trufnetwork/kwil-db/common" - "github.com/trufnetwork/kwil-db/core/crypto/auth" "github.com/trufnetwork/kwil-db/core/log" - ktypes "github.com/trufnetwork/kwil-db/core/types" "github.com/trufnetwork/kwil-db/node/types/sql" ) @@ -64,7 +62,7 @@ func (e *EngineOperations) LoadLPRewardsConfig(ctx context.Context) (bool, int, var ( enabled bool = true interval int = 10 - maxMarkets int = 50 + maxMarkets int = 1000 found bool ) @@ -164,87 +162,3 @@ func (e *EngineOperations) GetActiveMarkets(ctx context.Context, limit int) ([]i return markets, nil } - -// BroadcastSampleLPRewards broadcasts a sample_lp_rewards transaction -func (e *EngineOperations) BroadcastSampleLPRewards( - ctx context.Context, - chainID string, - signer auth.Signer, - broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), - queryID int, - blockHeight int64, -) error { - // Get signer account ID - signerAccountID, err := ktypes.GetSignerAccount(signer) - if err != nil { - return fmt.Errorf("get signer account: %w", err) - } - - // Fetch fresh nonce from database using validated read transaction - readTx, cleanup, err := e.getFreshReadTx(ctx) - if err != nil { - return fmt.Errorf("get fresh read tx: %w", err) - } - defer cleanup() - - var nextNonce uint64 - account, err := e.accounts.GetAccount(ctx, readTx, signerAccountID) - if err != nil { - if !isAccountNotFoundError(err) { - return fmt.Errorf("get account: %w", err) - } - nextNonce = 1 - } else { - nextNonce = uint64(account.Nonce + 1) - } - - // Encode arguments for sample_lp_rewards action - // Parameters: $query_id INT, $block INT8 - queryIDArg, err := ktypes.EncodeValue(int64(queryID)) - if err != nil { - return fmt.Errorf("encode query_id: %w", err) - } - blockArg, err := ktypes.EncodeValue(blockHeight) - if err != nil { - return fmt.Errorf("encode block: %w", err) - } - - // Build ActionExecution payload - payload := &ktypes.ActionExecution{ - Namespace: "main", - Action: "sample_lp_rewards", - Arguments: [][]*ktypes.EncodedValue{{queryIDArg, blockArg}}, - } - - // Create transaction - tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce) - if err != nil { - return fmt.Errorf("create tx: %w", err) - } - - // Sign transaction - if err := tx.Sign(signer); err != nil { - return fmt.Errorf("sign tx: %w", err) - } - - // Broadcast with sync mode = 1 (wait for commit) to ensure nonce increments properly - // before broadcasting next transaction - hash, txResult, err := broadcaster(ctx, tx, 1) - if err != nil { - return fmt.Errorf("broadcast tx: %w", err) - } - - // Check immediate result (may not have error yet in async mode) - if txResult != nil && txResult.Code != uint32(ktypes.CodeOk) { - return fmt.Errorf("transaction failed with code %d: %s", - txResult.Code, txResult.Log) - } - - e.logger.Debug("sample_lp_rewards transaction broadcast", - "query_id", queryID, - "block", blockHeight, - "tx_hash", hash.String(), - "nonce", nextNonce) - - return nil -} diff --git a/extensions/tn_lp_rewards/tn_lp_rewards.go b/extensions/tn_lp_rewards/tn_lp_rewards.go index b1694f467..6ee8ce08e 100644 --- a/extensions/tn_lp_rewards/tn_lp_rewards.go +++ b/extensions/tn_lp_rewards/tn_lp_rewards.go @@ -3,25 +3,14 @@ package tn_lp_rewards import ( "context" "fmt" - "net" - "net/url" - "strings" "sync" "sync/atomic" - "github.com/trufnetwork/kwil-db/app/key" - appconf "github.com/trufnetwork/kwil-db/app/node/conf" "github.com/trufnetwork/kwil-db/common" - "github.com/trufnetwork/kwil-db/config" - "github.com/trufnetwork/kwil-db/core/crypto/auth" "github.com/trufnetwork/kwil-db/core/log" - rpcclient "github.com/trufnetwork/kwil-db/core/rpc/client" - rpcuser "github.com/trufnetwork/kwil-db/core/rpc/client/user/jsonrpc" - ktypes "github.com/trufnetwork/kwil-db/core/types" "github.com/trufnetwork/kwil-db/extensions/hooks" "github.com/trufnetwork/kwil-db/extensions/precompiles" sql "github.com/trufnetwork/kwil-db/node/types/sql" - "github.com/trufnetwork/node/extensions/leaderwatch" "github.com/trufnetwork/node/extensions/tn_lp_rewards/internal" ) @@ -33,18 +22,6 @@ const ( DefaultMaxMarketsPerRun = 50 ) -// TxBroadcaster interface for broadcasting transactions -type TxBroadcaster interface { - BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) -} - -// txBroadcasterFunc adapts a function to the TxBroadcaster interface -type txBroadcasterFunc func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) - -func (f txBroadcasterFunc) BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) { - return f(ctx, tx, sync) -} - // Extension holds the singleton state for LP rewards sampling type Extension struct { mu sync.RWMutex @@ -52,7 +29,6 @@ type Extension struct { logger log.Logger service *common.Service engOps *internal.EngineOperations - isLeader atomic.Bool // Configuration (loaded from database) enabled bool @@ -60,13 +36,6 @@ type Extension struct { maxMarketsPerRun int configReloadInterval int64 // Reload config every N blocks lastCheckedHeight int64 - - // Sampling state - prevents overlapping runs - isSampling atomic.Bool - - // Transaction broadcasting - signer auth.Signer - broadcaster TxBroadcaster } var ( @@ -111,13 +80,10 @@ func InitializeExtension() { panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err)) } - // Register with leaderwatch for leadership coordination - if err := leaderwatch.Register(ExtensionName, leaderwatch.Callbacks{ - OnAcquire: leaderAcquire, - OnLose: leaderLose, - OnEndBlock: leaderEndBlock, - }); err != nil { - panic(fmt.Sprintf("failed to register %s leader watcher: %v", ExtensionName, err)) + // Register end-block hook for KEYLESS internal sampling + // This hook runs on ALL nodes during block finalisation. + if err := hooks.RegisterEndBlockHook(ExtensionName+"_end_block", endBlockHook); err != nil { + panic(fmt.Sprintf("failed to register %s end block hook: %v", ExtensionName, err)) } } @@ -169,10 +135,7 @@ func engineReadyHook(ctx context.Context, app *common.App) error { } } - // Wire signer and broadcaster - wireSignerAndBroadcaster(app, ext) - - logger.Info("LP rewards extension initialized", + logger.Info("LP rewards extension initialized (Internal Hook mode)", "enabled", ext.enabled, "sampling_interval_blocks", ext.samplingIntervalBlocks, "max_markets_per_run", ext.maxMarketsPerRun) @@ -180,134 +143,13 @@ func engineReadyHook(ctx context.Context, app *common.App) error { return nil } -// wireSignerAndBroadcaster fills in signer and broadcaster if not already set -func wireSignerAndBroadcaster(app *common.App, ext *Extension) { - if app == nil || app.Service == nil || app.Service.LocalConfig == nil { - return - } - // Signer (from node key file) - if ext.signer == nil { - rootDir := appconf.RootDir() - if rootDir == "" { - ext.logger.Warn("root dir is empty; cannot load node key for signer") - } else { - keyPath := config.NodeKeyFilePath(rootDir) - if pk, err := key.LoadNodeKey(keyPath); err != nil { - ext.logger.Warn("failed to load node key for signer; LP rewards disabled until available", "path", keyPath, "error", err) - } else { - ext.signer = auth.GetUserSigner(pk) - } - } - } - // Broadcaster (JSON-RPC user service) - if ext.broadcaster == nil { - // Optional override: [extensions.tn_lp_rewards].rpc_url - if m, ok := app.Service.LocalConfig.Extensions[ExtensionName]; ok { - if rpcURL := m["rpc_url"]; rpcURL != "" { - if u, err := normalizeListenAddressForClient(rpcURL); err == nil { - ext.broadcaster = makeBroadcasterFromURL(u) - return - } else { - ext.logger.Warn("invalid extensions.tn_lp_rewards.rpc_url; falling back to [rpc].listen", "error", err) - } - } - } - listen := app.Service.LocalConfig.RPC.ListenAddress - if listen == "" { - ext.logger.Warn("RPC listen address is empty; cannot create broadcaster") - } else if u, err := normalizeListenAddressForClient(listen); err != nil { - ext.logger.Warn("invalid RPC listen address; cannot create broadcaster", "addr", listen, "error", err) - } else { - ext.broadcaster = makeBroadcasterFromURL(u) - } - } -} - -// normalizeListenAddressForClient converts a server listen address into a client URL -func normalizeListenAddressForClient(listen string) (*url.URL, error) { - if listen == "" { - return nil, fmt.Errorf("empty listen address") - } - endpoint := listen - if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { - endpoint = "http://" + endpoint - } - u, err := url.Parse(endpoint) - if err != nil { - return nil, err - } - host, port, err := net.SplitHostPort(u.Host) - if err != nil { - cleanHost := strings.Trim(u.Host, "[]") - if cleanHost == "" { - u.Host = "127.0.0.1" - } else if ip := net.ParseIP(cleanHost); ip != nil && ip.IsUnspecified() { - u.Host = "127.0.0.1" - } - } else { - cleanHost := strings.Trim(host, "[]") - if cleanHost == "" { - u.Host = net.JoinHostPort("127.0.0.1", port) - } else if ip := net.ParseIP(cleanHost); ip != nil && ip.IsUnspecified() { - u.Host = net.JoinHostPort("127.0.0.1", port) - } - } - return u, nil -} - -// makeBroadcasterFromURL creates a TxBroadcaster backed by the user JSON-RPC client -func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { - userClient := rpcuser.NewClient(u) - return txBroadcasterFunc(func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) { - // Map sync flag to broadcast mode (0 for accept-only, 1 for wait-commit) - mode := rpcclient.BroadcastWaitAccept - if sync == 1 { - mode = rpcclient.BroadcastWaitCommit - } - h, err := userClient.Broadcast(ctx, tx, mode) - if err != nil { - return ktypes.Hash{}, nil, err - } - // For accept mode, we don't need to query the result (just mempool acceptance) - if mode == rpcclient.BroadcastWaitAccept { - return h, nil, nil - } - // For commit mode, query the result - txQueryResp, err := userClient.TxQuery(ctx, h) - if err != nil { - return h, nil, fmt.Errorf("failed to query transaction result: %w", err) - } - if txQueryResp == nil || txQueryResp.Result == nil { - return h, nil, nil - } - return h, txQueryResp.Result, nil - }) -} - -// leaderAcquire is called when this node becomes leader -func leaderAcquire(ctx context.Context, app *common.App, block *common.BlockContext) { - ext := GetExtension() - ext.isLeader.Store(true) - ext.logger.Info("acquired leadership, LP rewards sampling enabled") -} - -// leaderLose is called when this node loses leadership -func leaderLose(ctx context.Context, app *common.App, block *common.BlockContext) { - ext := GetExtension() - ext.isLeader.Store(false) - ext.logger.Info("lost leadership, LP rewards sampling disabled") -} - -// leaderEndBlock is called at the end of each block when this node is leader -func leaderEndBlock(ctx context.Context, app *common.App, block *common.BlockContext) { +// endBlockHook is called on every node at the end of each block. +// It performs reward sampling as a consensus rule (no keys/nonces needed). +func endBlockHook(ctx context.Context, app *common.App, block *common.BlockContext) error { ext := GetExtension() - if !ext.isLeader.Load() { - return - } - - if block == nil { - return + if block == nil || ext.engOps == nil { + return nil } blockHeight := block.Height @@ -320,32 +162,24 @@ func leaderEndBlock(ctx context.Context, app *common.App, block *common.BlockCon maxMarketsPerRun := ext.maxMarketsPerRun ext.mu.RUnlock() - // Reload config periodically (do this BEFORE enabled check so we can re-enable without restart) + // Reload config periodically if configReloadInterval > 0 && blockHeight-atomic.LoadInt64(&ext.lastCheckedHeight) >= configReloadInterval { - // Use background context since EndBlockHook context is canceled when block ends - go ext.reloadConfig(context.Background()) + ext.reloadConfig(ctx) atomic.StoreInt64(&ext.lastCheckedHeight, blockHeight) } if !enabled { - return + return nil } // Check if it's time to sample (every N blocks) if samplingIntervalBlocks <= 0 || blockHeight%samplingIntervalBlocks != 0 { - return - } - - // Skip if previous sampling run is still in progress (prevents nonce conflicts) - if !ext.isSampling.CompareAndSwap(false, true) { - ext.logger.Warn("skipping LP rewards sampling - previous run still in progress", - "block", blockHeight) - return + return nil } - // Sample LP rewards in background with background context - // (EndBlockHook context is canceled when block processing ends) - go ext.sampleLPRewardsWithConfig(context.Background(), blockHeight, maxMarketsPerRun) + // Execute sampling logic directly against the engine (Internal call) + // This is deterministic and run by all nodes at the same height. + return ext.performInternalSampling(ctx, app, blockHeight, maxMarketsPerRun) } // reloadConfig reloads configuration from database @@ -372,69 +206,30 @@ func (ext *Extension) reloadConfig(ctx context.Context) { "max_markets_per_run", maxMarkets) } -// sampleLPRewardsWithConfig samples LP rewards for all active markets using provided config -func (ext *Extension) sampleLPRewardsWithConfig(ctx context.Context, blockHeight int64, maxMarketsPerRun int) { - // Always clear the sampling flag when done - defer ext.isSampling.Store(false) +// performInternalSampling executes sampling logic for all active markets in a single batch. +// This runs within the block execution transaction. +func (ext *Extension) performInternalSampling(ctx context.Context, app *common.App, blockHeight int64, maxMarketsPerRun int) error { + ext.logger.Info("performing batch LP rewards sampling", + "block", blockHeight, + "limit", maxMarketsPerRun) - if ext.engOps == nil || ext.signer == nil || ext.broadcaster == nil { - ext.logger.Warn("LP rewards extension not fully initialized") - return - } + // Call sample_all_active_lp_rewards directly via Engine (Internal execution) + // This performs the entire sampling loop inside a single SQL context for efficiency. + res, err := app.Engine.CallWithoutEngineCtx(ctx, app.DB, "main", "sample_all_active_lp_rewards", []any{ + blockHeight, + int64(maxMarketsPerRun), + }, nil) - // Get active markets - markets, err := ext.engOps.GetActiveMarkets(ctx, maxMarketsPerRun) if err != nil { - ext.logger.Warn("failed to get active markets", "error", err) - return + ext.logger.Error("batch internal sampling failed", "error", err) + return nil // We don't return error to avoid halting network on logic issues } - if len(markets) == 0 { - ext.logger.Debug("no active markets to sample", "block", blockHeight) - return - } - - ext.logger.Info("sampling LP rewards", - "block", blockHeight, - "market_count", len(markets)) - - // Get chain ID from service - chainID := "" - if ext.service != nil && ext.service.GenesisConfig != nil { - chainID = ext.service.GenesisConfig.ChainID - } - - // Sample each market sequentially to avoid nonce conflicts - // Each transaction must complete before the next one starts - successCount := 0 - failCount := 0 - for _, queryID := range markets { - err := ext.engOps.BroadcastSampleLPRewards( - ctx, - chainID, - ext.signer, - ext.broadcaster.BroadcastTx, - queryID, - blockHeight, - ) - if err != nil { - ext.logger.Warn("failed to sample LP rewards", - "query_id", queryID, - "block", blockHeight, - "error", err) - failCount++ - // Continue to next market even if this one fails - continue - } - - ext.logger.Debug("sampled LP rewards", - "query_id", queryID, - "block", blockHeight) - successCount++ + if res.Error != nil { + ext.logger.Warn("batch sampling execution error", "error", res.Error) + } else { + ext.logger.Info("batch internal LP rewards sampling completed", "block", blockHeight) } - ext.logger.Info("LP rewards sampling completed", - "block", blockHeight, - "success", successCount, - "failed", failCount) + return nil } diff --git a/internal/benchmark/types.go b/internal/benchmark/types.go index 2740d3bc9..e6377d9e2 100644 --- a/internal/benchmark/types.go +++ b/internal/benchmark/types.go @@ -4,7 +4,7 @@ import ( "time" "github.com/trufnetwork/sdk-go/core/util" - testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/node/tests/streams/utils/cache" ) type ( @@ -17,7 +17,7 @@ type ( Samples int Procedures []ProcedureEnum CacheEnabled bool // Whether to enable cache for this benchmark case - CacheConfig *testutils.CacheOptions // Optional custom cache configuration + CacheConfig *cache.CacheOptions // Optional custom cache configuration } Result struct { Case BenchmarkCase diff --git a/internal/migrations/034-order-book-rewards.sql b/internal/migrations/034-order-book-rewards.sql index e93f4d6fc..10ee897bf 100644 --- a/internal/migrations/034-order-book-rewards.sql +++ b/internal/migrations/034-order-book-rewards.sql @@ -120,7 +120,7 @@ CREATE INDEX IF NOT EXISTS idx_ob_rewards_query_block ON ob_rewards(query_id, bl CREATE OR REPLACE ACTION sample_lp_rewards( $query_id INT, $block INT8 -) PUBLIC { +) PRIVATE { -- Check if this block was already sampled to prevent duplicate key errors -- This handles retries, race conditions, and scheduler overlap for $row in SELECT 1 FROM ob_rewards WHERE query_id = $query_id AND block = $block LIMIT 1 { @@ -181,9 +181,10 @@ CREATE OR REPLACE ACTION sample_lp_rewards( for $pos in SELECT p1.participant_id, - p1.price as yes_price, + p1.price as p1_price, + p1.outcome as p1_outcome, p1.amount, - p2.price as no_price + p2.price as p2_price FROM ob_positions p1 JOIN ob_positions p2 ON p1.query_id = p2.query_id @@ -193,45 +194,54 @@ CREATE OR REPLACE ACTION sample_lp_rewards( WHERE p1.query_id = $query_id { $pid INT := $pos.participant_id; - $yes_price INT := $pos.yes_price; - $no_price INT := $pos.no_price; $amount INT := $pos.amount; - -- Price range is 0-100 cents (100 represents $1.00) - -- Constraint: yes_price = 100 + no_price - -- This ensures we only process (SELL, BUY) pairs, not (BUY, SELL) duplicates - -- Example: SELL YES @ 48 + BUY NO @ 52 → 48 = 100 + (-52) ✅ - if $yes_price == 100 + $no_price { - -- Calculate distances from midpoint using ABS() - -- Handles both positive (SELL) and negative (BUY) prices - $yes_dist INT := $x_mid - $yes_price; - if $yes_dist < 0 { - $yes_dist := -$yes_dist; + -- The specification requires pairing a SELL order with a BUY order + -- such that Magnitude(Ask) + Magnitude(Bid) = 100. + -- Formula: Price1 = 100 + Price2 (where Price1 > 0 and Price2 < 0) + if $pos.p1_price == 100 + $pos.p2_price { + -- Assign prices based on outcome + $yes_price INT; + $no_price INT; + if $pos.p1_outcome == TRUE { + $yes_price := $pos.p1_price; + $no_price := $pos.p2_price; + } else { + $yes_price := $pos.p2_price; + $no_price := $pos.p1_price; } - $no_dist INT := (100 - $x_mid) - (-$no_price); - if $no_dist < 0 { - $no_dist := -$no_dist; - } + -- Convert to positive magnitudes for distance check (abs workaround) + $yes_mag INT := $yes_price; + if $yes_mag < 0 { $yes_mag := -$yes_mag; } + $no_mag INT := $no_price; + if $no_mag < 0 { $no_mag := -$no_mag; } + + -- Calculate distances from midpoint magnitudes + $yes_dist INT := $x_mid - $yes_mag; + if $yes_dist < 0 { $yes_dist := -$yes_dist; } + + $no_dist INT := (100 - $x_mid) - $no_mag; + if $no_dist < 0 { $no_dist := -$no_dist; } -- Filter by spread (reference line 135-146) if $yes_dist < $x_spread AND $no_dist < $x_spread { - -- Get minimum distance (reference uses LEAST()) - $min_dist INT := $yes_dist; - if $no_dist < $yes_dist { - $min_dist := $no_dist; - } + -- Get minimum distance (reference uses LEAST()) + $min_dist INT := $yes_dist; + if $no_dist < $yes_dist { + $min_dist := $no_dist; + } - -- Calculate score (reference line 74: amount * POWER((spread - dist) / spread, 2)) - $spread_minus_dist INT := $x_spread - $min_dist; - $score NUMERIC(20,4) := ( - $amount::NUMERIC(20,4) * - ($spread_minus_dist * $spread_minus_dist)::NUMERIC(20,4) / - ($x_spread * $x_spread)::NUMERIC(20,4) - ); + -- Calculate score (reference line 74: amount * POWER((spread - dist) / spread, 2)) + $spread_minus_dist INT := $x_spread - $min_dist; + $score NUMERIC(20,4) := ( + $amount::NUMERIC(20,4) * + ($spread_minus_dist * $spread_minus_dist)::NUMERIC(20,4) / + ($x_spread * $x_spread)::NUMERIC(20,4) + ); - INSERT INTO ob_rewards (query_id, participant_id, block, reward_percent) - VALUES ($query_id, $pid, $block, $score::NUMERIC(5,2)); + INSERT INTO ob_rewards (query_id, participant_id, block, reward_percent) + VALUES ($query_id, $pid, $block, $score::NUMERIC(5,2)); } } } @@ -260,3 +270,38 @@ CREATE OR REPLACE ACTION sample_lp_rewards( } } }; + +-- ============================================================================= +-- sample_all_active_lp_rewards: Batch sampling for all active markets +-- ============================================================================= +/** + * Iterates through all active markets and performs reward sampling. + * This consolidated call is more efficient than calling sample_lp_rewards + * multiple times from an external process. + * + * Parameters: + * - $block: The block height to record for all samples + * - $market_limit: Maximum number of markets to process in this run + */ +CREATE OR REPLACE ACTION sample_all_active_lp_rewards( + $block INT8, + $market_limit INT +) PRIVATE { + if $block IS NULL OR $block <= 0 { + ERROR('block height is required and must be positive'); + } + + if $market_limit IS NULL OR $market_limit <= 0 { + ERROR('market_limit is required and must be positive'); + } + + for $market in SELECT id FROM ob_queries + WHERE settled = false + ORDER BY id ASC + LIMIT $market_limit + { + -- Call the helper action for each market + sample_lp_rewards($market.id, $block); + } +}; + diff --git a/tests/streams/order_book/fee_distribution_audit_test.go b/tests/streams/order_book/fee_distribution_audit_test.go index c281ce9ef..00c5159fd 100644 --- a/tests/streams/order_book/fee_distribution_audit_test.go +++ b/tests/streams/order_book/fee_distribution_audit_test.go @@ -78,7 +78,7 @@ func testAuditRecordCreation(t *testing.T) func(context.Context, *kwilTesting.Pl setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID)) // Sample LP rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Fund vault and call distribute_fees @@ -201,11 +201,11 @@ func testAuditMultiBlock(t *testing.T) func(context.Context, *kwilTesting.Platfo setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID)) // Sample 3 blocks - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 2000, nil) + err = triggerBatchSampling(ctx, platform, 2000) require.NoError(t, err) - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 3000, nil) + err = triggerBatchSampling(ctx, platform, 3000) require.NoError(t, err) // Distribute fees @@ -394,7 +394,7 @@ func testAuditDataIntegrity(t *testing.T) func(context.Context, *kwilTesting.Pla require.NoError(t, err) // Sample - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Distribute diff --git a/tests/streams/order_book/fee_distribution_test.go b/tests/streams/order_book/fee_distribution_test.go index 1acff3377..e4b47a505 100644 --- a/tests/streams/order_book/fee_distribution_test.go +++ b/tests/streams/order_book/fee_distribution_test.go @@ -108,7 +108,7 @@ func testDistribution1Block2LPs(t *testing.T) func(context.Context, *kwilTesting require.NoError(t, err) // Sample LP rewards at block 1000 - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Verify rewards were recorded @@ -278,11 +278,11 @@ func testDistribution3Blocks2LPs(t *testing.T) func(context.Context, *kwilTestin require.NoError(t, err) // Sample LP rewards at 3 different blocks - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 2000, nil) + err = triggerBatchSampling(ctx, platform, 2000) require.NoError(t, err) - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 3000, nil) + err = triggerBatchSampling(ctx, platform, 3000) require.NoError(t, err) // Verify rewards were recorded for all 3 blocks @@ -558,7 +558,7 @@ func testDistributionZeroFees(t *testing.T) func(context.Context, *kwilTesting.P require.NoError(t, err) // Sample LP rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Get balances before distribution @@ -666,7 +666,7 @@ func testDistribution1LP(t *testing.T) func(context.Context, *kwilTesting.Platfo require.NoError(t, err) // Sample LP rewards at block 1000 - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Verify only 1 LP recorded diff --git a/tests/streams/order_book/rewards_test.go b/tests/streams/order_book/rewards_test.go index 9c2c78e1a..c5e6b879f 100644 --- a/tests/streams/order_book/rewards_test.go +++ b/tests/streams/order_book/rewards_test.go @@ -20,27 +20,29 @@ import ( "github.com/trufnetwork/sdk-go/core/util" ) -func callSampleLPRewards(ctx context.Context, platform *kwilTesting.Platform, signer *util.EthereumAddress, queryID int, block int64, resultFn func(*common.Row) error) error { - tx := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: 1, - Timestamp: time.Now().Unix(), - }, - Signer: signer.Bytes(), - Caller: signer.Address(), - TxID: platform.Txid(), - Authenticator: coreauth.EthPersonalSignAuth, +// triggerDirectSampling allows testing individual market sampling (e.g. for error cases) +func triggerDirectSampling(ctx context.Context, platform *kwilTesting.Platform, queryID int, block int64) error { + res, err := platform.Engine.CallWithoutEngineCtx(ctx, platform.DB, "main", "sample_lp_rewards", []any{int64(queryID), block}, nil) + if err != nil { + return err } - engineCtx := &common.EngineContext{TxContext: tx} + if res.Error != nil { + return fmt.Errorf("%s", res.Error.Error()) + } + return nil +} - res, err := platform.Engine.Call( - engineCtx, +// triggerBatchSampling simulates the EndBlockHook calling the PRIVATE sample_all_active_lp_rewards action. +func triggerBatchSampling(ctx context.Context, platform *kwilTesting.Platform, block int64) error { + // We use CallWithoutEngineCtx because internal hooks run without an external caller/signer. + // This matches the new batch logic in tn_lp_rewards.go. + res, err := platform.Engine.CallWithoutEngineCtx( + ctx, platform.DB, - "", - "sample_lp_rewards", - []any{queryID, block}, - resultFn, + "main", + "sample_all_active_lp_rewards", + []any{block, int64(1000)}, + nil, ) if err != nil { return err @@ -169,8 +171,8 @@ func testSampleRewardsMarketSettled(t *testing.T) func(context.Context, *kwilTes ) require.NoError(t, err) - // Try to sample settled market - err = callSampleLPRewards(ctx, platform, &userAddr, int(marketID), 1000, nil) + // Try to sample settled market directly + err = triggerDirectSampling(ctx, platform, int(marketID), 1000) require.Error(t, err) require.Contains(t, err.Error(), "Market is already settled") @@ -206,7 +208,7 @@ func testSampleRewardsNoOrderBook(t *testing.T) func(context.Context, *kwilTesti require.NoError(t, err) // Sample should succeed but produce no rewards (empty order book) - err = callSampleLPRewards(ctx, platform, &userAddr, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Verify no rewards were inserted @@ -251,7 +253,7 @@ func testSampleRewardsIncompleteOrderBook(t *testing.T) func(context.Context, *k require.NoError(t, err) // Sample should succeed but produce no rewards (incomplete order book) - err = callSampleLPRewards(ctx, platform, &userAddr, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Verify no rewards @@ -296,7 +298,7 @@ func testSampleRewardsSpread5Cents(t *testing.T) func(context.Context, *kwilTest require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 1000, nil) + err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) // Verify spread was 5¢ (we can't check spread directly, but rewards should be generated) @@ -341,7 +343,7 @@ func testSampleRewardsSpread4Cents(t *testing.T) func(context.Context, *kwilTest require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 2000, nil) + err = triggerBatchSampling(ctx, platform, 2000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 2000) @@ -385,7 +387,7 @@ func testSampleRewardsSpread3Cents(t *testing.T) func(context.Context, *kwilTest require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 3000, nil) + err = triggerBatchSampling(ctx, platform, 3000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 3000) @@ -429,7 +431,7 @@ func testSampleRewardsIneligibleMarket(t *testing.T) func(context.Context, *kwil require.NoError(t, err) // Sample should succeed but produce no rewards (ineligible spread) - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 4000, nil) + err = triggerBatchSampling(ctx, platform, 4000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 4000) @@ -489,7 +491,7 @@ func testSampleRewardsSingleLP(t *testing.T) func(context.Context, *kwilTesting. require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 5000, nil) + err = triggerBatchSampling(ctx, platform, 5000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 5000) @@ -566,7 +568,7 @@ func testSampleRewardsTwoLPs(t *testing.T) func(context.Context, *kwilTesting.Pl require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 6000, nil) + err = triggerBatchSampling(ctx, platform, 6000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 6000) @@ -660,7 +662,7 @@ func testSampleRewardsMultipleLPs(t *testing.T) func(context.Context, *kwilTesti require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 7000, nil) + err = triggerBatchSampling(ctx, platform, 7000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 7000) @@ -722,7 +724,7 @@ func testSampleRewardsNoQualifyingOrders(t *testing.T) func(context.Context, *kw // Midpoint will be around 50¢ with spread distance > threshold // Sample should produce no rewards - err = callSampleLPRewards(ctx, platform, &user1, int(marketID), 8000, nil) + err = triggerBatchSampling(ctx, platform, 8000) require.NoError(t, err) rewards, err := getRewards(ctx, platform, int(marketID), 8000) @@ -781,7 +783,7 @@ func testConstraintSellBuyPair(t *testing.T) func(context.Context, *kwilTesting. require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user1, int(queryID), 9000, nil) + err = triggerBatchSampling(ctx, platform, 9000) require.NoError(t, err) // Verify rewards generated @@ -841,7 +843,7 @@ func testConstraintNoDuplicates(t *testing.T) func(context.Context, *kwilTesting require.NoError(t, err) // Sample rewards - err = callSampleLPRewards(ctx, platform, &user2, int(queryID), 10000, nil) + err = triggerBatchSampling(ctx, platform, 10000) require.NoError(t, err) // Count reward rows - should be exactly 1, not 2 From 08b99a1b19661d6cf5d10fc44b10067c64d83393 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Fri, 27 Feb 2026 19:06:23 +0700 Subject: [PATCH 2/3] chore: apply suggestion --- extensions/tn_lp_rewards/internal/engine_ops.go | 8 ++++---- internal/migrations/034-order-book-rewards.sql | 4 ++-- tests/streams/order_book/rewards_test.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/extensions/tn_lp_rewards/internal/engine_ops.go b/extensions/tn_lp_rewards/internal/engine_ops.go index bbd8e9aa4..cc8fc5caf 100644 --- a/extensions/tn_lp_rewards/internal/engine_ops.go +++ b/extensions/tn_lp_rewards/internal/engine_ops.go @@ -68,7 +68,7 @@ func (e *EngineOperations) LoadLPRewardsConfig(ctx context.Context) (bool, int, db, cleanup, err := e.getFreshReadTx(ctx) if err != nil { - return true, 10, 50, fmt.Errorf("get fresh read tx: %w", err) + return enabled, interval, maxMarkets, fmt.Errorf("get fresh read tx: %w", err) } defer cleanup() @@ -104,12 +104,12 @@ func (e *EngineOperations) LoadLPRewardsConfig(ctx context.Context) (bool, int, strings.Contains(msg, "undefined table") || strings.Contains(msg, "not found")) { e.logger.Info("lp_rewards_config table not found; using defaults") - return true, 10, 50, nil + return enabled, interval, maxMarkets, nil } - return true, 10, 50, err + return enabled, interval, maxMarkets, err } if !found { - return true, 10, 50, nil + return enabled, interval, maxMarkets, nil } return enabled, interval, maxMarkets, nil } diff --git a/internal/migrations/034-order-book-rewards.sql b/internal/migrations/034-order-book-rewards.sql index 10ee897bf..aa238131d 100644 --- a/internal/migrations/034-order-book-rewards.sql +++ b/internal/migrations/034-order-book-rewards.sql @@ -32,9 +32,9 @@ * - Reward percentages sum to 100% (±0.01% for rounding) per block * * Sampling Trigger: - * - External system calls sample_lp_rewards($query_id, $block) periodically + * - Internal system calls sample_lp_rewards($query_id, $block) periodically via EndBlockHook * - Recommended: Every 50 blocks (~10 minutes with 12s block times) - * - Can be called by anyone (PUBLIC action) + * - Can only be invoked internally (PRIVATE action) * * Distribution: * - At settlement, total fees divided equally across all sampled blocks diff --git a/tests/streams/order_book/rewards_test.go b/tests/streams/order_book/rewards_test.go index c5e6b879f..c93e132fa 100644 --- a/tests/streams/order_book/rewards_test.go +++ b/tests/streams/order_book/rewards_test.go @@ -27,7 +27,7 @@ func triggerDirectSampling(ctx context.Context, platform *kwilTesting.Platform, return err } if res.Error != nil { - return fmt.Errorf("%s", res.Error.Error()) + return res.Error } return nil } @@ -48,7 +48,7 @@ func triggerBatchSampling(ctx context.Context, platform *kwilTesting.Platform, b return err } if res.Error != nil { - return fmt.Errorf("%s", res.Error.Error()) + return res.Error } return nil } From 5acb8d0037bf1aaea736eb202b3b6f7526e311a2 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Fri, 27 Feb 2026 22:36:36 +0700 Subject: [PATCH 3/3] chore: increase limit to 1000 --- internal/migrations/041-settlement-config-actions.sql | 10 +++++----- internal/migrations/042-lp-rewards-config.sql | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/migrations/041-settlement-config-actions.sql b/internal/migrations/041-settlement-config-actions.sql index 467f05b45..1d2756107 100644 --- a/internal/migrations/041-settlement-config-actions.sql +++ b/internal/migrations/041-settlement-config-actions.sql @@ -19,7 +19,7 @@ UPDATE settlement_config SET enabled = true, settlement_schedule = '0,30 * * * *', - max_markets_per_run = 100, + max_markets_per_run = 1000, retry_attempts = 3, updated_at = 0 WHERE id = 1; @@ -33,11 +33,11 @@ WHERE id = 1; -- Parameters: -- - $enabled: Enable/disable automatic settlement (BOOL) -- - $schedule: Cron schedule for settlement checks (TEXT) --- - $max_markets_per_run: Max markets to process per job run (INT, 1-100) +-- - $max_markets_per_run: Max markets to process per job run (INT, 1-1000) -- - $retry_attempts: Number of retry attempts for failed settlements (INT, 1-10) -- -- Usage: --- kwil-cli call-action update_settlement_config bool:true text:'0,30 * * * *' int:100 int:3 +-- kwil-cli call-action update_settlement_config bool:true text:'0,30 * * * *' int:1000 int:3 CREATE OR REPLACE ACTION update_settlement_config( $enabled BOOL, $schedule TEXT, @@ -68,8 +68,8 @@ CREATE OR REPLACE ACTION update_settlement_config( ERROR('schedule cannot be empty'); } - if $max_markets_per_run IS NULL OR $max_markets_per_run < 1 OR $max_markets_per_run > 100 { - ERROR('max_markets_per_run must be between 1 and 100'); + if $max_markets_per_run IS NULL OR $max_markets_per_run < 1 OR $max_markets_per_run > 1000 { + ERROR('max_markets_per_run must be between 1 and 1000'); } if $retry_attempts IS NULL OR $retry_attempts < 1 OR $retry_attempts > 10 { diff --git a/internal/migrations/042-lp-rewards-config.sql b/internal/migrations/042-lp-rewards-config.sql index 5f2a5b6dd..9344c13ee 100644 --- a/internal/migrations/042-lp-rewards-config.sql +++ b/internal/migrations/042-lp-rewards-config.sql @@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS lp_rewards_config ( id INT PRIMARY KEY DEFAULT 1, enabled BOOL NOT NULL DEFAULT TRUE, sampling_interval_blocks INT NOT NULL DEFAULT 10, - max_markets_per_run INT NOT NULL DEFAULT 50, + max_markets_per_run INT NOT NULL DEFAULT 1000, CHECK (id = 1), CHECK (sampling_interval_blocks >= 1), CHECK (max_markets_per_run >= 1) @@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS lp_rewards_config ( -- Insert default configuration INSERT INTO lp_rewards_config (id, enabled, sampling_interval_blocks, max_markets_per_run) -VALUES (1, TRUE, 10, 50) +VALUES (1, TRUE, 10, 1000) ON CONFLICT (id) DO NOTHING; -- ============================================================================ @@ -66,7 +66,7 @@ PUBLIC VIEW RETURNS ( RETURN $row.enabled, $row.sampling_interval_blocks, $row.max_markets_per_run; } -- Default if no row exists - RETURN TRUE, 10, 50; + RETURN TRUE, 10, 1000; }; /**