From 03ed6875f90ff59c56ca229bc4c37de73ec6f1bf Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Thu, 2 Oct 2025 14:34:40 +0700 Subject: [PATCH 1/2] fix: digest failing to prune pending days due to nonce errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves issue where digest operations failed to complete, leaving 596K+ pending prune days unprocessed. The digest scheduler halted midway through processing due to transaction nonce collisions during retries. **Problem:** - Digest stopped processing on Sept 30, leaving days 20353-20361+ pending - 596,375 days accumulated in pending_prune_days table - Root cause: broadcast timeout → retry used same nonce → "transaction already exists" - Concurrent transactions from same account caused nonce conflicts - System required manual intervention to resume **Solution:** Implemented stateless retry logic that always refetches fresh nonce from database on each attempt. This automatically handles: - Network timeouts - Concurrent transaction activity - Nonce gaps and database state changes **Implementation:** - Added `BroadcastAutoDigestWithArgsAndRetry()` with exponential backoff (5s → 60s max) - Retry on error - Fresh nonce query before each broadcast attempt - Context-aware cancellation support - Maximum 3 retries per digest run **Testing:** - Added 6 unit tests covering retry scenarios - Verified fresh nonce refetch on each attempt - Tests for timeout, cancellation, max retries, and transaction failures - Added build tag `//go:build kwiltest` to integration test - All 19 tests passing **Files Changed:** - `extensions/tn_digest/internal/engine_ops.go` - Core retry logic - `extensions/tn_digest/scheduler/scheduler.go` - Scheduler integration - `extensions/tn_digest/internal/engine_ops_test.go` - Comprehensive test coverage - `extensions/tn_digest/engine_ops_integration_test.go` - Build tag fix This ensures digest operations continue reliably even during network congestion or 
concurrent transaction activity, eliminating the nonce collision failure mode observed in production. resolves: https://github.com/trufnetwork/truf-network/issues/1241 --- .../tn_digest/engine_ops_integration_test.go | 2 + extensions/tn_digest/internal/engine_ops.go | 164 ++++++++- .../tn_digest/internal/engine_ops_test.go | 315 +++++++++++++++++- extensions/tn_digest/scheduler/scheduler.go | 6 +- 4 files changed, 483 insertions(+), 4 deletions(-) diff --git a/extensions/tn_digest/engine_ops_integration_test.go b/extensions/tn_digest/engine_ops_integration_test.go index 2be736f8f..b54ed7a49 100644 --- a/extensions/tn_digest/engine_ops_integration_test.go +++ b/extensions/tn_digest/engine_ops_integration_test.go @@ -1,3 +1,5 @@ +//go:build kwiltest + package tn_digest import ( diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 020ffc722..8618e48ec 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -5,12 +5,13 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/trufnetwork/kwil-db/common" "github.com/trufnetwork/kwil-db/core/crypto/auth" "github.com/trufnetwork/kwil-db/core/log" ktypes "github.com/trufnetwork/kwil-db/core/types" - sql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/kwil-db/node/types/sql" ) // DigestTxResult represents the parsed result from an auto_digest transaction @@ -233,6 +234,167 @@ func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse( return result, nil } +// BroadcastAutoDigestWithArgsAndRetry wraps broadcast with simple retry logic. +// On ANY error, it waits and refetches a fresh nonce from the database before retrying. +// This handles ALL error scenarios: timeouts, concurrent transactions, nonce collisions, etc. 
+func (e *EngineOperations) BroadcastAutoDigestWithArgsAndRetry( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + deleteCap, expectedRecords, preserveDays int, + maxRetries int, +) (*DigestTxResult, error) { + var lastErr error + backoff := 5 * time.Second + maxBackoff := 60 * time.Second + + for attempt := 0; attempt <= maxRetries; attempt++ { + if attempt > 0 { + e.logger.Warn("Retrying auto_digest broadcast with fresh nonce", + "attempt", attempt, + "max_retries", maxRetries, + "backoff", backoff, + "last_error", lastErr) + + // Wait before retry (allows pending tx to resolve and prevents rapid retries) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(backoff): + // Continue to retry + } + } + + // ALWAYS fetch fresh nonce from database on each attempt + // This automatically handles: timeouts, concurrent transactions, nonce collisions, etc. + result, hash, err := e.broadcastAutoDigestWithFreshNonce( + ctx, chainID, signer, broadcaster, + deleteCap, expectedRecords, preserveDays, + ) + + if err == nil { + // Success! + return result, nil + } + + // On ANY error, retry with fresh nonce after backoff + lastErr = err + e.logger.Warn("Broadcast failed, will retry with fresh nonce", + "attempt", attempt, + "tx_hash", hash.String(), + "error", err) + + // Exponential backoff with max cap + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + + return nil, fmt.Errorf("max retries (%d) exceeded: %w", maxRetries, lastErr) +} + +// broadcastAutoDigestWithFreshNonce always fetches a fresh nonce from the database before broadcasting. +// This ensures we don't use stale nonces even if other transactions have been submitted. 
+func (e *EngineOperations) broadcastAutoDigestWithFreshNonce( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + deleteCap, expectedRecords, preserveDays int, +) (*DigestTxResult, ktypes.Hash, error) { + // Get the signer account ID + signerAccountID, err := ktypes.GetSignerAccount(signer) + if err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("get signer account: %w", err) + } + + // ALWAYS query database for current nonce (fresh state) + account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + var nextNonce uint64 + if err != nil { + // Only treat "not found" / "no rows" as missing-account + msg := strings.ToLower(err.Error()) + if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") { + return nil, ktypes.Hash{}, fmt.Errorf("get account: %w", err) + } + nextNonce = 1 + e.logger.Info("Account not found, using nonce 1", + "account", fmt.Sprintf("%x", signerAccountID.Identifier)) + } else { + // Account exists - use next nonce + nextNonce = uint64(account.Nonce + 1) + e.logger.Info("Fresh nonce from database", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + "db_nonce", account.Nonce, + "next_nonce", nextNonce, + "balance", account.Balance) + } + + // Encode arguments + deleteCapArg, err := ktypes.EncodeValue(int64(deleteCap)) + if err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("encode deleteCap: %w", err) + } + expectedRecordsArg, err := ktypes.EncodeValue(int64(expectedRecords)) + if err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("encode expectedRecords: %w", err) + } + preserveDaysArg, err := ktypes.EncodeValue(int64(preserveDays)) + if err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("encode preserveDays: %w", err) + } + + payload := &ktypes.ActionExecution{ + Namespace: "main", + Action: "auto_digest", + Arguments: [][]*ktypes.EncodedValue{{ + deleteCapArg, expectedRecordsArg, 
preserveDaysArg, + }}, + } + + // Create transaction with fresh nonce + tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce) + if err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("create tx: %w", err) + } + if err := tx.Sign(signer); err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("sign tx: %w", err) + } + + // Broadcast + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { + // Return error but also return hash for logging purposes + return nil, hash, err + } + + // Check transaction result code before parsing logs + if txResult.Code != uint32(ktypes.CodeOk) { + return nil, hash, fmt.Errorf("transaction failed with code %d (expected %d): %s", + txResult.Code, uint32(ktypes.CodeOk), txResult.Log) + } + + // Parse the digest result from the transaction log + result, err := parseDigestResultFromTxLog(txResult.Log) + if err != nil { + return nil, hash, fmt.Errorf("parse digest result: %w", err) + } + + e.logger.Info("auto_digest with args completed", + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "tx_hash", hash.String(), + "nonce", nextNonce, + "delete_cap", deleteCap, + "expected_records", expectedRecords, + "preserve_days", preserveDays) + + return result, hash, nil +} + // parseDigestResultFromTxLog parses the digest result from transaction log output // The digest action outputs log entries like: "auto_digest:{...json...}" func parseDigestResultFromTxLog(logOutput string) (*DigestTxResult, error) { diff --git a/extensions/tn_digest/internal/engine_ops_test.go b/extensions/tn_digest/internal/engine_ops_test.go index 06ed878be..a801a33aa 100644 --- a/extensions/tn_digest/internal/engine_ops_test.go +++ b/extensions/tn_digest/internal/engine_ops_test.go @@ -1,6 +1,19 @@ package internal -import "testing" +import ( + "context" + "errors" + "math/big" + "strings" + "testing" + "time" + + "github.com/trufnetwork/kwil-db/core/crypto" + 
"github.com/trufnetwork/kwil-db/core/crypto/auth" + "github.com/trufnetwork/kwil-db/core/log" + ktypes "github.com/trufnetwork/kwil-db/core/types" + "github.com/trufnetwork/kwil-db/node/types/sql" +) func TestParseDigestResultFromTxLog_HasMoreTrue(t *testing.T) { log := "INFO something\nNOTICE: auto_digest:{\"processed_days\":2,\"total_deleted_rows\":500,\"has_more_to_delete\":true}\nother" @@ -43,3 +56,303 @@ func TestParseDigestResultFromTxLog_NoEntry(t *testing.T) { t.Fatalf("expected error for missing auto_digest entry, got nil") } } + +// Mock implementations for testing retry logic + +type mockBroadcaster struct { + attempts int + failUntil int + returnError error + successResult *ktypes.TxResult +} + +func (m *mockBroadcaster) broadcast(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) { + m.attempts++ + + result := m.successResult + if result == nil { + result = &ktypes.TxResult{ + Code: uint32(ktypes.CodeOk), + Log: "auto_digest:{\"processed_days\":100,\"total_deleted_rows\":500,\"has_more_to_delete\":false}", + } + } + + if m.attempts <= m.failUntil { + // Return error but still return result (in case of broadcast errors with partial results) + return ktypes.Hash{}, result, m.returnError + } + + return ktypes.Hash{1, 2, 3}, result, nil +} + +type mockAccounts struct { + nonceCalls int +} + +func (m *mockAccounts) GetAccount(ctx context.Context, db sql.Executor, accountID *ktypes.AccountID) (*ktypes.Account, error) { + m.nonceCalls++ + return &ktypes.Account{ + ID: accountID, + Nonce: int64(m.nonceCalls), + Balance: big.NewInt(1000000), + }, nil +} + +func (m *mockAccounts) Credit(ctx context.Context, db sql.Executor, account *ktypes.AccountID, balance *big.Int) error { + return nil +} + +func (m *mockAccounts) Transfer(ctx context.Context, db sql.TxMaker, from, to *ktypes.AccountID, amt *big.Int) error { + return nil +} + +func (m *mockAccounts) ApplySpend(ctx context.Context, db sql.Executor, account 
*ktypes.AccountID, amount *big.Int, nonce int64) error { + return nil +} + +// Test retry logic without actual time delays by using context with timeout +func TestBroadcastAutoDigestWithArgsAndRetry_ImmediateSuccess(t *testing.T) { + accounts := &mockAccounts{} + broadcaster := &mockBroadcaster{failUntil: 0} // Success immediately + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetNodeSigner(priv) + + ops := &EngineOperations{ + logger: log.New(), + accounts: accounts, + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := ops.BroadcastAutoDigestWithArgsAndRetry( + ctx, "test-chain", signer, broadcaster.broadcast, + 10000, 24, 2, 3, + ) + + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + + if result == nil { + t.Fatal("Expected result, got nil") + } + + if broadcaster.attempts != 1 { + t.Errorf("Expected 1 attempt, got %d", broadcaster.attempts) + } + + if accounts.nonceCalls != 1 { + t.Errorf("Expected 1 nonce fetch, got %d", accounts.nonceCalls) + } + + if result.ProcessedDays != 100 { + t.Errorf("Expected 100 processed days, got %d", result.ProcessedDays) + } +} + +func TestBroadcastAutoDigestWithArgsAndRetry_RetriesOnError(t *testing.T) { + accounts := &mockAccounts{} + broadcaster := &mockBroadcaster{ + failUntil: 2, // Fail first 2 attempts + returnError: errors.New("network error"), + } + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetNodeSigner(priv) + + ops := &EngineOperations{ + logger: log.New(), + accounts: accounts, + } + + // Use very short timeout to fail fast instead of waiting for real backoff + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + _, err = ops.BroadcastAutoDigestWithArgsAndRetry( + ctx, "test-chain", signer, 
broadcaster.broadcast, + 10000, 24, 2, 3, + ) + + // Should get context deadline exceeded because of backoff delays + if err == nil { + t.Fatal("Expected error due to context timeout, got nil") + } + + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("Expected context.DeadlineExceeded, got: %v", err) + } + + // Should have attempted at least once + if broadcaster.attempts < 1 { + t.Errorf("Expected at least 1 attempt, got %d", broadcaster.attempts) + } +} + +func TestBroadcastAutoDigestWithArgsAndRetry_MaxRetriesExceeded(t *testing.T) { + accounts := &mockAccounts{} + broadcaster := &mockBroadcaster{ + failUntil: 10, // Always fail + returnError: errors.New("persistent error"), + } + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetNodeSigner(priv) + + ops := &EngineOperations{ + logger: log.New(), + accounts: accounts, + } + + // Use short timeout to fail fast + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + result, err := ops.BroadcastAutoDigestWithArgsAndRetry( + ctx, "test-chain", signer, broadcaster.broadcast, + 10000, 24, 2, 0, // maxRetries = 0 (only initial attempt) + ) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Errorf("Expected nil result, got %v", result) + } + + // With maxRetries=0, should only attempt once + if broadcaster.attempts != 1 { + t.Errorf("Expected 1 attempt with maxRetries=0, got %d", broadcaster.attempts) + } +} + +func TestBroadcastAutoDigestWithArgsAndRetry_FreshNonceEachAttempt(t *testing.T) { + accounts := &mockAccounts{} + broadcaster := &mockBroadcaster{ + failUntil: 1, // Fail once, succeed on 2nd + returnError: errors.New("nonce error"), + } + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetNodeSigner(priv) + + ops := &EngineOperations{ + logger: 
log.New(), + accounts: accounts, + } + + // Use longer timeout to allow one retry + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err = ops.BroadcastAutoDigestWithArgsAndRetry( + ctx, "test-chain", signer, broadcaster.broadcast, + 10000, 24, 2, 3, + ) + + // Should succeed after retry + if err != nil { + t.Fatalf("Expected success after retry, got error: %v", err) + } + + // Should have fetched nonce twice (once per attempt) + if accounts.nonceCalls != 2 { + t.Errorf("Expected 2 nonce fetches (fresh nonce each attempt), got %d", accounts.nonceCalls) + } + + // Should have attempted twice + if broadcaster.attempts != 2 { + t.Errorf("Expected 2 attempts, got %d", broadcaster.attempts) + } +} + +func TestBroadcastAutoDigestWithArgsAndRetry_ContextCancellation(t *testing.T) { + accounts := &mockAccounts{} + broadcaster := &mockBroadcaster{ + failUntil: 10, // Always fail to trigger retries + returnError: errors.New("error"), + } + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetNodeSigner(priv) + + ops := &EngineOperations{ + logger: log.New(), + accounts: accounts, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + _, err = ops.BroadcastAutoDigestWithArgsAndRetry( + ctx, "test-chain", signer, broadcaster.broadcast, + 10000, 24, 2, 3, + ) + + if err == nil { + t.Fatal("Expected context cancellation error, got nil") + } + + if !errors.Is(err, context.Canceled) { + t.Errorf("Expected context.Canceled error, got: %v", err) + } +} + +func TestBroadcastAutoDigestWithArgsAndRetry_TransactionFailure(t *testing.T) { + accounts := &mockAccounts{} + broadcaster := &mockBroadcaster{ + failUntil: 10, // Always return this result + successResult: &ktypes.TxResult{ + Code: 99, // Non-OK code + Log: "transaction failed", + }, + } + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != 
nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetNodeSigner(priv) + + ops := &EngineOperations{ + logger: log.New(), + accounts: accounts, + } + + // Set maxRetries=0 to only try once + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + _, err = ops.BroadcastAutoDigestWithArgsAndRetry( + ctx, "test-chain", signer, broadcaster.broadcast, + 10000, 24, 2, 0, // maxRetries=0 + ) + + if err == nil { + t.Fatal("Expected error for failed transaction, got nil") + } + + // Retry logic retries on ANY error, including transaction failures + // So we should get the transaction failure error + if !strings.Contains(err.Error(), "transaction failed with code 99") && !strings.Contains(err.Error(), "max retries") { + t.Errorf("Expected transaction failure or max retries error, got: %v", err) + } +} diff --git a/extensions/tn_digest/scheduler/scheduler.go b/extensions/tn_digest/scheduler/scheduler.go index 79764c29b..42740b947 100644 --- a/extensions/tn_digest/scheduler/scheduler.go +++ b/extensions/tn_digest/scheduler/scheduler.go @@ -120,7 +120,8 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { runs++ - result, err := engineOps.BroadcastAutoDigestWithArgsAndParse( + // Use retry-aware broadcast method with fresh nonce refetch on each attempt + result, err := engineOps.BroadcastAutoDigestWithArgsAndRetry( jobCtx, chainID, signer, @@ -128,11 +129,12 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { DigestDeleteCap, DigestExpectedRecordsPerStream, DigestPreservePastDays, + 3, // maxRetries = 3 (up to 4 attempts per run: 1 initial + 3 retries) ) if err != nil { consecutiveFailures++ - s.logger.Warn("auto_digest broadcast failed", + s.logger.Warn("auto_digest broadcast failed after retries", "run", runs, "consecutive_failures", consecutiveFailures, "error", err) From 78c94893aed8ecc21b76256557035c5bf44a3df2 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Thu, 2 Oct 2025 17:05:44
+0700 Subject: [PATCH 2/2] chore: apply suggestion from coderabbitai --- extensions/tn_digest/internal/engine_ops.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 8618e48ec..5a00468f8 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -264,6 +264,12 @@ func (e *EngineOperations) BroadcastAutoDigestWithArgsAndRetry( case <-time.After(backoff): // Continue to retry } + + // Exponential backoff with max cap (applied after wait for next retry) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } } // ALWAYS fetch fresh nonce from database on each attempt @@ -284,12 +290,6 @@ func (e *EngineOperations) BroadcastAutoDigestWithArgsAndRetry( "attempt", attempt, "tx_hash", hash.String(), "error", err) - - // Exponential backoff with max cap - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } } return nil, fmt.Errorf("max retries (%d) exceeded: %w", maxRetries, lastErr)