Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions extensions/tn_digest/engine_ops_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build kwiltest

package tn_digest

import (
Expand Down
164 changes: 163 additions & 1 deletion extensions/tn_digest/internal/engine_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/trufnetwork/kwil-db/common"
"github.com/trufnetwork/kwil-db/core/crypto/auth"
"github.com/trufnetwork/kwil-db/core/log"
ktypes "github.com/trufnetwork/kwil-db/core/types"
sql "github.com/trufnetwork/kwil-db/node/types/sql"
"github.com/trufnetwork/kwil-db/node/types/sql"
)

// DigestTxResult represents the parsed result from an auto_digest transaction
Expand Down Expand Up @@ -233,6 +234,167 @@ func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse(
return result, nil
}

// BroadcastAutoDigestWithArgsAndRetry broadcasts an auto_digest transaction and
// retries when an attempt fails.
//
// Every attempt is delegated to broadcastAutoDigestWithFreshNonce, which
// re-reads the signer's nonce from the database before building the
// transaction, so a retry never reuses the nonce of an earlier attempt. Any
// error returned by an attempt triggers a retry; errors are not classified as
// retryable or permanent.
//
// Before each retry the call waits for a backoff that starts at 5s and doubles
// after every wait, capped at 60s. If ctx is cancelled during the wait, the
// function returns ctx.Err() immediately.
//
// maxRetries is the number of retries after the initial attempt, so at most
// maxRetries+1 broadcasts are made. A negative value is treated as 0 (a single
// attempt, no retries).
//
// It returns the parsed digest result of the first successful attempt, or an
// error wrapping the last failure once all attempts are exhausted.
func (e *EngineOperations) BroadcastAutoDigestWithArgsAndRetry(
	ctx context.Context,
	chainID string,
	signer auth.Signer,
	broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error),
	deleteCap, expectedRecords, preserveDays int,
	maxRetries int,
) (*DigestTxResult, error) {
	const (
		initialBackoff = 5 * time.Second
		maxBackoff     = 60 * time.Second
	)

	// Guard against a negative budget: without this the loop body would never
	// run and the function would wrap a nil error.
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
	backoff := initialBackoff

	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			e.logger.Warn("Retrying auto_digest broadcast with fresh nonce",
				"attempt", attempt,
				"max_retries", maxRetries,
				"backoff", backoff,
				"last_error", lastErr)

			// Wait before retrying (gives a pending tx time to resolve and
			// prevents rapid retries). An explicit timer is used so it can be
			// released right away when the context is cancelled.
			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
				// Backoff elapsed; continue with the retry.
			}

			// Exponential backoff with a cap, applied after the wait so it
			// takes effect on the next retry.
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}

		// Each attempt fetches a fresh nonce from the database.
		result, hash, err := e.broadcastAutoDigestWithFreshNonce(
			ctx, chainID, signer, broadcaster,
			deleteCap, expectedRecords, preserveDays,
		)
		if err == nil {
			return result, nil
		}

		lastErr = err
		if attempt == maxRetries {
			// Final attempt: do not claim that another retry is coming.
			e.logger.Warn("Broadcast failed, no retries left",
				"attempt", attempt,
				"tx_hash", hash.String(),
				"error", err)
			break
		}
		e.logger.Warn("Broadcast failed, will retry with fresh nonce",
			"attempt", attempt,
			"tx_hash", hash.String(),
			"error", err)
	}

	return nil, fmt.Errorf("max retries (%d) exceeded: %w", maxRetries, lastErr)
}

// broadcastAutoDigestWithFreshNonce builds, signs and broadcasts a single
// auto_digest transaction, reading the signer's nonce from the database
// immediately beforehand so a stale nonce is never reused.
//
// Steps:
//  1. Resolve the signer's account ID.
//  2. Look up the account; the next nonce is account.Nonce+1, or 1 when the
//     lookup error text contains "not found" or "no rows". Any other lookup
//     error is returned.
//  3. Encode deleteCap, expectedRecords and preserveDays as int64 arguments of
//     the "auto_digest" action in the "main" namespace.
//  4. Create and sign the transaction, then hand it to broadcaster with sync
//     argument 1.
//  5. Require a CodeOk result and parse the digest summary from the tx log.
//
// The returned hash is whatever broadcaster returned; it is also returned
// alongside an error from the broadcast step onward so callers can log it.
// For failures before the broadcast the hash is the zero value.
func (e *EngineOperations) broadcastAutoDigestWithFreshNonce(
	ctx context.Context,
	chainID string,
	signer auth.Signer,
	broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error),
	deleteCap, expectedRecords, preserveDays int,
) (*DigestTxResult, ktypes.Hash, error) {
	// Get the signer account ID.
	signerAccountID, err := ktypes.GetSignerAccount(signer)
	if err != nil {
		return nil, ktypes.Hash{}, fmt.Errorf("get signer account: %w", err)
	}
	accountHex := fmt.Sprintf("%x", signerAccountID.Identifier)

	// Always query the database for the current nonce (fresh state).
	var nextNonce uint64
	account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
	if err != nil {
		// Only "not found" / "no rows" are treated as a missing account.
		// NOTE(review): this matches on error text; confirm whether the
		// accounts store exposes a sentinel error that could be used instead.
		msg := strings.ToLower(err.Error())
		if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") {
			return nil, ktypes.Hash{}, fmt.Errorf("get account: %w", err)
		}
		nextNonce = 1
		e.logger.Info("Account not found, using nonce 1",
			"account", accountHex)
	} else {
		// Account exists - use the next nonce.
		nextNonce = uint64(account.Nonce + 1)
		e.logger.Info("Fresh nonce from database",
			"account", accountHex,
			"db_nonce", account.Nonce,
			"next_nonce", nextNonce,
			"balance", account.Balance)
	}

	// Encode the action arguments.
	deleteCapArg, err := ktypes.EncodeValue(int64(deleteCap))
	if err != nil {
		return nil, ktypes.Hash{}, fmt.Errorf("encode deleteCap: %w", err)
	}
	expectedRecordsArg, err := ktypes.EncodeValue(int64(expectedRecords))
	if err != nil {
		return nil, ktypes.Hash{}, fmt.Errorf("encode expectedRecords: %w", err)
	}
	preserveDaysArg, err := ktypes.EncodeValue(int64(preserveDays))
	if err != nil {
		return nil, ktypes.Hash{}, fmt.Errorf("encode preserveDays: %w", err)
	}

	payload := &ktypes.ActionExecution{
		Namespace: "main",
		Action:    "auto_digest",
		Arguments: [][]*ktypes.EncodedValue{{
			deleteCapArg, expectedRecordsArg, preserveDaysArg,
		}},
	}

	// Create and sign the transaction with the fresh nonce.
	tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce)
	if err != nil {
		return nil, ktypes.Hash{}, fmt.Errorf("create tx: %w", err)
	}
	if err := tx.Sign(signer); err != nil {
		return nil, ktypes.Hash{}, fmt.Errorf("sign tx: %w", err)
	}

	// Broadcast.
	hash, txResult, err := broadcaster(ctx, tx, 1)
	if err != nil {
		// Return the error but also the hash, for logging purposes.
		return nil, hash, err
	}
	// A broadcaster that reports success without a result would otherwise
	// cause a nil-pointer panic below.
	if txResult == nil {
		return nil, hash, fmt.Errorf("broadcast returned no transaction result")
	}

	// Check the transaction result code before parsing logs.
	if txResult.Code != uint32(ktypes.CodeOk) {
		return nil, hash, fmt.Errorf("transaction failed with code %d (expected %d): %s",
			txResult.Code, uint32(ktypes.CodeOk), txResult.Log)
	}

	// Parse the digest result from the transaction log.
	result, err := parseDigestResultFromTxLog(txResult.Log)
	if err != nil {
		return nil, hash, fmt.Errorf("parse digest result: %w", err)
	}

	e.logger.Info("auto_digest with args completed",
		"processed_days", result.ProcessedDays,
		"deleted_rows", result.TotalDeletedRows,
		"has_more", result.HasMoreToDelete,
		"tx_hash", hash.String(),
		"nonce", nextNonce,
		"delete_cap", deleteCap,
		"expected_records", expectedRecords,
		"preserve_days", preserveDays)

	return result, hash, nil
}

// parseDigestResultFromTxLog parses the digest result from transaction log output
// The digest action outputs log entries like: "auto_digest:{...json...}"
func parseDigestResultFromTxLog(logOutput string) (*DigestTxResult, error) {
Expand Down
Loading
Loading