diff --git a/.gitignore b/.gitignore index 54913412d..4cf105ef5 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,7 @@ deployments/network /gitignore .task -CLAUDE.md \ No newline at end of file +CLAUDE.md + +# dev-only maintenance files +maintenance \ No newline at end of file diff --git a/extensions/register.go b/extensions/register.go index 0a411f845..b5a30c0dd 100644 --- a/extensions/register.go +++ b/extensions/register.go @@ -6,6 +6,7 @@ import ( "github.com/trufnetwork/node/extensions/tn_attestation" "github.com/trufnetwork/node/extensions/tn_cache" "github.com/trufnetwork/node/extensions/tn_digest" + "github.com/trufnetwork/node/extensions/tn_lp_rewards" "github.com/trufnetwork/node/extensions/tn_settlement" "github.com/trufnetwork/node/extensions/tn_vacuum" "github.com/trufnetwork/node/extensions/tn_utils" @@ -17,6 +18,7 @@ func init() { tn_cache.InitializeExtension() tn_digest.InitializeExtension() tn_settlement.InitializeExtension() + tn_lp_rewards.InitializeExtension() tn_vacuum.InitializeExtension() tn_attestation.InitializeExtension() database_size.InitializeExtension() diff --git a/extensions/tn_lp_rewards/internal/engine_ops.go b/extensions/tn_lp_rewards/internal/engine_ops.go new file mode 100644 index 000000000..c1c25f135 --- /dev/null +++ b/extensions/tn_lp_rewards/internal/engine_ops.go @@ -0,0 +1,250 @@ +package internal + +import ( + "context" + "fmt" + "strings" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto/auth" + "github.com/trufnetwork/kwil-db/core/log" + ktypes "github.com/trufnetwork/kwil-db/core/types" + "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// EngineOperations wraps engine calls needed by the LP rewards extension +type EngineOperations struct { + engine common.Engine + logger log.Logger + db sql.DB + dbPool sql.DelayedReadTxMaker + accounts common.Accounts +} + +// NewEngineOperations creates a new EngineOperations instance +func NewEngineOperations(engine common.Engine, db sql.DB, dbPool sql.DelayedReadTxMaker, accounts common.Accounts, logger log.Logger) *EngineOperations { + return &EngineOperations{ + engine: engine, + db: db, + dbPool: dbPool, + accounts: accounts, + logger: logger.New("lp_rewards_ops"), + } +} + +// isAccountNotFoundError checks if the error indicates an account was not found +func isAccountNotFoundError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "not found") || strings.Contains(msg, "no rows") +} + +// getFreshReadTx returns a fresh database connection for read operations +func (e *EngineOperations) getFreshReadTx(ctx context.Context) (sql.DB, func(), error) { + if e.dbPool != nil { + readTx := e.dbPool.BeginDelayedReadTx() + cleanup := func() { + readTx.Rollback(ctx) + } + return readTx, cleanup, nil + } + // Fallback to stored db connection + if e.db == nil { + return nil, func() {}, fmt.Errorf("no database connection available (both dbPool and db are nil)") + } + e.logger.Warn("dbPool is nil, falling back to stored db connection (may be stale)") + return e.db, func() {}, nil +} + +// LoadLPRewardsConfig reads the LP rewards configuration +// Returns (enabled, samplingIntervalBlocks, maxMarketsPerRun, error) +func (e *EngineOperations) LoadLPRewardsConfig(ctx context.Context) (bool, int, int, error) { + var ( + enabled bool = true + interval int = 10 + maxMarkets int = 50 + found bool + ) + + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return true, 10, 50, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, + `SELECT enabled, sampling_interval_blocks, max_markets_per_run + FROM main.lp_rewards_config WHERE id = 1`, nil, + func(row *common.Row) error { + if len(row.Values) >= 3 { + if v, ok := row.Values[0].(bool); ok { + enabled = v + } + if v, ok := row.Values[1].(int); ok { + interval = v + } else if v64, ok := row.Values[1].(int64); ok { + interval = int(v64) + } + if v, ok := row.Values[2].(int); ok { + maxMarkets = v + } else if v64, ok := row.Values[2].(int64); ok { + maxMarkets = int(v64) + } + found = true + } + return nil + }) + + if err != nil { + // Tolerate missing table + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "lp_rewards_config") && + (strings.Contains(msg, "does not exist") || + strings.Contains(msg, "no such table") || + strings.Contains(msg, "undefined table") || + strings.Contains(msg, "not found")) { + e.logger.Info("lp_rewards_config table not found; using defaults") + return true, 10, 50, nil + } + return true, 10, 50, err + } + if !found { + return true, 10, 50, nil + } + return enabled, interval, maxMarkets, nil +} + +// GetActiveMarkets returns active (unsettled) market IDs +func (e *EngineOperations) GetActiveMarkets(ctx context.Context, limit int) ([]int, error) { + var markets []int + + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return nil, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + + query := ` + SELECT id FROM ob_queries + WHERE settled = false + ORDER BY id ASC + LIMIT $limit + ` + + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, query, + map[string]any{"limit": int64(limit)}, + func(row *common.Row) error { + if len(row.Values) >= 1 { + switch v := row.Values[0].(type) { + case int: + markets = append(markets, v) + case int32: + markets = append(markets, int(v)) + case int64: + markets = append(markets, int(v)) + default: + return fmt.Errorf("unexpected type for id: %T", v) + } + } + return nil + }) + + if err != nil { + // Tolerate missing table (migrations not run yet) + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "ob_queries") && + (strings.Contains(msg, "does not exist") || + strings.Contains(msg, "no such table")) { + return nil, nil + } + return nil, fmt.Errorf("query active markets: %w", err) + } + + return markets, nil +} + +// BroadcastSampleLPRewards broadcasts a sample_lp_rewards transaction +func (e *EngineOperations) BroadcastSampleLPRewards( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + queryID int, + blockHeight int64, +) error { + // Get signer account ID + signerAccountID, err := ktypes.GetSignerAccount(signer) + if err != nil { + return fmt.Errorf("get signer account: %w", err) + } + + // Fetch fresh nonce from database using validated read transaction + readTx, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + + var nextNonce uint64 + account, err := e.accounts.GetAccount(ctx, readTx, signerAccountID) + if err != nil { + if !isAccountNotFoundError(err) { + return fmt.Errorf("get account: %w", err) + } + nextNonce = 1 + } else { + nextNonce = uint64(account.Nonce + 1) + } + + // Encode arguments for sample_lp_rewards action + // Parameters: $query_id INT, $block INT8 + queryIDArg, err := ktypes.EncodeValue(int64(queryID)) + if err != nil { + return fmt.Errorf("encode query_id: %w", err) + } + blockArg, err := ktypes.EncodeValue(blockHeight) + if err != nil { + return fmt.Errorf("encode block: %w", err) + } + + // Build ActionExecution payload + payload := &ktypes.ActionExecution{ + Namespace: "main", + Action: "sample_lp_rewards", + Arguments: [][]*ktypes.EncodedValue{{queryIDArg, blockArg}}, + } + + // Create transaction + tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce) + if err != nil { + return fmt.Errorf("create tx: %w", err) + } + + // Sign transaction + if err := tx.Sign(signer); err != nil { + return fmt.Errorf("sign tx: %w", err) + } + + // Broadcast with sync mode = 1 (wait for commit) to ensure nonce increments properly + // before broadcasting next transaction + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { + return fmt.Errorf("broadcast tx: %w", err) + } + + // Check immediate result (may not have error yet in async mode) + if txResult != nil && txResult.Code != uint32(ktypes.CodeOk) { + return fmt.Errorf("transaction failed with code %d: %s", + txResult.Code, txResult.Log) + } + + e.logger.Debug("sample_lp_rewards transaction broadcast", + "query_id", queryID, + "block", blockHeight, + "tx_hash", hash.String(), + "nonce", nextNonce) + + return nil +} diff --git a/extensions/tn_lp_rewards/tn_lp_rewards.go b/extensions/tn_lp_rewards/tn_lp_rewards.go new file mode 100644 index 000000000..b1694f467 --- /dev/null +++ b/extensions/tn_lp_rewards/tn_lp_rewards.go @@ -0,0 +1,440 @@ +package tn_lp_rewards + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "sync" + "sync/atomic" + + "github.com/trufnetwork/kwil-db/app/key" + appconf "github.com/trufnetwork/kwil-db/app/node/conf" + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/config" + "github.com/trufnetwork/kwil-db/core/crypto/auth" + "github.com/trufnetwork/kwil-db/core/log" + rpcclient "github.com/trufnetwork/kwil-db/core/rpc/client" + rpcuser "github.com/trufnetwork/kwil-db/core/rpc/client/user/jsonrpc" + ktypes "github.com/trufnetwork/kwil-db/core/types" + "github.com/trufnetwork/kwil-db/extensions/hooks" + "github.com/trufnetwork/kwil-db/extensions/precompiles" + sql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/extensions/leaderwatch" + "github.com/trufnetwork/node/extensions/tn_lp_rewards/internal" +) + +const ( + ExtensionName = "tn_lp_rewards" + + // Default configuration + DefaultSamplingIntervalBlocks = 10 + DefaultMaxMarketsPerRun = 50 +) + +// TxBroadcaster interface for broadcasting transactions +type TxBroadcaster interface { + BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) +} + +// txBroadcasterFunc adapts a function to the TxBroadcaster interface +type txBroadcasterFunc func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) + +func (f txBroadcasterFunc) BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) { + return f(ctx, tx, sync) +} + +// Extension holds the singleton state for LP rewards sampling +type Extension struct { + mu sync.RWMutex + + logger log.Logger + service *common.Service + engOps *internal.EngineOperations + isLeader atomic.Bool + + // Configuration (loaded from database) + enabled bool + samplingIntervalBlocks int64 + maxMarketsPerRun int + configReloadInterval int64 // Reload config every N blocks + lastCheckedHeight int64 + + // Sampling state - prevents overlapping runs + isSampling atomic.Bool + + // Transaction broadcasting + signer auth.Signer + broadcaster TxBroadcaster +} + +var ( + extensionInstance *Extension + extensionMu sync.RWMutex +) + +// GetExtension returns the singleton extension instance +func GetExtension() *Extension { + extensionMu.RLock() + if extensionInstance != nil { + defer extensionMu.RUnlock() + return extensionInstance + } + extensionMu.RUnlock() + + extensionMu.Lock() + defer extensionMu.Unlock() + + if extensionInstance == nil { + extensionInstance = &Extension{ + logger: log.New(log.WithLevel(log.LevelInfo)), + enabled: true, + samplingIntervalBlocks: DefaultSamplingIntervalBlocks, + maxMarketsPerRun: DefaultMaxMarketsPerRun, + configReloadInterval: 100, + } + } + return extensionInstance +} + +// InitializeExtension registers hooks needed by this extension +func InitializeExtension() { + // Register precompile to make extension visible in logs + err := precompiles.RegisterInitializer(ExtensionName, initializePrecompile) + if err != nil { + panic(fmt.Sprintf("failed to register %s initializer: %v", ExtensionName, err)) + } + + // Register engine ready hook + if err := hooks.RegisterEngineReadyHook(ExtensionName+"_engine_ready", engineReadyHook); err != nil { + panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err)) + } + + // Register with leaderwatch for leadership coordination + if err := leaderwatch.Register(ExtensionName, leaderwatch.Callbacks{ + OnAcquire: leaderAcquire, + OnLose: leaderLose, + OnEndBlock: leaderEndBlock, + }); err != nil { + panic(fmt.Sprintf("failed to register %s leader watcher: %v", ExtensionName, err)) + } +} + +// initializePrecompile makes the extension visible in logs +func initializePrecompile(ctx context.Context, service *common.Service, db sql.DB, alias string, metadata map[string]any) (precompiles.Precompile, error) { + return precompiles.Precompile{}, nil +} + +// engineReadyHook initializes engine operations +func engineReadyHook(ctx context.Context, app *common.App) error { + logger := app.Service.Logger.New(ExtensionName) + + var db sql.DB = app.DB + if db == nil { + logger.Warn("app.DB is nil; LP rewards extension may not be fully operational") + } + + // Build engine operations wrapper + engOps := internal.NewEngineOperations(app.Engine, db, app.Service.DBPool, app.Accounts, app.Service.Logger) + + // Create extension instance and set basic values + ext := GetExtension() + ext.logger = logger + ext.service = app.Service + ext.engOps = engOps + + // Load config from database - only update if successful, otherwise keep defaults + enabled, interval, maxMarkets, err := engOps.LoadLPRewardsConfig(ctx) + if err != nil { + logger.Warn("failed to load LP rewards config; using defaults", "error", err) + } else { + ext.enabled = enabled + ext.samplingIntervalBlocks = int64(interval) + ext.maxMarketsPerRun = maxMarkets + } + + // Load config from node TOML [extensions.tn_lp_rewards] + if ext.service != nil && ext.service.LocalConfig != nil { + if m, ok := ext.service.LocalConfig.Extensions[ExtensionName]; ok { + // config_reload_interval_blocks (default: 100) + if v, ok2 := m["config_reload_interval_blocks"]; ok2 && v != "" { + var parsed int64 + if _, err := fmt.Sscan(v, &parsed); err != nil { + logger.Warn("failed to parse config_reload_interval_blocks, using default", "value", v, "error", err) + } else if parsed > 0 { + ext.configReloadInterval = parsed + } + } + } + } + + // Wire signer and broadcaster + wireSignerAndBroadcaster(app, ext) + + logger.Info("LP rewards extension initialized", + "enabled", ext.enabled, + "sampling_interval_blocks", ext.samplingIntervalBlocks, + "max_markets_per_run", ext.maxMarketsPerRun) + + return nil +} + +// wireSignerAndBroadcaster fills in signer and broadcaster if not already set +func wireSignerAndBroadcaster(app *common.App, ext *Extension) { + if app == nil || app.Service == nil || app.Service.LocalConfig == nil { + return + } + // Signer (from node key file) + if ext.signer == nil { + rootDir := appconf.RootDir() + if rootDir == "" { + ext.logger.Warn("root dir is empty; cannot load node key for signer") + } else { + keyPath := config.NodeKeyFilePath(rootDir) + if pk, err := key.LoadNodeKey(keyPath); err != nil { + ext.logger.Warn("failed to load node key for signer; LP rewards disabled until available", "path", keyPath, "error", err) + } else { + ext.signer = auth.GetUserSigner(pk) + } + } + } + // Broadcaster (JSON-RPC user service) + if ext.broadcaster == nil { + // Optional override: [extensions.tn_lp_rewards].rpc_url + if m, ok := app.Service.LocalConfig.Extensions[ExtensionName]; ok { + if rpcURL := m["rpc_url"]; rpcURL != "" { + if u, err := normalizeListenAddressForClient(rpcURL); err == nil { + ext.broadcaster = makeBroadcasterFromURL(u) + return + } else { + ext.logger.Warn("invalid extensions.tn_lp_rewards.rpc_url; falling back to [rpc].listen", "error", err) + } + } + } + listen := app.Service.LocalConfig.RPC.ListenAddress + if listen == "" { + ext.logger.Warn("RPC listen address is empty; cannot create broadcaster") + } else if u, err := normalizeListenAddressForClient(listen); err != nil { + ext.logger.Warn("invalid RPC listen address; cannot create broadcaster", "addr", listen, "error", err) + } else { + ext.broadcaster = makeBroadcasterFromURL(u) + } + } +} + +// normalizeListenAddressForClient converts a server listen address into a client URL +func normalizeListenAddressForClient(listen string) (*url.URL, error) { + if listen == "" { + return nil, fmt.Errorf("empty listen address") + } + endpoint := listen + if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + endpoint = "http://" + endpoint + } + u, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + host, port, err := net.SplitHostPort(u.Host) + if err != nil { + cleanHost := strings.Trim(u.Host, "[]") + if cleanHost == "" { + u.Host = "127.0.0.1" + } else if ip := net.ParseIP(cleanHost); ip != nil && ip.IsUnspecified() { + u.Host = "127.0.0.1" + } + } else { + cleanHost := strings.Trim(host, "[]") + if cleanHost == "" { + u.Host = net.JoinHostPort("127.0.0.1", port) + } else if ip := net.ParseIP(cleanHost); ip != nil && ip.IsUnspecified() { + u.Host = net.JoinHostPort("127.0.0.1", port) + } + } + return u, nil +} + +// makeBroadcasterFromURL creates a TxBroadcaster backed by the user JSON-RPC client +func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { + userClient := rpcuser.NewClient(u) + return txBroadcasterFunc(func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) { + // Map sync flag to broadcast mode (0 for accept-only, 1 for wait-commit) + mode := rpcclient.BroadcastWaitAccept + if sync == 1 { + mode = rpcclient.BroadcastWaitCommit + } + h, err := userClient.Broadcast(ctx, tx, mode) + if err != nil { + return ktypes.Hash{}, nil, err + } + // For accept mode, we don't need to query the result (just mempool acceptance) + if mode == rpcclient.BroadcastWaitAccept { + return h, nil, nil + } + // For commit mode, query the result + txQueryResp, err := userClient.TxQuery(ctx, h) + if err != nil { + return h, nil, fmt.Errorf("failed to query transaction result: %w", err) + } + if txQueryResp == nil || txQueryResp.Result == nil { + return h, nil, nil + } + return h, txQueryResp.Result, nil + }) +} + +// leaderAcquire is called when this node becomes leader +func leaderAcquire(ctx context.Context, app *common.App, block *common.BlockContext) { + ext := GetExtension() + ext.isLeader.Store(true) + ext.logger.Info("acquired leadership, LP rewards sampling enabled") +} + +// leaderLose is called when this node loses leadership +func leaderLose(ctx context.Context, app *common.App, block *common.BlockContext) { + ext := GetExtension() + ext.isLeader.Store(false) + ext.logger.Info("lost leadership, LP rewards sampling disabled") +} + +// leaderEndBlock is called at the end of each block when this node is leader +func leaderEndBlock(ctx context.Context, app *common.App, block *common.BlockContext) { + ext := GetExtension() + + if !ext.isLeader.Load() { + return + } + + if block == nil { + return + } + + blockHeight := block.Height + + // Snapshot config values under RLock to avoid races with reloadConfig + ext.mu.RLock() + enabled := ext.enabled + samplingIntervalBlocks := ext.samplingIntervalBlocks + configReloadInterval := ext.configReloadInterval + maxMarketsPerRun := ext.maxMarketsPerRun + ext.mu.RUnlock() + + // Reload config periodically (do this BEFORE enabled check so we can re-enable without restart) + if configReloadInterval > 0 && blockHeight-atomic.LoadInt64(&ext.lastCheckedHeight) >= configReloadInterval { + // Use background context since EndBlockHook context is canceled when block ends + go ext.reloadConfig(context.Background()) + atomic.StoreInt64(&ext.lastCheckedHeight, blockHeight) + } + + if !enabled { + return + } + + // Check if it's time to sample (every N blocks) + if samplingIntervalBlocks <= 0 || blockHeight%samplingIntervalBlocks != 0 { + return + } + + // Skip if previous sampling run is still in progress (prevents nonce conflicts) + if !ext.isSampling.CompareAndSwap(false, true) { + ext.logger.Warn("skipping LP rewards sampling - previous run still in progress", + "block", blockHeight) + return + } + + // Sample LP rewards in background with background context + // (EndBlockHook context is canceled when block processing ends) + go ext.sampleLPRewardsWithConfig(context.Background(), blockHeight, maxMarketsPerRun) +} + +// reloadConfig reloads configuration from database +func (ext *Extension) reloadConfig(ctx context.Context) { + if ext.engOps == nil { + return + } + + enabled, interval, maxMarkets, err := ext.engOps.LoadLPRewardsConfig(ctx) + if err != nil { + ext.logger.Warn("failed to reload LP rewards config", "error", err) + return + } + + ext.mu.Lock() + ext.enabled = enabled + ext.samplingIntervalBlocks = int64(interval) + ext.maxMarketsPerRun = maxMarkets + ext.mu.Unlock() + + ext.logger.Debug("reloaded LP rewards config", + "enabled", enabled, + "sampling_interval_blocks", interval, + "max_markets_per_run", maxMarkets) +} + +// sampleLPRewardsWithConfig samples LP rewards for all active markets using provided config +func (ext *Extension) sampleLPRewardsWithConfig(ctx context.Context, blockHeight int64, maxMarketsPerRun int) { + // Always clear the sampling flag when done + defer ext.isSampling.Store(false) + + if ext.engOps == nil || ext.signer == nil || ext.broadcaster == nil { + ext.logger.Warn("LP rewards extension not fully initialized") + return + } + + // Get active markets + markets, err := ext.engOps.GetActiveMarkets(ctx, maxMarketsPerRun) + if err != nil { + ext.logger.Warn("failed to get active markets", "error", err) + return + } + + if len(markets) == 0 { + ext.logger.Debug("no active markets to sample", "block", blockHeight) + return + } + + ext.logger.Info("sampling LP rewards", + "block", blockHeight, + "market_count", len(markets)) + + // Get chain ID from service + chainID := "" + if ext.service != nil && ext.service.GenesisConfig != nil { + chainID = ext.service.GenesisConfig.ChainID + } + + // Sample each market sequentially to avoid nonce conflicts + // Each transaction must complete before the next one starts + successCount := 0 + failCount := 0 + for _, queryID := range markets { + err := ext.engOps.BroadcastSampleLPRewards( + ctx, + chainID, + ext.signer, + ext.broadcaster.BroadcastTx, + queryID, + blockHeight, + ) + if err != nil { + ext.logger.Warn("failed to sample LP rewards", + "query_id", queryID, + "block", blockHeight, + "error", err) + failCount++ + // Continue to next market even if this one fails + continue + } + + ext.logger.Debug("sampled LP rewards", + "query_id", queryID, + "block", blockHeight) + successCount++ + } + + ext.logger.Info("LP rewards sampling completed", + "block", blockHeight, + "success", successCount, + "failed", failCount) +} diff --git a/extensions/tn_settlement/extension.go b/extensions/tn_settlement/extension.go index 8fd2f255c..a5514d939 100644 --- a/extensions/tn_settlement/extension.go +++ b/extensions/tn_settlement/extension.go @@ -39,6 +39,9 @@ type Extension struct { maxMarketsPerRun int retryAttempts int + // Processing state - prevents overlapping settlement runs (nonce conflicts) + isProcessing atomic.Bool + // reload policy reloadIntervalBlocks int64 lastCheckedHeight int64 @@ -169,6 +172,19 @@ func (e *Extension) Broadcaster() TxBroadcaster { return e.broadcaster } func (e *Extension) SetNodeSigner(s auth.Signer) { e.nodeSigner = s } func (e *Extension) NodeSigner() auth.Signer { return e.nodeSigner } +// IsProcessing returns true if a settlement run is currently in progress +func (e *Extension) IsProcessing() bool { return e.isProcessing.Load() } + +// SetProcessing sets the processing flag (used to clear the flag when done) +func (e *Extension) SetProcessing(v bool) { e.isProcessing.Store(v) } + +// TryStartProcessing atomically checks and sets the processing flag. +// Returns true if processing was started (flag was false and is now true). +// Returns false if another run is already in progress (flag was already true). +func (e *Extension) TryStartProcessing() bool { + return e.isProcessing.CompareAndSwap(false, true) +} + // ShutdownContext returns the extension's shutdown context, creating it if needed. // This context is cancelled when Close() is called, allowing graceful termination // of long-running operations like the scheduler. diff --git a/extensions/tn_settlement/internal/engine_ops.go b/extensions/tn_settlement/internal/engine_ops.go index a6a70b7d0..9bb8bfaa0 100644 --- a/extensions/tn_settlement/internal/engine_ops.go +++ b/extensions/tn_settlement/internal/engine_ops.go @@ -258,7 +258,8 @@ func (e *EngineOperations) AttestationExists(ctx context.Context, marketHash []b return exists, nil } -// BroadcastSettleMarketWithRetry broadcasts settle_market transaction with retry logic +// BroadcastSettleMarketWithRetry broadcasts settle_market transaction with retry logic. +// Uses exponential backoff since external systems may use the same wallet (nonce conflicts). func (e *EngineOperations) BroadcastSettleMarketWithRetry( ctx context.Context, chainID string, @@ -277,9 +278,10 @@ func (e *EngineOperations) BroadcastSettleMarketWithRetry( "attempt", attempt, "query_id", queryID, "backoff", backoff, + "is_nonce_error", isNonceError(lastErr), "last_error", lastErr) - // Wait before retry + // Wait before retry with context cancellation support select { case <-ctx.Done(): return ctx.Err() @@ -310,10 +312,11 @@ func (e *EngineOperations) BroadcastSettleMarketWithRetry( "attempt", attempt, "query_id", queryID, "tx_hash", hash.String(), + "is_nonce_error", isNonceError(err), "error", err) } - return fmt.Errorf("max retries (%d) exceeded: %w", maxRetries, lastErr) + return fmt.Errorf("settle_market failed after %d retries: %w", maxRetries, lastErr) } // broadcastSettleMarketWithFreshNonce builds and broadcasts settle_market with fresh nonce @@ -531,7 +534,17 @@ func decodeQueryComponents(data []byte) (*QueryComponents, error) { }, nil } +// isNonceError checks if the error is related to nonce conflicts +func isNonceError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "nonce") || strings.Contains(msg, "invalid nonce") +} + // RequestAttestationForMarket broadcasts a request_attestation transaction for a market +// with retry logic (exponential backoff for transient errors like nonce conflicts) func (e *EngineOperations) RequestAttestationForMarket( ctx context.Context, chainID string, @@ -539,12 +552,67 @@ func (e *EngineOperations) RequestAttestationForMarket( broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), market *UnsettledMarket, ) error { - // Get query components from market + // Get query components from market (only need to do this once) components, err := e.GetMarketQueryComponents(ctx, market.ID) if err != nil { return fmt.Errorf("get query components: %w", err) } + // Retry configuration + // Uses exponential backoff since external systems may be using the same wallet + const maxRetries = 5 + backoff := 2 * time.Second + maxBackoff := 30 * time.Second + + var lastErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + if attempt > 0 { + e.logger.Warn("retrying request_attestation", + "query_id", market.ID, + "attempt", attempt, + "backoff", backoff, + "is_nonce_error", isNonceError(lastErr), + "last_error", lastErr) + + // Wait before retry with context cancellation support + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + + // Exponential backoff + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + + err := e.broadcastRequestAttestationWithFreshNonce(ctx, chainID, signer, broadcaster, market, components) + if err == nil { + return nil + } + + lastErr = err + e.logger.Warn("request_attestation broadcast failed", + "query_id", market.ID, + "attempt", attempt, + "is_nonce_error", isNonceError(err), + "error", err) + } + + return fmt.Errorf("request_attestation failed after %d retries: %w", maxRetries, lastErr) +} + +// broadcastRequestAttestationWithFreshNonce builds and broadcasts request_attestation with fresh nonce +func (e *EngineOperations) broadcastRequestAttestationWithFreshNonce( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + market *UnsettledMarket, + components *QueryComponents, +) error { // Get signer account ID signerAccountID, err := ktypes.GetSignerAccount(signer) if err != nil { @@ -648,7 +716,8 @@ func (e *EngineOperations) RequestAttestationForMarket( "tx_hash", hash.String(), "data_provider", components.DataProvider, "stream_id", components.StreamID, - "action_name", components.ActionName) + "action_name", components.ActionName, + "nonce", nextNonce) return nil } diff --git a/extensions/tn_settlement/internal/engine_ops_test.go b/extensions/tn_settlement/internal/engine_ops_test.go index eb5909f63..c87af40cc 100644 --- a/extensions/tn_settlement/internal/engine_ops_test.go +++ b/extensions/tn_settlement/internal/engine_ops_test.go @@ -50,7 +50,7 @@ func (m *mockBroadcaster) broadcast(ctx context.Context, tx *ktypes.Transaction, } type mockAccounts struct { - nonceCalls int + nonceCalls int currentNonce int64 } @@ -198,8 +198,8 @@ func TestBroadcastSettleMarketWithRetry_MaxRetriesExceeded(t *testing.T) { t.Fatal("Expected error, got nil") } - if !strings.Contains(err.Error(), "max retries") { - t.Errorf("Expected max retries error, got: %v", err) + if !strings.Contains(err.Error(), "retries") { + t.Errorf("Expected retries error, got: %v", err) } // With maxRetries=0, should only attempt once diff --git a/extensions/tn_settlement/scheduler/scheduler.go b/extensions/tn_settlement/scheduler/scheduler.go index d4e367ace..35c9de4ab 100644 --- a/extensions/tn_settlement/scheduler/scheduler.go +++ b/extensions/tn_settlement/scheduler/scheduler.go @@ -20,6 +20,17 @@ type txBroadcaster interface { BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) } +// ProcessingGuard provides atomic processing state management to prevent +// overlapping settlement runs (which cause nonce conflicts). +type ProcessingGuard interface { + // TryStartProcessing atomically tries to start processing. + // Returns true if processing was started (no other run in progress). + // Returns false if another run is already in progress. + TryStartProcessing() bool + // SetProcessing sets the processing state (used to clear flag when done). + SetProcessing(v bool) +} + // EngineOps defines the operations needed by the settlement scheduler. // This interface allows for mocking in tests. type EngineOps interface { @@ -43,6 +54,9 @@ type SettlementScheduler struct { maxMarketsPerRun int retryAttempts int + + // processingGuard prevents overlapping settlement runs (nonce conflicts) + processingGuard ProcessingGuard } type NewSettlementSchedulerParams struct { @@ -53,6 +67,7 @@ type NewSettlementSchedulerParams struct { Tx txBroadcaster MaxMarketsPerRun int RetryAttempts int + ProcessingGuard ProcessingGuard // Optional: prevents overlapping runs } func NewSettlementScheduler(params NewSettlementSchedulerParams) *SettlementScheduler { @@ -74,6 +89,7 @@ func NewSettlementScheduler(params NewSettlementSchedulerParams) *SettlementSche signer: params.Signer, maxMarketsPerRun: maxMarkets, retryAttempts: retries, + processingGuard: params.ProcessingGuard, } } @@ -108,6 +124,22 @@ func (s *SettlementScheduler) Start(ctx context.Context, cronExpr string) error } }() + // Check processing guard to prevent overlapping runs (prevents nonce conflicts) + // This is needed because scheduler restart (e.g., on config change) can cause + // the previous job to still be running when a new scheduler starts. + s.mu.Lock() + guard := s.processingGuard + s.mu.Unlock() + + if guard != nil { + if !guard.TryStartProcessing() { + s.logger.Warn("skipping settlement job - previous run still in progress") + return + } + // Ensure we clear the processing flag when done + defer guard.SetProcessing(false) + } + // Snapshot dependencies under lock to avoid races with setters s.mu.Lock() jobCtx := s.ctx @@ -247,6 +279,18 @@ func (s *SettlementScheduler) Stop() error { // RunOnce executes the settlement job payload once (for tests and manual triggering) func (s *SettlementScheduler) RunOnce(ctx context.Context) error { + // Check processing guard to prevent overlapping runs + s.mu.Lock() + guard := s.processingGuard + s.mu.Unlock() + + if guard != nil { + if !guard.TryStartProcessing() { + return fmt.Errorf("settlement run already in progress") + } + defer guard.SetProcessing(false) + } + s.mu.Lock() engineOps := s.engineOps broadcaster := s.broadcaster diff --git a/extensions/tn_settlement/scheduler_lifecycle.go b/extensions/tn_settlement/scheduler_lifecycle.go index 5bcfb4648..666e1341e 100644 --- a/extensions/tn_settlement/scheduler_lifecycle.go +++ b/extensions/tn_settlement/scheduler_lifecycle.go @@ -27,6 +27,7 @@ func (e *Extension) buildScheduler(service *common.Service) *scheduler.Settlemen Signer: e.NodeSigner(), MaxMarketsPerRun: e.MaxMarketsPerRun(), RetryAttempts: e.RetryAttempts(), + ProcessingGuard: e, // Extension implements ProcessingGuard interface }) } diff --git a/extensions/tn_settlement/settlement_integration_test.go b/extensions/tn_settlement/settlement_integration_test.go index 7db8b69a3..2c23ee92f 100644 --- a/extensions/tn_settlement/settlement_integration_test.go +++ b/extensions/tn_settlement/settlement_integration_test.go @@ -314,7 +314,7 @@ func testLoadSettlementConfig(t *testing.T) func(context.Context, *kwilTesting.P require.NoError(t, err) require.True(t, enabled, "should be true (enabled by migration 041)") require.Equal(t, "0,30 * * * *", schedule, "should be 30-minute schedule from migration 041") - require.Equal(t, 10, maxMarkets) + require.Equal(t, 100, maxMarkets, "should be 100 from migration 041") require.Equal(t, 3, retries) t.Logf("✅ LoadSettlementConfig loaded config from migration: enabled=%v, schedule=%s, max=%d, retries=%d", diff --git a/internal/migrations/033-order-book-settlement.sql b/internal/migrations/033-order-book-settlement.sql index b64a87cc1..4189237a0 100644 --- a/internal/migrations/033-order-book-settlement.sql +++ b/internal/migrations/033-order-book-settlement.sql @@ -3,10 +3,10 @@ * * Automatic atomic settlement processing: * - Bulk delete losing positions (efficient) - * - Pay winners (shares × $1.00 - 2% redemption fee) - * - Refund open buy orders (no fee) + * - Pay winners full $1.00 per share (no redemption fee) + * - Refund open buy orders * - Delete all positions atomically - * - Collect fees in vault + * - Zero-sum settlement: losers fund winners * * Implementation Note: * Uses CTE + ARRAY_AGG to collect all payout data in a single query, then @@ -72,8 +72,12 @@ CREATE OR REPLACE ACTION ob_batch_unlock_collateral( /** * distribute_fees($query_id, $total_fees) * - * Distributes settlement fees to liquidity providers based on sampled rewards. - * Called automatically after winner payouts in process_settlement(). + * Distributes trading fees to liquidity providers based on sampled rewards. + * Called to distribute LP rewards collected during market operation (spread fees). + * + * NOTE: This function is NOT called during settlement. Settlement is zero-sum + * (no redemption fees). This function is used to distribute trading fees that + * were collected during market operation from the bid-ask spread. * * DYNAMIC REWARDS MODEL: * Uses the ob_rewards table populated by periodic sample_lp_rewards() calls. @@ -96,8 +100,8 @@ CREATE OR REPLACE ACTION ob_batch_unlock_collateral( * This ensures full traceability for compliance and user verification. * * Parameters: - * - $query_id: Settled market ID - * - $total_fees: Total fees collected (2% of redemptions), in wei + * - $query_id: Market ID + * - $total_fees: Total trading fees to distribute, in wei * * Behavior: * - No samples → fees remain in vault (safe accumulation), NO audit record @@ -301,13 +305,15 @@ CREATE OR REPLACE ACTION distribute_fees( DELETE FROM ob_rewards WHERE query_id = $query_id; }; --- Process settlement: Pay winners, refund open buys, collect fees +-- Process settlement: Pay winners, refund open buys +-- NOTE: No redemption fee is charged. Winners receive full $1 per share. +-- LP rewards come from trading fees (spread) collected during market operation, +-- NOT from settlement redemptions. This ensures prediction markets are zero-sum +-- for participants (losers fund winners, not winners paying fees). CREATE OR REPLACE ACTION process_settlement( $query_id INT, $winning_outcome BOOL ) PRIVATE { - $redemption_fee_bps INT := 200; -- 2% (200 basis points) - $total_fees_collected NUMERIC(78, 0) := '0'::NUMERIC(78, 0); $one_token NUMERIC(78, 0) := '1000000000000000000'::NUMERIC(78, 0); -- Get market's bridge for unlock operations @@ -351,8 +357,8 @@ CREATE OR REPLACE ACTION process_settlement( price, -- Pre-calculate all monetary values to avoid CASE type issues -- All amounts cast to NUMERIC(78, 0) to match ethereum_bridge.unlock() API - (amount::NUMERIC(78, 0) * $one_token)::NUMERIC(78, 0) as gross_winner_payout, - ((amount::NUMERIC(78, 0) * $one_token * $redemption_fee_bps::NUMERIC(78, 0)) / 10000::NUMERIC(78, 0))::NUMERIC(78, 0) as winner_fee, + -- Winners get full $1 per share (no redemption fee) + (amount::NUMERIC(78, 0) * $one_token)::NUMERIC(78, 0) as winner_payout, ((amount::NUMERIC(78, 0) * abs(price)::NUMERIC(78, 0) * $one_token) / 100::NUMERIC(78, 0))::NUMERIC(78, 0) as refund_amount FROM remaining_positions ), @@ -360,44 +366,35 @@ CREATE OR REPLACE ACTION process_settlement( SELECT wallet_address, -- Remaining positions after Step 1 are: - -- 1. Winning holdings/sells (price >= 0): Pay shares × $1 - 2% fee - -- 2. Open buy orders (price < 0): Refund locked collateral, no fee + -- 1. Winning holdings/sells (price >= 0): Pay shares × $1 (full amount, no fee) + -- 2. Open buy orders (price < 0): Refund locked collateral CASE WHEN price >= 0 THEN - gross_winner_payout - winner_fee + winner_payout ELSE refund_amount - END as payout_amount, - CASE - WHEN price >= 0 THEN - winner_fee - ELSE - '0'::NUMERIC(78, 0) - END as fee_amount + END as payout_amount FROM calculated_values ), wallet_totals AS ( -- Group by wallet to handle multiple positions per user SELECT wallet_address, - SUM(payout_amount) as total_payout, - SUM(fee_amount) as total_fees + SUM(payout_amount) as total_payout FROM payouts GROUP BY wallet_address ), aggregated AS ( SELECT ARRAY_AGG(wallet_address ORDER BY wallet_address) as wallets, - ARRAY_AGG(total_payout::NUMERIC(78, 0) ORDER BY wallet_address) as amounts, - SUM(total_fees)::NUMERIC(78, 0) as total_fees + ARRAY_AGG(total_payout::NUMERIC(78, 0) ORDER BY wallet_address) as amounts FROM wallet_totals ) - SELECT wallets, amounts, COALESCE(total_fees, 0::NUMERIC(78, 0)) as total_fees + SELECT wallets, amounts FROM aggregated { $wallet_addresses := $result.wallets; $amounts := $result.amounts; - $total_fees_collected := $result.total_fees; } -- Step 3: Delete all processed positions (set-based, no loop!) @@ -408,16 +405,8 @@ CREATE OR REPLACE ACTION process_settlement( ob_batch_unlock_collateral($bridge, $wallet_addresses, $amounts); } - -- Step 5: Fee distribution to liquidity providers - -- Fees are automatically kept in the vault by deducting from unlocked amounts. - -- Winners receive (shares × $1 - 2% fee), so 2% remains locked in vault. - -- - -- distribute_fees() distributes the collected fees to qualified LPs proportionally. - -- See function definition above in this migration for implementation details. - -- Edge cases: - -- - No LPs: Fees remain in vault (safe accumulation) - -- - Zero fees: No-op, returns early - distribute_fees($query_id, $total_fees_collected); - - -- Verification: Check vault balance via ethereum_bridge queries + -- Note: LP fee distribution happens separately via trading fees collected during + -- market operation (spread between buy/sell orders). Settlement does NOT charge + -- redemption fees - winners receive full $1 per share, ensuring the market is + -- zero-sum for participants (losers fund winners). } diff --git a/internal/migrations/034-order-book-rewards.sql b/internal/migrations/034-order-book-rewards.sql index 2de688ca2..e93f4d6fc 100644 --- a/internal/migrations/034-order-book-rewards.sql +++ b/internal/migrations/034-order-book-rewards.sql @@ -121,6 +121,12 @@ CREATE OR REPLACE ACTION sample_lp_rewards( $query_id INT, $block INT8 ) PUBLIC { + -- Check if this block was already sampled to prevent duplicate key errors + -- This handles retries, race conditions, and scheduler overlap + for $row in SELECT 1 FROM ob_rewards WHERE query_id = $query_id AND block = $block LIMIT 1 { + RETURN; + } + -- Check if market is settled $is_settled BOOL; for $row in SELECT settled FROM ob_queries WHERE id = $query_id { diff --git a/internal/migrations/037-order-book-validation.sql b/internal/migrations/037-order-book-validation.sql index 5eedc686f..39ca16de7 100644 --- a/internal/migrations/037-order-book-validation.sql +++ b/internal/migrations/037-order-book-validation.sql @@ -179,8 +179,10 @@ PUBLIC VIEW RETURNS ( -- Step 8: Validate collateral balance -- Now compares vault balance against TOTAL expected collateral from ALL unsettled markets + -- Using >= because having MORE collateral than expected is safe (extra margin), + -- while having LESS would indicate missing funds (which would fail this check) $valid_collateral BOOL; - if $vault_balance = $expected_collateral { + if $vault_balance >= $expected_collateral { $valid_collateral := TRUE; } else { $valid_collateral := FALSE; diff --git a/internal/migrations/041-settlement-config-actions.sql b/internal/migrations/041-settlement-config-actions.sql index bf68491b1..467f05b45 100644 --- a/internal/migrations/041-settlement-config-actions.sql +++ b/internal/migrations/041-settlement-config-actions.sql @@ -19,7 +19,7 @@ UPDATE settlement_config SET enabled = true, settlement_schedule = '0,30 * * * *', - max_markets_per_run = 10, + max_markets_per_run = 100, retry_attempts = 3, updated_at = 0 WHERE id = 1; @@ -37,7 +37,7 @@ WHERE id = 1; -- - $retry_attempts: Number of retry attempts for failed settlements (INT, 1-10) -- -- Usage: --- kwil-cli call-action update_settlement_config bool:true text:'0,30 * * * *' int:10 int:3 +-- kwil-cli call-action update_settlement_config bool:true text:'0,30 * * * *' int:100 int:3 CREATE OR REPLACE ACTION update_settlement_config( $enabled BOOL, $schedule TEXT, @@ -49,9 +49,9 @@ CREATE OR REPLACE ACTION update_settlement_config( $has_role BOOL := false; for $row in SELECT 1 FROM role_members - WHERE role_owner = 'system' + WHERE owner = 'system' AND role_name = 'network_writer' - AND member_address = $caller_addr { + AND wallet = $caller_addr { $has_role := true; } diff --git a/internal/migrations/042-lp-rewards-config.sql b/internal/migrations/042-lp-rewards-config.sql new file mode 100644 index 000000000..5f2a5b6dd --- /dev/null +++ b/internal/migrations/042-lp-rewards-config.sql @@ -0,0 +1,147 @@ +/** + * MIGRATION 042: LP REWARDS SCHEDULER CONFIGURATION + * + * Purpose: + * Configuration table for the tn_lp_rewards extension that samples + * liquidity provider rewards at block intervals. + * + * The scheduler calls sample_lp_rewards($query_id, $block) periodically + * for all active (unsettled) markets to track LP contributions over time. + * + * Dependencies: + * - Migration 034: sample_lp_rewards action and ob_rewards table + * - Migration 030: ob_queries table (active markets) + */ + +-- ============================================================================ +-- CONFIGURATION TABLE +-- ============================================================================ + +/** + * lp_rewards_config + * + * Single-row configuration for LP rewards sampling scheduler. + * Similar to settlement_config but uses block intervals instead of cron. + * + * Columns: + * - id: Always 1 (single row) + * - enabled: Whether sampling is active + * - sampling_interval_blocks: Sample every N blocks (e.g., 10 = every 10 blocks) + * - max_markets_per_run: Limit markets sampled per block to prevent overload + */ +CREATE TABLE IF NOT EXISTS lp_rewards_config ( + id INT PRIMARY KEY DEFAULT 1, + enabled BOOL NOT NULL DEFAULT TRUE, + sampling_interval_blocks INT NOT NULL DEFAULT 10, + max_markets_per_run INT NOT NULL DEFAULT 50, + CHECK (id = 1), + CHECK (sampling_interval_blocks >= 1), + CHECK (max_markets_per_run >= 1) +); + +-- Insert default configuration +INSERT INTO lp_rewards_config (id, enabled, sampling_interval_blocks, max_markets_per_run) +VALUES (1, TRUE, 10, 50) +ON CONFLICT (id) DO NOTHING; + +-- ============================================================================ +-- CONFIGURATION ACTIONS +-- ============================================================================ + +/** + * get_lp_rewards_config + * + * Returns the current LP rewards scheduler configuration. + * Used by the tn_lp_rewards extension to load settings. + */ +CREATE OR REPLACE ACTION get_lp_rewards_config() +PUBLIC VIEW RETURNS ( + enabled BOOL, + sampling_interval_blocks INT, + max_markets_per_run INT +) { + for $row in SELECT enabled, sampling_interval_blocks, max_markets_per_run + FROM lp_rewards_config WHERE id = 1 + { + RETURN $row.enabled, $row.sampling_interval_blocks, $row.max_markets_per_run; + } + -- Default if no row exists + RETURN TRUE, 10, 50; +}; + +/** + * update_lp_rewards_config + * + * Updates the LP rewards scheduler configuration. + * Only callable by network_writer role. + * + * Parameters: + * - $enabled: Enable/disable sampling + * - $sampling_interval_blocks: Sample every N blocks + * - $max_markets_per_run: Max markets to sample per run + */ +CREATE OR REPLACE ACTION update_lp_rewards_config( + $enabled BOOL, + $sampling_interval_blocks INT, + $max_markets_per_run INT +) PUBLIC { + -- Check caller has network_writer role (query role_members table) + $has_role BOOL := FALSE; + + for $row in SELECT 1 FROM role_members + WHERE owner = 'system' + AND role_name = 'network_writer' + AND wallet = LOWER(@caller) + LIMIT 1 + { + $has_role := TRUE; + } + + if $has_role = FALSE { + ERROR('Only network_writer can update LP rewards config'); + } + + -- Validate inputs + if $enabled IS NULL { + ERROR('enabled cannot be NULL'); + } + if $sampling_interval_blocks IS NULL OR $sampling_interval_blocks < 1 { + ERROR('sampling_interval_blocks must be >= 1'); + } + if $max_markets_per_run IS NULL OR $max_markets_per_run < 1 OR $max_markets_per_run > 1000 { + ERROR('max_markets_per_run must be between 1 and 1000'); + } + + -- Update or insert config + INSERT INTO lp_rewards_config (id, enabled, sampling_interval_blocks, max_markets_per_run) + VALUES (1, $enabled, $sampling_interval_blocks, $max_markets_per_run) + ON CONFLICT (id) DO UPDATE + SET enabled = $enabled, + sampling_interval_blocks = $sampling_interval_blocks, + max_markets_per_run = $max_markets_per_run; +}; + +/** + * get_active_markets_for_sampling + * + * Returns active (unsettled) markets that can be sampled for LP rewards. + * Used by the tn_lp_rewards extension. + * + * Parameters: + * - $limit: Maximum number of markets to return + */ +CREATE OR REPLACE ACTION get_active_markets_for_sampling($limit INT) +PUBLIC VIEW RETURNS (query_id INT) { + -- Validate limit parameter + if $limit IS NULL OR $limit < 1 { + ERROR('limit must be >= 1'); + } + + for $row in SELECT id FROM ob_queries + WHERE settled = FALSE + ORDER BY id ASC + LIMIT $limit + { + RETURN NEXT $row.id; + } +}; diff --git a/tests/streams/order_book/lp_rewards_config_test.go b/tests/streams/order_book/lp_rewards_config_test.go new file mode 100644 index 000000000..f1ae5e383 --- /dev/null +++ b/tests/streams/order_book/lp_rewards_config_test.go @@ -0,0 +1,425 @@ +//go:build kwiltest + +package order_book + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + "github.com/trufnetwork/kwil-db/core/types" + erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/node/tests/streams/utils/setup" + "github.com/trufnetwork/sdk-go/core/util" +) + +// TestLPRewardsConfig tests the LP rewards configuration table and actions +func TestLPRewardsConfig(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "LP_REWARDS_CONFIG", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + testGetLPRewardsConfigDefault(t), + testUpdateLPRewardsConfig(t), + testUpdateLPRewardsConfigUnauthorized(t), + testGetActiveMarketsForSampling(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +// testGetLPRewardsConfigDefault tests that config can be read +func testGetLPRewardsConfigDefault(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: []byte("anonymous"), + Caller: "anonymous", + TxID: platform.Txid(), + Authenticator: "anonymous", + } + engineCtx := &common.EngineContext{TxContext: tx} + + var enabled bool + var samplingInterval int64 + var maxMarkets int64 + var found bool + + res, err := platform.Engine.Call( + engineCtx, + platform.DB, + "", + "get_lp_rewards_config", + []any{}, + func(row *common.Row) error { + // Use type switches for robustness across different backends + switch v := row.Values[0].(type) { + case bool: + enabled = v + default: + return fmt.Errorf("unexpected type for enabled: %T", v) + } + switch v := row.Values[1].(type) { + case int64: + samplingInterval = v + case int: + samplingInterval = int64(v) + default: + return fmt.Errorf("unexpected type for sampling_interval: %T", v) + } + switch v := row.Values[2].(type) { + case int64: + maxMarkets = v + case int: + maxMarkets = int64(v) + default: + return fmt.Errorf("unexpected type for max_markets: %T", v) + } + found = true + return nil + }, + ) + require.NoError(t, err) + if res.Error != nil { + return fmt.Errorf("get_lp_rewards_config failed: %v", res.Error) + } + require.True(t, found, "Should return config row") + + // Verify config values are reasonable (not checking exact defaults due to test pollution) + require.GreaterOrEqual(t, samplingInterval, int64(1), "Sampling interval should be >= 1") + require.GreaterOrEqual(t, maxMarkets, int64(1), "Max markets should be >= 1") + + t.Logf("LP rewards config: enabled=%v, sampling_interval=%d, max_markets=%d", + enabled, samplingInterval, maxMarkets) + + return nil + } +} + +// testUpdateLPRewardsConfig tests updating config with network_writer role +func testUpdateLPRewardsConfig(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Use a test address and grant it network_writer role + networkWriter := util.Unsafe_NewEthereumAddressFromString("0xf9820f9143699cac6f662b19a4b29e13c9393783") + + // Grant network_writer role to this address + err := setup.AddMemberToRoleBypass(ctx, platform, "system", "network_writer", networkWriter.Address()) + require.NoError(t, err, "Failed to grant network_writer role") + + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: networkWriter.Bytes(), + Caller: networkWriter.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + + // Update config to new values + res, err := platform.Engine.Call( + engineCtx, + platform.DB, + "", + "update_lp_rewards_config", + []any{false, int64(20), int64(100)}, // enabled=false, interval=20, max=100 + nil, + ) + require.NoError(t, err) + if res.Error != nil { + return fmt.Errorf("update_lp_rewards_config failed: %v", res.Error) + } + + // Verify updated values + var enabled bool + var samplingInterval int64 + var maxMarkets int64 + + readCtx := &common.EngineContext{TxContext: &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: []byte("anonymous"), + Caller: "anonymous", + TxID: platform.Txid(), + Authenticator: "anonymous", + }} + + res, err = platform.Engine.Call( + readCtx, + platform.DB, + "", + "get_lp_rewards_config", + []any{}, + func(row *common.Row) error { + // Use type switches for robustness + switch v := row.Values[0].(type) { + case bool: + enabled = v + default: + return fmt.Errorf("unexpected type for enabled: %T", v) + } + switch v := row.Values[1].(type) { + case int64: + samplingInterval = v + case int: + samplingInterval = int64(v) + default: + return fmt.Errorf("unexpected type for sampling_interval: %T", v) + } + switch v := row.Values[2].(type) { + case int64: + maxMarkets = v + case int: + maxMarkets = int64(v) + default: + return fmt.Errorf("unexpected type for max_markets: %T", v) + } + return nil + }, + ) + require.NoError(t, err) + if res.Error != nil { + return fmt.Errorf("get_lp_rewards_config after update failed: %v", res.Error) + } + + require.False(t, enabled, "Should be disabled after update") + require.Equal(t, int64(20), samplingInterval, "Sampling interval should be 20") + require.Equal(t, int64(100), maxMarkets, "Max markets should be 100") + + t.Logf("Updated LP rewards config: enabled=%v, sampling_interval=%d, max_markets=%d", + enabled, samplingInterval, maxMarkets) + + // Reset back to defaults for other tests + res, err = platform.Engine.Call( + engineCtx, + platform.DB, + "", + "update_lp_rewards_config", + []any{true, int64(10), int64(50)}, + nil, + ) + require.NoError(t, err) + if res.Error != nil { + return fmt.Errorf("reset config failed: %v", res.Error) + } + + return nil + } +} + +// testUpdateLPRewardsConfigUnauthorized tests that non-network_writer cannot update config +func testUpdateLPRewardsConfigUnauthorized(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Use a random address that doesn't have network_writer role + randomUser := util.Unsafe_NewEthereumAddressFromString("0x1234567890123456789012345678901234567890") + + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: randomUser.Bytes(), + Caller: randomUser.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + + // Try to update config - should fail + res, err := platform.Engine.Call( + engineCtx, + platform.DB, + "", + "update_lp_rewards_config", + []any{false, int64(100), int64(200)}, + nil, + ) + require.NoError(t, err) // Engine call succeeds + + // Action should return error + require.NotNil(t, res.Error, "Should fail for unauthorized user") + require.Contains(t, res.Error.Error(), "network_writer", "Error should mention network_writer role") + + t.Logf("Unauthorized update correctly rejected: %v", res.Error) + + return nil + } +} + +// testGetActiveMarketsForSampling tests getting active markets list +func testGetActiveMarketsForSampling(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + lastBalancePoint = nil + lastTrufBalancePoint = nil + + // Initialize ERC20 extension + err := initERC20ForTest(ctx, platform) + require.NoError(t, err) + + user := util.Unsafe_NewEthereumAddressFromString("0xabcdef0123456789abcdef0123456789abcdef01") + + // Give user balance + err = giveBalanceChained(ctx, platform, user.Address(), "500000000000000000000") + require.NoError(t, err) + + // Create a market (will be active/unsettled) + queryComponents, err := encodeQueryComponentsForTests(user.Address(), "sttest00000000000000000000000057", "get_record", []byte{0x01}) + require.NoError(t, err) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user, queryComponents, settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + + // Get active markets + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: []byte("anonymous"), + Caller: "anonymous", + TxID: platform.Txid(), + Authenticator: "anonymous", + } + engineCtx := &common.EngineContext{TxContext: tx} + + var activeMarkets []int64 + res, err := platform.Engine.Call( + engineCtx, + platform.DB, + "", + "get_active_markets_for_sampling", + []any{int64(10)}, // limit=10 + func(row *common.Row) error { + switch v := row.Values[0].(type) { + case int64: + activeMarkets = append(activeMarkets, v) + case int: + activeMarkets = append(activeMarkets, int64(v)) + case int32: + activeMarkets = append(activeMarkets, int64(v)) + default: + return fmt.Errorf("unexpected type for query_id: %T", v) + } + return nil + }, + ) + require.NoError(t, err) + if res.Error != nil { + return fmt.Errorf("get_active_markets_for_sampling failed: %v", res.Error) + } + + // Should include our newly created market + require.NotEmpty(t, activeMarkets, "Should have at least one active market") + t.Logf("Active markets for sampling: %v (created market: %d)", activeMarkets, marketID) + + // Verify our market is in the list + found := false + for _, id := range activeMarkets { + if id == marketID { + found = true + break + } + } + require.True(t, found, "Newly created market should be in active markets list") + + return nil + } +} + +// Helper to initialize ERC20 extension +func initERC20ForTest(ctx context.Context, platform *kwilTesting.Platform) error { + return erc20bridge.ForTestingInitializeExtension(ctx, platform) +} + +// TestLPRewardsExtensionConfig tests loading config via extension engine operations +func TestLPRewardsExtensionConfig(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "LP_REWARDS_EXTENSION_CONFIG", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + testExtensionConfigLoad(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +// testExtensionConfigLoad tests that extension can load config from database +func testExtensionConfigLoad(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Query the config table directly to verify it exists and has data + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + }, + Signer: []byte("anonymous"), + Caller: "anonymous", + TxID: platform.Txid(), + Authenticator: "anonymous", + } + engineCtx := &common.EngineContext{TxContext: tx} + + var enabled bool + var samplingInterval int64 + var maxMarkets int64 + var found bool + + err := platform.Engine.Execute( + engineCtx, + platform.DB, + "SELECT enabled, sampling_interval_blocks, max_markets_per_run FROM lp_rewards_config WHERE id = 1", + nil, + func(row *common.Row) error { + enabled = row.Values[0].(bool) + samplingInterval = row.Values[1].(int64) + // max_markets_per_run might be INT which returns as int64 + switch v := row.Values[2].(type) { + case int64: + maxMarkets = v + case int: + maxMarkets = int64(v) + case *types.Decimal: + // Handle if it's stored as NUMERIC + maxMarkets = 50 // default + } + found = true + return nil + }, + ) + require.NoError(t, err) + require.True(t, found, "Config row should exist") + + t.Logf("Extension config from DB: enabled=%v, interval=%d, max_markets=%d", + enabled, samplingInterval, maxMarkets) + + // Verify config values are valid + require.True(t, enabled) + require.Equal(t, int64(10), samplingInterval) + require.Equal(t, int64(50), maxMarkets) + + return nil + } +} diff --git a/tests/streams/order_book/settlement_payout_test.go b/tests/streams/order_book/settlement_payout_test.go index 3ccd25a0e..e0bfdec17 100644 --- a/tests/streams/order_book/settlement_payout_test.go +++ b/tests/streams/order_book/settlement_payout_test.go @@ -30,14 +30,15 @@ func TestSettlementPayouts(t *testing.T) { SeedStatements: migrations.GetSeedScriptStatements(), Owner: owner.Address(), FunctionTests: []kwilTesting.TestFunc{ - testWinnerReceives98PercentPayout(t), + testWinnerReceivesFullPayout(t), }, }, testutils.GetTestOptionsWithCache()) } -// testWinnerReceives98PercentPayout verifies that a winner receives 98% payout (2% fee deducted) -// Scenario: User creates 100 YES shares, market settles as YES, user receives 98 USDC -func testWinnerReceives98PercentPayout(t *testing.T) func(context.Context, *kwilTesting.Platform) error { +// testWinnerReceivesFullPayout verifies that a winner receives 100% payout (no fee) +// Scenario: User creates 100 YES shares, market settles as YES, user receives full 100 USDC +// Note: Settlement is zero-sum - losers fund winners, no redemption fee charged +func testWinnerReceivesFullPayout(t *testing.T) func(context.Context, *kwilTesting.Platform) error { return func(ctx context.Context, platform *kwilTesting.Platform) error { // Use valid Ethereum address as user userAddr := util.Unsafe_NewEthereumAddressFromString("0x2222222222222222222222222222222222222222") @@ -253,19 +254,20 @@ func testWinnerReceives98PercentPayout(t *testing.T) func(context.Context, *kwil require.Empty(t, positionsAfter, "All positions should be deleted after settlement") t.Logf("✓ All positions cleared after settlement") - // NEW: Verify user received 98% payout - // Expected: 100 shares × $1.00 - 2% fee = 98 USDC + // NEW: Verify user received full 100% payout (no redemption fee) + // Expected: 100 shares × $1.00 = 100 USDC (zero-sum settlement) balanceAfter, err := getUSDCBalance(ctx, platform, userAddr.Address()) require.NoError(t, err) t.Logf("User USDC balance after: %s", balanceAfter.String()) - // Net USDC change: -100 USDC (locked) + 98 USDC (payout) = -2 USDC (settlement fee only) + // Net USDC change: -100 USDC (locked) + 100 USDC (payout) = 0 USDC + // Settlement is zero-sum: winners get full $1 per share, losers lose their stake // Note: Market creation fee (2 TRUF) is separate from USDC netChange := new(big.Int).Sub(balanceAfter, balanceBefore) - expectedNetChange := new(big.Int).Mul(big.NewInt(-2), big.NewInt(1e18)) // -2 USDC (settlement fee) + expectedNetChange := big.NewInt(0) // No fee, zero-sum settlement require.Equal(t, expectedNetChange.String(), netChange.String(), - "USDC net change should be -2 (settlement fee only, market creation is in TRUF)") - t.Logf("✓ Net USDC balance change: %s (2 USDC settlement fee)", netChange.String()) + "USDC net change should be 0 (no redemption fee, zero-sum settlement)") + t.Logf("✓ Net USDC balance change: %s (zero-sum, no fee)", netChange.String()) return nil } diff --git a/tests/streams/order_book/split_limit_order_test.go b/tests/streams/order_book/split_limit_order_test.go index ffc7fe2e0..8ef9b645f 100644 --- a/tests/streams/order_book/split_limit_order_test.go +++ b/tests/streams/order_book/split_limit_order_test.go @@ -100,9 +100,8 @@ func testSplitOrderMarketSettled(t *testing.T) func(ctx context.Context, platfor // 1. Request attestation from TN oracle // 2. Verify the outcome (TRUE/FALSE) // 3. Cancel all outstanding orders - // 4. Payout winners (minus 2% fee) - // 5. Distribute fees to LPs and network - // 6. Mark market as settled + // 4. Payout winners (full $1 per share, zero-sum settlement) + // 5. Mark market as settled // For now, we bypass this entire flow just to test the "market settled" validation. tx := &common.TxContext{ Ctx: ctx,