Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ jobs:
while [ $attempt -le $max_attempts ]; do
echo "Test attempt $attempt of $max_attempts"

# Clean dangling Docker state only on retries
# Always try to cleanup lingering Kwil DB resources before each attempt
bash node/scripts/ci-cleanup.sh || true

# Additional cleanup only on retries
if [ $attempt -gt 1 ]; then
if [ -f docker-compose.yml ]; then
docker compose down -v || true
else
docker system prune -af --volumes || true
fi
docker compose -f node/compose.yaml down -v || true
docker system prune -af --volumes || true
sleep 5
fi

Expand Down
7 changes: 7 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ tasks:
- go install github.com/golangci/golangci-lint/cmd/golangci-lint@v2.2.1
- go install github.com/vektra/mockery/v2@v2.42.1

# ─── ci ───────────────────────────────────────────────────────────────────────

ci:cleanup:
desc: Cleanup any lingering Kwil DB containers/processes (for CI retries)
cmds:
- bash ./scripts/ci-cleanup.sh

mockery:
desc: Generate mocks
cmds:
Expand Down
37 changes: 35 additions & 2 deletions extensions/tn_digest/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"net/url"
"strings"
"time"

rpcclient "github.com/trufnetwork/kwil-db/core/rpc/client"
rpcuser "github.com/trufnetwork/kwil-db/core/rpc/client/user/jsonrpc"
Expand Down Expand Up @@ -50,7 +51,7 @@ func normalizeListenAddressForClient(listen string) (*url.URL, error) {
func makeBroadcasterFromURL(u *url.URL) TxBroadcaster {
userClient := rpcuser.NewClient(u)
return txBroadcasterFunc(func(ctx context.Context, tx *types.Transaction, sync uint8) (types.Hash, *types.TxResult, error) {
// Map sync flag to a broadcast mode; default to WaitAccept (mempool accept).
// Map sync flag to broadcast mode (callers should pass 1 for WaitCommit)
mode := rpcclient.BroadcastWaitAccept
if sync == uint8(rpcclient.BroadcastWaitCommit) || sync == 1 {
mode = rpcclient.BroadcastWaitCommit
Expand All @@ -59,6 +60,38 @@ func makeBroadcasterFromURL(u *url.URL) TxBroadcaster {
if err != nil {
return types.Hash{}, nil, err
}
return h, nil, nil

// Query the transaction result to get the log output for parsing
var txQueryResp *types.TxQueryResponse
var queryErr error
if mode == rpcclient.BroadcastWaitAccept {
// In Accept mode, commit may not be immediate: perform short polling.
for tries := 0; tries < 10; tries++ {
txQueryResp, queryErr = userClient.TxQuery(ctx, h)
if queryErr == nil && txQueryResp != nil && txQueryResp.Result != nil {
break
}
// brief backoff (non-blocking if ctx is canceled)
select {
case <-ctx.Done():
return types.Hash{}, nil, ctx.Err()
case <-time.After(200 * time.Millisecond):
}
}
if queryErr != nil {
return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr)
}
} else {
txQueryResp, queryErr = userClient.TxQuery(ctx, h)
if queryErr != nil {
return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr)
}
}

if txQueryResp == nil || txQueryResp.Result == nil {
return types.Hash{}, nil, fmt.Errorf("transaction result is nil")
}

return h, txQueryResp.Result, nil
})
}
4 changes: 2 additions & 2 deletions extensions/tn_digest/engine_ops_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing
// Create accounts service
accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New())
require.NoError(t, err)

// Prepare EngineOperations
ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New())

Expand Down Expand Up @@ -67,7 +67,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing
_, execErr := platform.Engine.Call(engCtx, platform.DB, "main", "auto_digest", []any{}, func(_ *common.Row) error { return nil })
require.NoError(t, execErr)

return types.Hash{}, &types.TxResult{Code: uint32(types.CodeOk)}, nil
return types.Hash{}, &types.TxResult{Code: uint32(types.CodeOk), Log: "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":0,\"has_more_to_delete\":false}"}, nil
}

// Build and broadcast via ops
Expand Down
180 changes: 179 additions & 1 deletion extensions/tn_digest/internal/engine_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"encoding/json"
"fmt"
"strings"

Expand All @@ -12,6 +13,13 @@ import (
sql "github.com/trufnetwork/kwil-db/node/types/sql"
)

// DigestTxResult represents the parsed result from an auto_digest transaction
type DigestTxResult struct {
ProcessedDays int
TotalDeletedRows int
HasMoreToDelete bool
}

// EngineOperations wraps engine calls needed by the digest extension.
type EngineOperations struct {
engine common.Engine
Expand Down Expand Up @@ -98,8 +106,178 @@ func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, ch
if err := tx.Sign(signer); err != nil {
return fmt.Errorf("sign tx: %w", err)
}
if _, _, err := broadcaster(ctx, tx, 0); err != nil {
hash, txResult, err := broadcaster(ctx, tx, 1)
if err != nil {
return fmt.Errorf("broadcast tx: %w", err)
}

// Check transaction result code before parsing logs
if txResult.Code != uint32(ktypes.CodeOk) {
return fmt.Errorf("transaction failed with code %d (expected %d): %s",
txResult.Code, uint32(ktypes.CodeOk), txResult.Log)
}

// Parse the digest result from the transaction log
result, err := parseDigestResultFromTxLog(txResult.Log)
if err != nil {
return fmt.Errorf("failed to parse digest result: %w", err)
}

e.logger.Info("auto_digest completed",
"processed_days", result.ProcessedDays,
"deleted_rows", result.TotalDeletedRows,
"has_more", result.HasMoreToDelete,
"tx_hash", hash.String())

return nil
}

// BroadcastAutoDigestWithArgsAndParse builds an ActionExecution tx for auto_digest with args and broadcasts it,
// then parses the result from the transaction log
func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse(
ctx context.Context,
chainID string,
signer auth.Signer,
broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error),
deleteCap, expectedRecords, preserveDays int,
) (*DigestTxResult, error) {
// Get the signer account ID
signerAccountID, err := ktypes.GetSignerAccount(signer)
if err != nil {
return nil, fmt.Errorf("failed to get signer account: %w", err)
}

// Get account information using the accounts service directly on database
account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
var nextNonce uint64
if err != nil {
// Only treat “not found” / “no rows” as missing-account; fail fast on any other error
msg := strings.ToLower(err.Error())
if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") {
return nil, fmt.Errorf("get account: %w", err)
}
e.logger.Info(
"Account not found, using nonce 1 for first transaction",
"account", fmt.Sprintf("%x", signerAccountID.Identifier),
)
nextNonce = 1
} else {
// Account exists - use next nonce
nextNonce = uint64(account.Nonce + 1)
e.logger.Info(
"Account found, using next nonce",
"account", fmt.Sprintf("%x", signerAccountID.Identifier),
"currentNonce", account.Nonce,
"nextNonce", nextNonce,
"balance", account.Balance,
)
}

// Encode arguments
deleteCapArg, err := ktypes.EncodeValue(int64(deleteCap))
if err != nil {
return nil, fmt.Errorf("encode deleteCap: %w", err)
}
expectedRecordsArg, err := ktypes.EncodeValue(int64(expectedRecords))
if err != nil {
return nil, fmt.Errorf("encode expectedRecords: %w", err)
}
preserveDaysArg, err := ktypes.EncodeValue(int64(preserveDays))
if err != nil {
return nil, fmt.Errorf("encode preserveDays: %w", err)
}

payload := &ktypes.ActionExecution{
Namespace: "main",
Action: "auto_digest",
Arguments: [][]*ktypes.EncodedValue{{
deleteCapArg, expectedRecordsArg, preserveDaysArg,
}},
}

// Create transaction with proper nonce
tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce)
if err != nil {
return nil, fmt.Errorf("create tx: %w", err)
}
if err := tx.Sign(signer); err != nil {
return nil, fmt.Errorf("sign tx: %w", err)
}

hash, txResult, err := broadcaster(ctx, tx, 1)
if err != nil {
return nil, fmt.Errorf("broadcast tx: %w", err)
}

// Check transaction result code before parsing logs
if txResult.Code != uint32(ktypes.CodeOk) {
return nil, fmt.Errorf("transaction failed with code %d (expected %d): %s",
txResult.Code, uint32(ktypes.CodeOk), txResult.Log)
}

// Parse the digest result from the transaction log
result, err := parseDigestResultFromTxLog(txResult.Log)
if err != nil {
return nil, fmt.Errorf("failed to parse digest result: %w", err)
}

e.logger.Info("auto_digest with args completed",
"processed_days", result.ProcessedDays,
"deleted_rows", result.TotalDeletedRows,
"has_more", result.HasMoreToDelete,
"tx_hash", hash.String(),
"delete_cap", deleteCap,
"expected_records", expectedRecords,
"preserve_days", preserveDays)

return result, nil
}

// parseDigestResultFromTxLog parses the digest result from transaction log output
// The digest action outputs log entries like: "auto_digest:{...json...}"
func parseDigestResultFromTxLog(logOutput string) (*DigestTxResult, error) {
if logOutput == "" {
return nil, fmt.Errorf("empty log output")
}

// Split log into lines and look for auto_digest entries
lines := strings.Split(logOutput, "\n")
var digestJSON string

// Find the last auto_digest line
for _, line := range lines {
if strings.Contains(line, "auto_digest:") {
// Extract JSON part after "auto_digest:"
parts := strings.SplitN(line, "auto_digest:", 2)
if len(parts) == 2 {
digestJSON = strings.TrimSpace(parts[1])
}
}
}

if digestJSON == "" {
return nil, fmt.Errorf("no auto_digest log entry found")
}

// Remove surrounding quotes if present (in case the JSON is quoted in the log)
digestJSON = strings.Trim(digestJSON, `"`)

// Parse JSON properly
var jsonResult struct {
ProcessedDays int `json:"processed_days"`
TotalDeletedRows int `json:"total_deleted_rows"`
HasMoreToDelete bool `json:"has_more_to_delete"`
}

if err := json.Unmarshal([]byte(digestJSON), &jsonResult); err != nil {
return nil, fmt.Errorf("failed to parse digest JSON: %w", err)
}

result := &DigestTxResult{
ProcessedDays: jsonResult.ProcessedDays,
TotalDeletedRows: jsonResult.TotalDeletedRows,
HasMoreToDelete: jsonResult.HasMoreToDelete,
}

return result, nil
}
45 changes: 45 additions & 0 deletions extensions/tn_digest/internal/engine_ops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package internal

import "testing"

func TestParseDigestResultFromTxLog_HasMoreTrue(t *testing.T) {
log := "INFO something\nNOTICE: auto_digest:{\"processed_days\":2,\"total_deleted_rows\":500,\"has_more_to_delete\":true}\nother"
res, err := parseDigestResultFromTxLog(log)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if res.ProcessedDays != 2 {
t.Fatalf("processed_days: want 2, got %d", res.ProcessedDays)
}
if res.TotalDeletedRows != 500 {
t.Fatalf("total_deleted_rows: want 500, got %d", res.TotalDeletedRows)
}
if !res.HasMoreToDelete {
t.Fatalf("has_more_to_delete: want true, got false")
}
}

func TestParseDigestResultFromTxLog_HasMoreFalse(t *testing.T) {
log := "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":1234,\"has_more_to_delete\":false}"
res, err := parseDigestResultFromTxLog(log)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if res.ProcessedDays != 1 {
t.Fatalf("processed_days: want 1, got %d", res.ProcessedDays)
}
if res.TotalDeletedRows != 1234 {
t.Fatalf("total_deleted_rows: want 1234, got %d", res.TotalDeletedRows)
}
if res.HasMoreToDelete {
t.Fatalf("has_more_to_delete: want false, got true")
}
}

func TestParseDigestResultFromTxLog_NoEntry(t *testing.T) {
log := "INFO: nothing relevant here\nNOTICE: something else"
_, err := parseDigestResultFromTxLog(log)
if err == nil {
t.Fatalf("expected error for missing auto_digest entry, got nil")
}
}
13 changes: 13 additions & 0 deletions extensions/tn_digest/scheduler/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package scheduler

import "time"

const (
// Digest drain mode constants (scheduler-scoped to avoid import cycles)
DigestDeleteCap = 100_000
DigestExpectedRecordsPerStream = 24
DigestPreservePastDays = 2
DrainRunDelay = 60 * time.Second // 1 minute
DrainMaxRuns = 100
DrainMaxConsecutiveFailures = 5
)
Loading
Loading