diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ad1047119..46c445cc9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -79,13 +79,13 @@ jobs: while [ $attempt -le $max_attempts ]; do echo "Test attempt $attempt of $max_attempts" - # Clean dangling Docker state only on retries + # Always try to cleanup lingering Kwil DB resources before each attempt + bash node/scripts/ci-cleanup.sh || true + + # Additional cleanup only on retries if [ $attempt -gt 1 ]; then - if [ -f docker-compose.yml ]; then - docker compose down -v || true - else - docker system prune -af --volumes || true - fi + docker compose -f node/compose.yaml down -v || true + docker system prune -af --volumes || true sleep 5 fi diff --git a/Taskfile.yml b/Taskfile.yml index be6ff5073..7c6e6b488 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -235,6 +235,13 @@ tasks: - go install github.com/golangci/golangci-lint/cmd/golangci-lint@v2.2.1 - go install github.com/vektra/mockery/v2@v2.42.1 + # ─── ci ─────────────────────────────────────────────────────────────────────── + + ci:cleanup: + desc: Cleanup any lingering Kwil DB containers/processes (for CI retries) + cmds: + - bash ./scripts/ci-cleanup.sh + mockery: desc: Generate mocks cmds: diff --git a/extensions/tn_digest/broadcast.go b/extensions/tn_digest/broadcast.go index 41d2ba086..3b790a6c3 100644 --- a/extensions/tn_digest/broadcast.go +++ b/extensions/tn_digest/broadcast.go @@ -6,6 +6,7 @@ import ( "net" "net/url" "strings" + "time" rpcclient "github.com/trufnetwork/kwil-db/core/rpc/client" rpcuser "github.com/trufnetwork/kwil-db/core/rpc/client/user/jsonrpc" @@ -50,7 +51,7 @@ func normalizeListenAddressForClient(listen string) (*url.URL, error) { func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { userClient := rpcuser.NewClient(u) return txBroadcasterFunc(func(ctx context.Context, tx *types.Transaction, sync uint8) (types.Hash, *types.TxResult, error) { - // Map sync flag to a broadcast mode; default to WaitAccept (mempool accept). + // Map sync flag to broadcast mode (callers should pass 1 for WaitCommit) mode := rpcclient.BroadcastWaitAccept if sync == uint8(rpcclient.BroadcastWaitCommit) || sync == 1 { mode = rpcclient.BroadcastWaitCommit @@ -59,6 +60,38 @@ func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { if err != nil { return types.Hash{}, nil, err } - return h, nil, nil + + // Query the transaction result to get the log output for parsing + var txQueryResp *types.TxQueryResponse + var queryErr error + if mode == rpcclient.BroadcastWaitAccept { + // In Accept mode, commit may not be immediate: perform short polling. + for tries := 0; tries < 10; tries++ { + txQueryResp, queryErr = userClient.TxQuery(ctx, h) + if queryErr == nil && txQueryResp != nil && txQueryResp.Result != nil { + break + } + // brief backoff (non-blocking if ctx is canceled) + select { + case <-ctx.Done(): + return types.Hash{}, nil, ctx.Err() + case <-time.After(200 * time.Millisecond): + } + } + if queryErr != nil { + return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr) + } + } else { + txQueryResp, queryErr = userClient.TxQuery(ctx, h) + if queryErr != nil { + return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr) + } + } + + if txQueryResp == nil || txQueryResp.Result == nil { + return types.Hash{}, nil, fmt.Errorf("transaction result is nil") + } + + return h, txQueryResp.Result, nil }) } diff --git a/extensions/tn_digest/engine_ops_integration_test.go b/extensions/tn_digest/engine_ops_integration_test.go index c9c50d3e5..2897a23d8 100644 --- a/extensions/tn_digest/engine_ops_integration_test.go +++ b/extensions/tn_digest/engine_ops_integration_test.go @@ -36,7 +36,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing // Create accounts service accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - + // Prepare EngineOperations ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) @@ -67,7 +67,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing _, execErr := platform.Engine.Call(engCtx, platform.DB, "main", "auto_digest", []any{}, func(_ *common.Row) error { return nil }) require.NoError(t, execErr) - return types.Hash{}, &types.TxResult{Code: uint32(types.CodeOk)}, nil + return types.Hash{}, &types.TxResult{Code: uint32(types.CodeOk), Log: "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":0,\"has_more_to_delete\":false}"}, nil } // Build and broadcast via ops diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 4faccab17..020ffc722 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -2,6 +2,7 @@ package internal import ( "context" + "encoding/json" "fmt" "strings" @@ -12,6 +13,13 @@ import ( sql "github.com/trufnetwork/kwil-db/node/types/sql" ) +// DigestTxResult represents the parsed result from an auto_digest transaction +type DigestTxResult struct { + ProcessedDays int + TotalDeletedRows int + HasMoreToDelete bool +} + // EngineOperations wraps engine calls needed by the digest extension. type EngineOperations struct { engine common.Engine @@ -98,8 +106,178 @@ func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, ch if err := tx.Sign(signer); err != nil { return fmt.Errorf("sign tx: %w", err) } - if _, _, err := broadcaster(ctx, tx, 0); err != nil { + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { return fmt.Errorf("broadcast tx: %w", err) } + + // Check transaction result code before parsing logs + if txResult.Code != uint32(ktypes.CodeOk) { + return fmt.Errorf("transaction failed with code %d (expected %d): %s", + txResult.Code, uint32(ktypes.CodeOk), txResult.Log) + } + + // Parse the digest result from the transaction log + result, err := parseDigestResultFromTxLog(txResult.Log) + if err != nil { + return fmt.Errorf("failed to parse digest result: %w", err) + } + + e.logger.Info("auto_digest completed", + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "tx_hash", hash.String()) + return nil } + +// BroadcastAutoDigestWithArgsAndParse builds an ActionExecution tx for auto_digest with args and broadcasts it, +// then parses the result from the transaction log +func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + deleteCap, expectedRecords, preserveDays int, +) (*DigestTxResult, error) { + // Get the signer account ID + signerAccountID, err := ktypes.GetSignerAccount(signer) + if err != nil { + return nil, fmt.Errorf("failed to get signer account: %w", err) + } + + // Get account information using the accounts service directly on database + account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + var nextNonce uint64 + if err != nil { + // Only treat “not found” / “no rows” as missing-account; fail fast on any other error + msg := strings.ToLower(err.Error()) + if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") { + return nil, fmt.Errorf("get account: %w", err) + } + e.logger.Info( + "Account not found, using nonce 1 for first transaction", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + ) + nextNonce = 1 + } else { + // Account exists - use next nonce + nextNonce = uint64(account.Nonce + 1) + e.logger.Info( + "Account found, using next nonce", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + "currentNonce", account.Nonce, + "nextNonce", nextNonce, + "balance", account.Balance, + ) + } + + // Encode arguments + deleteCapArg, err := ktypes.EncodeValue(int64(deleteCap)) + if err != nil { + return nil, fmt.Errorf("encode deleteCap: %w", err) + } + expectedRecordsArg, err := ktypes.EncodeValue(int64(expectedRecords)) + if err != nil { + return nil, fmt.Errorf("encode expectedRecords: %w", err) + } + preserveDaysArg, err := ktypes.EncodeValue(int64(preserveDays)) + if err != nil { + return nil, fmt.Errorf("encode preserveDays: %w", err) + } + + payload := &ktypes.ActionExecution{ + Namespace: "main", + Action: "auto_digest", + Arguments: [][]*ktypes.EncodedValue{{ + deleteCapArg, expectedRecordsArg, preserveDaysArg, + }}, + } + + // Create transaction with proper nonce + tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce) + if err != nil { + return nil, fmt.Errorf("create tx: %w", err) + } + if err := tx.Sign(signer); err != nil { + return nil, fmt.Errorf("sign tx: %w", err) + } + + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { + return nil, fmt.Errorf("broadcast tx: %w", err) + } + + // Check transaction result code before parsing logs + if txResult.Code != uint32(ktypes.CodeOk) { + return nil, fmt.Errorf("transaction failed with code %d (expected %d): %s", + txResult.Code, uint32(ktypes.CodeOk), txResult.Log) + } + + // Parse the digest result from the transaction log + result, err := parseDigestResultFromTxLog(txResult.Log) + if err != nil { + return nil, fmt.Errorf("failed to parse digest result: %w", err) + } + + e.logger.Info("auto_digest with args completed", + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "tx_hash", hash.String(), + "delete_cap", deleteCap, + "expected_records", expectedRecords, + "preserve_days", preserveDays) + + return result, nil +} + +// parseDigestResultFromTxLog parses the digest result from transaction log output +// The digest action outputs log entries like: "auto_digest:{...json...}" +func parseDigestResultFromTxLog(logOutput string) (*DigestTxResult, error) { + if logOutput == "" { + return nil, fmt.Errorf("empty log output") + } + + // Split log into lines and look for auto_digest entries + lines := strings.Split(logOutput, "\n") + var digestJSON string + + // Find the last auto_digest line + for _, line := range lines { + if strings.Contains(line, "auto_digest:") { + // Extract JSON part after "auto_digest:" + parts := strings.SplitN(line, "auto_digest:", 2) + if len(parts) == 2 { + digestJSON = strings.TrimSpace(parts[1]) + } + } + } + + if digestJSON == "" { + return nil, fmt.Errorf("no auto_digest log entry found") + } + + // Remove surrounding quotes if present (in case the JSON is quoted in the log) + digestJSON = strings.Trim(digestJSON, `"`) + + // Parse JSON properly + var jsonResult struct { + ProcessedDays int `json:"processed_days"` + TotalDeletedRows int `json:"total_deleted_rows"` + HasMoreToDelete bool `json:"has_more_to_delete"` + } + + if err := json.Unmarshal([]byte(digestJSON), &jsonResult); err != nil { + return nil, fmt.Errorf("failed to parse digest JSON: %w", err) + } + + result := &DigestTxResult{ + ProcessedDays: jsonResult.ProcessedDays, + TotalDeletedRows: jsonResult.TotalDeletedRows, + HasMoreToDelete: jsonResult.HasMoreToDelete, + } + + return result, nil +} diff --git a/extensions/tn_digest/internal/engine_ops_test.go b/extensions/tn_digest/internal/engine_ops_test.go new file mode 100644 index 000000000..06ed878be --- /dev/null +++ b/extensions/tn_digest/internal/engine_ops_test.go @@ -0,0 +1,45 @@ +package internal + +import "testing" + +func TestParseDigestResultFromTxLog_HasMoreTrue(t *testing.T) { + log := "INFO something\nNOTICE: auto_digest:{\"processed_days\":2,\"total_deleted_rows\":500,\"has_more_to_delete\":true}\nother" + res, err := parseDigestResultFromTxLog(log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if res.ProcessedDays != 2 { + t.Fatalf("processed_days: want 2, got %d", res.ProcessedDays) + } + if res.TotalDeletedRows != 500 { + t.Fatalf("total_deleted_rows: want 500, got %d", res.TotalDeletedRows) + } + if !res.HasMoreToDelete { + t.Fatalf("has_more_to_delete: want true, got false") + } +} + +func TestParseDigestResultFromTxLog_HasMoreFalse(t *testing.T) { + log := "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":1234,\"has_more_to_delete\":false}" + res, err := parseDigestResultFromTxLog(log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if res.ProcessedDays != 1 { + t.Fatalf("processed_days: want 1, got %d", res.ProcessedDays) + } + if res.TotalDeletedRows != 1234 { + t.Fatalf("total_deleted_rows: want 1234, got %d", res.TotalDeletedRows) + } + if res.HasMoreToDelete { + t.Fatalf("has_more_to_delete: want false, got true") + } +} + +func TestParseDigestResultFromTxLog_NoEntry(t *testing.T) { + log := "INFO: nothing relevant here\nNOTICE: something else" + _, err := parseDigestResultFromTxLog(log) + if err == nil { + t.Fatalf("expected error for missing auto_digest entry, got nil") + } +} diff --git a/extensions/tn_digest/scheduler/constants.go b/extensions/tn_digest/scheduler/constants.go new file mode 100644 index 000000000..a573b30f0 --- /dev/null +++ b/extensions/tn_digest/scheduler/constants.go @@ -0,0 +1,13 @@ +package scheduler + +import "time" + +const ( + // Digest drain mode constants (scheduler-scoped to avoid import cycles) + DigestDeleteCap = 100_000 + DigestExpectedRecordsPerStream = 24 + DigestPreservePastDays = 2 + DrainRunDelay = 60 * time.Second // 1 minute + DrainMaxRuns = 100 + DrainMaxConsecutiveFailures = 5 +) diff --git a/extensions/tn_digest/scheduler/scheduler.go b/extensions/tn_digest/scheduler/scheduler.go index 2b37c51df..79764c29b 100644 --- a/extensions/tn_digest/scheduler/scheduler.go +++ b/extensions/tn_digest/scheduler/scheduler.go @@ -72,7 +72,7 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { // Clear any existing jobs to avoid duplicates on (re)start s.cron.Clear() - // Use background context for job execution to avoid cancellation issues + // Use scheduler context for job execution to enable cancellation on leadership loss jobFunc := func() { defer func() { if r := recover(); r != nil { @@ -80,8 +80,8 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { } }() - // Create a fresh context for each job execution - jobCtx := context.Background() + // Use the scheduler's context so Stop() cancels the drain loop + jobCtx := s.ctx // Snapshot dependencies under lock to avoid races with setters. s.mu.Lock() @@ -96,11 +96,90 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { return } chainID := kwilService.GenesisConfig.ChainID - if err := engineOps.BuildAndBroadcastAutoDigestTx(jobCtx, chainID, signer, broadcaster.BroadcastTx); err != nil { - s.logger.Warn("auto_digest broadcast failed", "error", err) - return + + // Implement drain mode: run auto_digest repeatedly until has_more=false + s.logger.Info("starting digest drain mode", + "delete_cap", DigestDeleteCap, + "expected_records", DigestExpectedRecordsPerStream, + "preserve_days", DigestPreservePastDays, + "max_runs", DrainMaxRuns) + + runs := 0 + consecutiveFailures := 0 + totalProcessedDays := 0 + totalDeletedRows := 0 + + for runs < DrainMaxRuns { + // Check for cancellation + select { + case <-jobCtx.Done(): + s.logger.Info("digest drain canceled", "runs_completed", runs) + return + default: + } + + runs++ + + result, err := engineOps.BroadcastAutoDigestWithArgsAndParse( + jobCtx, + chainID, + signer, + broadcaster.BroadcastTx, + DigestDeleteCap, + DigestExpectedRecordsPerStream, + DigestPreservePastDays, + ) + + if err != nil { + consecutiveFailures++ + s.logger.Warn("auto_digest broadcast failed", + "run", runs, + "consecutive_failures", consecutiveFailures, + "error", err) + + if consecutiveFailures >= DrainMaxConsecutiveFailures { + s.logger.Error("too many consecutive failures, aborting drain", + "consecutive_failures", consecutiveFailures, + "max_allowed", DrainMaxConsecutiveFailures) + return + } + } else { + consecutiveFailures = 0 + // Update cumulative totals + totalProcessedDays += result.ProcessedDays + totalDeletedRows += result.TotalDeletedRows + + s.logger.Info("digest run completed", + "run", runs, + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "cumulative_processed", totalProcessedDays, + "cumulative_deleted", totalDeletedRows) + + // Check if we're done + if !result.HasMoreToDelete { + s.logger.Info("digest drain completed successfully", + "total_runs", runs, + "total_processed_days", totalProcessedDays, + "total_deleted_rows", totalDeletedRows) + return + } + } + + // Sleep between runs, but allow cancellation + select { + case <-jobCtx.Done(): + s.logger.Info("digest drain canceled during sleep", "runs_completed", runs) + return + case <-time.After(DrainRunDelay): + // Continue to next run + } } - s.logger.Info("auto_digest tx broadcasted") + + s.logger.Info("digest drain reached max runs", + "max_runs", DrainMaxRuns, + "runs_completed", runs) } if j, err := s.cron.Cron(cronExpr).Do(jobFunc); err != nil { diff --git a/extensions/tn_digest/scheduler_lifecycle.go b/extensions/tn_digest/scheduler_lifecycle.go index 58c833356..e59e833a2 100644 --- a/extensions/tn_digest/scheduler_lifecycle.go +++ b/extensions/tn_digest/scheduler_lifecycle.go @@ -40,8 +40,8 @@ func (e *Extension) ensureSchedulerWithService(service *common.Service) bool { return true } -func (e *Extension) startScheduler(ctx context.Context) error { - return e.Scheduler().Start(ctx, e.Schedule()) +func (e *Extension) startScheduler(_ context.Context) error { + return e.Scheduler().Start(context.Background(), e.Schedule()) } func (e *Extension) stopSchedulerIfRunning() { diff --git a/go.mod b/go.mod index f1faf3564..19350221b 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,9 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e - github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e - github.com/trufnetwork/sdk-go v0.4.3-0.20250821041427-ae33ba5dcd2c + github.com/trufnetwork/kwil-db v0.10.3-0.20250905175054-602e824e33c2 + github.com/trufnetwork/kwil-db/core v0.4.3-0.20250905175054-602e824e33c2 + github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa golang.org/x/sync v0.15.0 @@ -268,7 +268,7 @@ require ( golang.org/x/net v0.40.0 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/term v0.32.0 // indirect - golang.org/x/text v0.25.0 // indirect + golang.org/x/text v0.25.0 golang.org/x/time v0.10.0 // indirect golang.org/x/tools v0.30.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect diff --git a/go.sum b/go.sum index 95380158b..666ae848f 100644 --- a/go.sum +++ b/go.sum @@ -1212,16 +1212,14 @@ github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZ github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= -github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e h1:pfdGR7fw1v7C5A5DQc8CdR+ROV2yKaRpWLuT4W86zx8= -github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e h1:k0LKW2SdfBcJE2rpMrgWMR3xJojnS4wJPBOGEHB1Il4= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db v0.10.3-0.20250905175054-602e824e33c2 h1:sWkNssIA46NoEYD8db4zTHbXJecPvlm8OWbYbyW21Es= +github.com/trufnetwork/kwil-db v0.10.3-0.20250905175054-602e824e33c2/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250905175054-602e824e33c2 h1:ojCdkwDCQ3/i79zaKQa9PO5l/dBERRZnmP0SFhPOv24= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250905175054-602e824e33c2/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 h1:d9EqPXIjbq/atzEncK5dM3Z9oStx1BxCGuL/sjefeCw= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709/go.mod h1:lMhUy1bin5eNoDVkeuvG8kNfJuGDmXLpNPicR9vh4eA= -github.com/trufnetwork/sdk-go v0.4.3-0.20250821041427-ae33ba5dcd2c h1:8S/w2IHvHMD613knvfwbV7/AiW7wMH9gNQzxpEqk/9s= -github.com/trufnetwork/sdk-go v0.4.3-0.20250821041427-ae33ba5dcd2c/go.mod h1:Qt82Hye1L7QB09QNSXcuN6+ZgoeBLZFX5ErDqEO5DMs= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.10 h1:p8Fspmz3iTctJstry1PYS3HVdllxnEzTEsgIgtxTrCk= github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= diff --git a/internal/migrations/020-digest-actions.sql b/internal/migrations/020-digest-actions.sql index 36f535d38..0d9d06750 100644 --- a/internal/migrations/020-digest-actions.sql +++ b/internal/migrations/020-digest-actions.sql @@ -1,765 +1,778 @@ -/* - * DIGEST ACTIONS MIGRATION - * - * Implements optimized digest system with UNNEST batch processing: - * - batch_digest: Direct OHLC processing with bulk operations (replaces digest_daily) - * - auto_digest: Batch process multiple pending days using optimized batch_digest - * - get_daily_ohlc: Query daily OHLC data from raw or digested sources - */ - --- ============================================================================= --- CORE DIGEST ACTIONS --- ============================================================================= - -/** - * batch_digest: Single-statement bulk processing with CTEs for maximum performance - * - * Implements complete bulk operations in a single SQL statement using CTEs. - * Eliminates array ↔︎ SQL round-trips and multiple statement overhead. - * - * Performance benefits: - * - Single SQL statement with all operations - * - Zero array conversions and UNNEST overhead - * - Relation-based joins instead of expensive anti-joins - * - Reuses intermediate results efficiently - * - Planner can optimize the entire pipeline - */ -CREATE OR REPLACE ACTION batch_digest( - $stream_refs INT[], - $day_indexes INT[], - $delete_cap INT DEFAULT 10000, - $preserve_past_days INT DEFAULT 2 -) PUBLIC RETURNS TABLE( - processed_days INT, - total_deleted_rows INT, - total_preserved_rows INT, - has_more_to_delete BOOL -) { - -- Leader authorization check, keep it commented out for now so test passing and until we can inject how leader is - -- if @caller != @leader { - -- ERROR('Only the leader node can execute batch digest operations'); - -- } - - -- Validate input arrays have same length - if COALESCE(array_length($stream_refs), 0) != COALESCE(array_length($day_indexes), 0) { - ERROR('stream_refs and day_indexes arrays must have the same length'); - } - - if COALESCE(array_length($stream_refs), 0) = 0 { - RETURN 0, 0, 0, false; - } - - $total_processed := 0; - $total_deleted := 0; - $total_preserved := 0; - $has_more_to_delete := false; - $events_probe_count := 0; - $events_deleted_count := 0; - $marker_cap := 0; - $markers_probe_count := 0; - $markers_deleted_count := 0; - - -- Kwil-compatible execution with separate statements and EXISTS - -- Check if we have any targets to process - $has_targets := false; - for $result in - WITH targets AS ( - SELECT - ord, - sr AS stream_ref, - di AS day_index, - (di * 86400) AS day_start, - (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) - WITH ORDINALITY AS u(sr, di, ord) - ) - SELECT COUNT(*) AS target_count FROM targets - { - if $result.target_count > 0 { - $has_targets := true; - } - } - - -- Only proceed if we have targets - if $has_targets { - -- Step 2: Derive day events using windowed ranges (fewer scans) - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ) - SELECT COUNT(*) AS processed_days FROM ohlc_rows - { - $total_processed := $result.processed_days; - } - - -- Step 3: Build keep-set relation - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - keep_rows AS ( - SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL - ) - SELECT COUNT(*) AS preserved_count FROM (SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows) k - { - $total_preserved := $result.preserved_count; - } - - -- Step 4: Probe events to delete (cap+1 to detect leftovers) - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - keep_rows AS ( - SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL - ), - keep_rows_dedup AS ( - SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows - ), - events_probe AS ( - SELECT de.stream_ref, de.event_time, de.created_at - FROM day_events de - LEFT JOIN keep_rows_dedup k - ON k.stream_ref = de.stream_ref - AND k.event_time = de.event_time - AND k.created_at = de.created_at - WHERE k.stream_ref IS NULL - LIMIT $delete_cap + 1 - ) - SELECT COUNT(*) AS probe_count FROM events_probe - { - $events_probe_count := $result.probe_count; - } - - -- Step 5: Delete events (capped) - if $events_probe_count > 0 { - -- Delete with cap - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - keep_rows AS ( - SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL - ), - keep_rows_dedup AS ( - SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows - ), - events_to_delete AS ( - SELECT de.stream_ref, de.event_time, de.created_at - FROM day_events de - LEFT JOIN keep_rows_dedup k - ON k.stream_ref = de.stream_ref - AND k.event_time = de.event_time - AND k.created_at = de.created_at - WHERE k.stream_ref IS NULL - LIMIT $delete_cap - ) - DELETE FROM primitive_events - WHERE EXISTS ( - SELECT 1 FROM events_to_delete d - WHERE d.stream_ref = primitive_events.stream_ref - AND d.event_time = primitive_events.event_time - AND d.created_at = primitive_events.created_at - ); - -- Set deleted events count from pre-delete probe - $events_deleted_count := LEAST($delete_cap, $events_probe_count); - } - - -- Step 6: Calculate marker cap and probe - $marker_cap := GREATEST(0, $delete_cap - $events_deleted_count); - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - markers_probe AS ( - SELECT pet.stream_ref, pet.event_time - FROM primitive_event_type pet - JOIN windows w - ON pet.stream_ref = w.stream_ref - AND pet.event_time >= w.lo * 86400 - AND pet.event_time < (w.hi + 1) * 86400 - LIMIT $marker_cap + 1 - ) - SELECT COUNT(*) AS probe_count FROM markers_probe - { - $markers_probe_count := $result.probe_count; - } - - -- Step 7: Delete markers (capped) - if $markers_probe_count > 0 { - -- Delete with cap - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - markers_to_delete AS ( - SELECT pet.stream_ref, pet.event_time - FROM primitive_event_type pet - JOIN windows w - ON pet.stream_ref = w.stream_ref - AND pet.event_time >= w.lo * 86400 - AND pet.event_time < (w.hi + 1) * 86400 - LIMIT $marker_cap - ) - DELETE FROM primitive_event_type - WHERE EXISTS ( - SELECT 1 FROM markers_to_delete m - WHERE m.stream_ref = primitive_event_type.stream_ref - AND m.event_time = primitive_event_type.event_time - ); - -- Set deleted markers count from pre-delete probe - $markers_deleted_count := LEAST($marker_cap, $markers_probe_count); - } - - -- Step 8: Upsert markers - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - ohlc_markers AS ( - SELECT stream_ref, open_time AS event_time, 1 AS type FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, 8 AS type FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, 2 AS type FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, 4 AS type FROM ohlc_rows WHERE low_time IS NOT NULL - ), - aggregated_markers AS ( - SELECT stream_ref, event_time, SUM(type)::INT AS type - FROM (SELECT DISTINCT stream_ref, event_time, type FROM ohlc_markers) d - GROUP BY stream_ref, event_time - ) - INSERT INTO primitive_event_type (stream_ref, event_time, type) - SELECT stream_ref, event_time, type FROM aggregated_markers - ON CONFLICT (stream_ref, event_time) DO UPDATE SET type = EXCLUDED.type; - - -- Step 9: Calculate totals - $total_deleted := $events_deleted_count + $markers_deleted_count; - $has_more_to_delete := ($events_probe_count > $delete_cap) OR ($markers_probe_count > $marker_cap); - - -- Step 10: Cleanup pending_prune_days only when no leftovers - if NOT $has_more_to_delete { - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ) - DELETE FROM pending_prune_days - WHERE EXISTS ( - SELECT 1 FROM targets t - WHERE t.stream_ref = pending_prune_days.stream_ref - AND t.day_index = pending_prune_days.day_index - ); - } - } - - -- Uncomment for debugging - -- NOTICE('Batch digest completed: Processed '|| $total_processed::TEXT ||' days, Deleted '|| $total_deleted::TEXT ||' rows, Preserved '|| $total_preserved::TEXT ||' rows'); - - RETURN $total_processed, $total_deleted, $total_preserved, $has_more_to_delete; -}; - -/** - * auto_digest: Batch process multiple pending days (optimized version) - * - * Now uses the efficient batch_digest internally for better performance - */ -CREATE OR REPLACE ACTION auto_digest( - $delete_cap INT DEFAULT 10000, - -- Expected records per stream per day, used to calculate optimal batch size - -- Default of 24 represents typical hourly data collection (24 hours per day) - $expected_records_per_stream INT DEFAULT 24, - -- Number of most recent full/partial days to preserve from digestion - -- Example: 2 preserves today and yesterday - $preserve_past_days INT DEFAULT 2 -) PUBLIC RETURNS TABLE( - processed_days INT, - total_deleted_rows INT, - -- has_more_to_delete indicates that there are still pending batches to process - has_more_to_delete BOOL -) { - - -- Validate expected_records_per_stream to prevent divide by zero - if $expected_records_per_stream IS NULL OR $expected_records_per_stream < 1 { - ERROR('expected_records_per_stream must be a positive integer (minimum 1), got: ' || COALESCE($expected_records_per_stream::TEXT, 'NULL')); - } - - -- Calculate batch size dynamically - -- Formula: floor((delete_cap * 3) / (expected_records_per_stream * 2)) - -- Equivalent to: (delete_cap / expected_records_per_stream) * 1.5 (but ensuring we don't use float like types) - $batch_size := GREATEST(1, (($delete_cap * 3) / ($expected_records_per_stream * 2))); - $batch_size_plus_one := $batch_size + 1; - -- Leader authorization check, keep it commented out for now so test passing and until we can inject how leader is - -- if @caller != @leader { - -- ERROR('Only the leader node can execute auto digest operations'); - -- } - - -- Allow preserve_past_days to be zero (process including current day); only validate non-null - if $preserve_past_days IS NULL { - ERROR('preserve_past_days must not be NULL'); - } - - -- Compute current day and cutoff day to preserve most recent days - $current_day INT := (@block_timestamp / 86400)::INT; - $cutoff_day INT := $current_day - $preserve_past_days; - - -- Get candidates using efficient ARRAY_AGG batch collection, excluding the last $preserve_past_days days - $stream_refs INT[]; - $day_indexes INT[]; - -- will help our user determine if they need to call auto_digest again - $has_more BOOL := false; - - -- Get batch_size + 1 items to check if there are more available - -- Use ARRAY_AGG for efficient batch collection in a single query - for $result in - WITH candidates AS ( - SELECT stream_ref, day_index - FROM pending_prune_days - WHERE day_index <= $cutoff_day - ORDER BY day_index ASC, stream_ref ASC - LIMIT $batch_size_plus_one - ), - aggregated AS ( - SELECT - ARRAY_AGG(stream_ref ORDER BY day_index ASC, stream_ref ASC) AS all_stream_refs, - ARRAY_AGG(day_index ORDER BY day_index ASC, stream_ref ASC) AS all_day_indexes, - COUNT(*) AS total_count - FROM candidates - ) - SELECT - CASE WHEN total_count > $batch_size - THEN all_stream_refs[1:$batch_size] - ELSE all_stream_refs - END AS stream_refs, - CASE WHEN total_count > $batch_size - THEN all_day_indexes[1:$batch_size] - ELSE all_day_indexes - END AS day_indexes, - CASE WHEN total_count > $batch_size - THEN true - ELSE false - END AS has_more_flag - FROM aggregated - { - $stream_refs := $result.stream_refs; - $day_indexes := $result.day_indexes; - $has_more := $result.has_more_flag; - } - - -- Handle empty result case - if $stream_refs IS NULL OR COALESCE(array_length($stream_refs), 0) = 0 { - emit_auto_digest_notice(0, 0, $has_more); - RETURN 0, 0, $has_more; - } - - -- Process using optimized batch_digest - $processed := 0; - $total_deleted := 0; - - for $result in batch_digest($stream_refs, $day_indexes, $delete_cap, $preserve_past_days) { - $processed := $result.processed_days; - $total_deleted := $result.total_deleted_rows; - - if $result.has_more_to_delete { - emit_auto_digest_notice($processed, $total_deleted, true); - $has_more := true; - RETURN $processed, $total_deleted, $has_more; - } - } - emit_auto_digest_notice($processed, $total_deleted, $has_more); - RETURN $processed, $total_deleted, $has_more; -}; - --- private helper to emit structured NOTICE logs for parsing --- we use this because there's no way to get this returned from action executions on sdks -CREATE OR REPLACE ACTION emit_auto_digest_notice( - $processed INT, - $deleted INT, - $has_more BOOL -) PRIVATE VIEW { - $has_more_text := 'false'; - if $has_more { - $has_more_text := 'true'; - } - NOTICE('auto_digest:' || '{"processed_days":' || $processed::TEXT || ',"total_deleted_rows":' || $deleted::TEXT || ',"has_more_to_delete":' || $has_more_text || '}'); -}; - -/** - * get_daily_ohlc: Query daily OHLC data - * - * Returns OHLC values for a specific day and stream - */ -CREATE OR REPLACE ACTION get_daily_ohlc( - $stream_ref INT, - $day INT -) PUBLIC VIEW RETURNS TABLE( - open_value NUMERIC(36,18), - high_value NUMERIC(36,18), - low_value NUMERIC(36,18), - close_value NUMERIC(36,18) -) { - $day_start := $day * 86400; - $day_end := $day_start + 86400; - - -- Check if this day has been digested (ensure markers correspond to existing events) - $is_digested BOOL := false; - for $unused in - SELECT 1 - FROM primitive_event_type t - JOIN primitive_events p - ON p.stream_ref = t.stream_ref - AND p.event_time = t.event_time - WHERE t.stream_ref = $stream_ref - AND t.event_time >= $day_start AND t.event_time < $day_end - LIMIT 1 { - $is_digested := true; - } - - -- Declare variables to store the OHLC values - $open_value NUMERIC(36,18); - $high_value NUMERIC(36,18); - $low_value NUMERIC(36,18); - $close_value NUMERIC(36,18); - - if $is_digested { - -- Calculate from digested data using type markers with robust selection - -- to handle potential stale markers and ensure correct OHLC values - for $result in - SELECT - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND (t.type % 2) = 1 - ORDER BY p.event_time ASC, p.created_at DESC - LIMIT 1) AS open_value, - - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND ((t.type / 2) % 2) = 1 - ORDER BY p.value DESC, p.event_time ASC, p.created_at DESC - LIMIT 1) AS high_value, - - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND ((t.type / 4) % 2) = 1 - ORDER BY p.value ASC, p.event_time ASC, p.created_at DESC - LIMIT 1) AS low_value, - - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND ((t.type / 8) % 2) = 1 - ORDER BY p.event_time DESC, p.created_at DESC - LIMIT 1) AS close_value - { - $open_value := $result.open_value; - $high_value := $result.high_value; - $low_value := $result.low_value; - $close_value := $result.close_value; - } - } else { - -- Calculate from raw data (single query with window functions) - for $result in - SELECT - MAX(CASE WHEN rn_open = 1 THEN value END) AS open_value, - MAX(CASE WHEN rn_high = 1 THEN value END) AS high_value, - MAX(CASE WHEN rn_low = 1 THEN value END) AS low_value, - MAX(CASE WHEN rn_close = 1 THEN value END) AS close_value - FROM ( - SELECT value, - ROW_NUMBER() OVER (ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - ROW_NUMBER() OVER (ORDER BY event_time DESC, created_at DESC) AS rn_close - FROM primitive_events - WHERE stream_ref = $stream_ref - AND event_time >= $day_start AND event_time < $day_end - ) r { - $open_value := $result.open_value; - $high_value := $result.high_value; - $low_value := $result.low_value; - $close_value := $result.close_value; - } - } - - -- Return the calculated values - RETURN $open_value, $high_value, $low_value, $close_value; -}; +/* + * DIGEST ACTIONS MIGRATION + * + * Implements optimized digest system with UNNEST batch processing: + * - batch_digest: Direct OHLC processing with bulk operations (replaces digest_daily) + * - auto_digest: Batch process multiple pending days using optimized batch_digest + * - get_daily_ohlc: Query daily OHLC data from raw or digested sources + */ + +-- ============================================================================= +-- CORE DIGEST ACTIONS +-- ============================================================================= + +/** + * batch_digest: Single-statement bulk processing with CTEs for maximum performance + * + * Implements complete bulk operations in a single SQL statement using CTEs. + * Eliminates array ↔︎ SQL round-trips and multiple statement overhead. + * + * Performance benefits: + * - Single SQL statement with all operations + * - Zero array conversions and UNNEST overhead + * - Relation-based joins instead of expensive anti-joins + * - Reuses intermediate results efficiently + * - Planner can optimize the entire pipeline + */ +CREATE OR REPLACE ACTION batch_digest( + $stream_refs INT[], + $day_indexes INT[], + $delete_cap INT DEFAULT 10000, + $preserve_past_days INT DEFAULT 2 +) PUBLIC RETURNS TABLE( + processed_days INT, + total_deleted_rows INT, + total_preserved_rows INT, + has_more_to_delete BOOL +) { + -- Leader authorization check using dedicated helper function + check_leader_authorization(); + + -- Validate input arrays have same length + if COALESCE(array_length($stream_refs), 0) != COALESCE(array_length($day_indexes), 0) { + ERROR('stream_refs and day_indexes arrays must have the same length'); + } + + if COALESCE(array_length($stream_refs), 0) = 0 { + RETURN 0, 0, 0, false; + } + + $total_processed := 0; + $total_deleted := 0; + $total_preserved := 0; + $has_more_to_delete := false; + $events_probe_count := 0; + $events_deleted_count := 0; + $marker_cap := 0; + $markers_probe_count := 0; + $markers_deleted_count := 0; + + -- Kwil-compatible execution with separate statements and EXISTS + -- Check if we have any targets to process + $has_targets := false; + for $result in + WITH targets AS ( + SELECT + ord, + sr AS stream_ref, + di AS day_index, + (di * 86400) AS day_start, + (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) + WITH ORDINALITY AS u(sr, di, ord) + ) + SELECT COUNT(*) AS target_count FROM targets + { + if $result.target_count > 0 { + $has_targets := true; + } + } + + -- Only proceed if we have targets + if $has_targets { + -- Step 2: Derive day events using windowed ranges (fewer scans) + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ) + SELECT COUNT(*) AS processed_days FROM ohlc_rows + { + $total_processed := $result.processed_days; + } + + -- Step 3: Build keep-set relation + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + keep_rows AS ( + SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL + ) + SELECT COUNT(*) AS preserved_count FROM (SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows) k + { + $total_preserved := $result.preserved_count; + } + + -- Step 4: Probe events to delete (cap+1 to detect leftovers) + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + keep_rows AS ( + SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL + ), + keep_rows_dedup AS ( + SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows + ), + events_probe AS ( + SELECT de.stream_ref, de.event_time, de.created_at + FROM day_events de + LEFT JOIN keep_rows_dedup k + ON k.stream_ref = de.stream_ref + AND k.event_time = de.event_time + AND k.created_at = de.created_at + WHERE k.stream_ref IS NULL + LIMIT $delete_cap + 1 + ) + SELECT COUNT(*) AS probe_count FROM events_probe + { + $events_probe_count := $result.probe_count; + } + + -- Step 5: Delete events (capped) + if $events_probe_count > 0 { + -- Delete with cap + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + keep_rows AS ( + SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL + ), + keep_rows_dedup AS ( + SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows + ), + events_to_delete AS ( + SELECT de.stream_ref, de.event_time, de.created_at + FROM day_events de + LEFT JOIN keep_rows_dedup k + ON k.stream_ref = de.stream_ref + AND k.event_time = de.event_time + AND k.created_at = de.created_at + WHERE k.stream_ref IS NULL + LIMIT $delete_cap + ) + DELETE FROM primitive_events + WHERE EXISTS ( + SELECT 1 FROM events_to_delete d + WHERE d.stream_ref = primitive_events.stream_ref + AND d.event_time = primitive_events.event_time + AND d.created_at = primitive_events.created_at + ); + -- Set deleted events count from pre-delete probe + $events_deleted_count := LEAST($delete_cap, $events_probe_count); + } + + -- Step 6: Calculate marker cap and probe + $marker_cap := GREATEST(0, $delete_cap - $events_deleted_count); + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + markers_probe AS ( + SELECT pet.stream_ref, pet.event_time + FROM primitive_event_type pet + JOIN windows w + ON pet.stream_ref = w.stream_ref + AND pet.event_time >= w.lo * 86400 + AND pet.event_time < (w.hi + 1) * 86400 + LIMIT $marker_cap + 1 + ) + SELECT COUNT(*) AS probe_count FROM markers_probe + { + $markers_probe_count := $result.probe_count; + } + + -- Step 7: Delete markers (capped) + if $markers_probe_count > 0 { + -- Delete with cap + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + markers_to_delete AS ( + SELECT pet.stream_ref, pet.event_time + FROM primitive_event_type pet + JOIN windows w + ON pet.stream_ref = w.stream_ref + AND pet.event_time >= w.lo * 86400 + AND pet.event_time < (w.hi + 1) * 86400 + LIMIT $marker_cap + ) + DELETE FROM primitive_event_type + WHERE EXISTS ( + SELECT 1 FROM markers_to_delete m + WHERE m.stream_ref = primitive_event_type.stream_ref + AND m.event_time = primitive_event_type.event_time + ); + -- Set deleted markers count from pre-delete probe + $markers_deleted_count := LEAST($marker_cap, $markers_probe_count); + } + + -- Step 8: Upsert markers + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + ohlc_markers AS ( + SELECT stream_ref, open_time AS event_time, 1 AS type FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, 8 AS type FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, 2 AS type FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, 4 AS type FROM ohlc_rows WHERE low_time IS NOT NULL + ), + aggregated_markers AS ( + SELECT stream_ref, event_time, SUM(type)::INT AS type + FROM (SELECT DISTINCT stream_ref, event_time, type FROM ohlc_markers) d + GROUP BY stream_ref, event_time + ) + INSERT INTO primitive_event_type (stream_ref, event_time, type) + SELECT stream_ref, event_time, type FROM aggregated_markers + ON CONFLICT (stream_ref, event_time) DO UPDATE SET type = EXCLUDED.type; + + -- Step 9: Calculate totals + $total_deleted := $events_deleted_count + $markers_deleted_count; + $has_more_to_delete := ($events_probe_count > $delete_cap) OR ($markers_probe_count > $marker_cap); + + -- Step 10: Cleanup pending_prune_days only when no leftovers + if NOT $has_more_to_delete { + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ) + DELETE FROM pending_prune_days + WHERE EXISTS ( + SELECT 1 FROM targets t + WHERE t.stream_ref = pending_prune_days.stream_ref + AND t.day_index = pending_prune_days.day_index + ); + } + } + + -- Uncomment for debugging + -- NOTICE('Batch digest completed: Processed '|| $total_processed::TEXT ||' days, Deleted '|| $total_deleted::TEXT ||' rows, Preserved '|| $total_preserved::TEXT ||' rows'); + + RETURN $total_processed, $total_deleted, $total_preserved, $has_more_to_delete; +}; + +/** + * auto_digest: Batch process multiple pending days (optimized version) + * + * Now uses the efficient batch_digest internally for better performance + */ +CREATE OR REPLACE ACTION auto_digest( + $delete_cap INT DEFAULT 10000, + -- Expected records per stream per day, used to calculate optimal batch size + -- Default of 24 represents typical hourly data collection (24 hours per day) + $expected_records_per_stream INT DEFAULT 24, + -- Number of most recent full/partial days to preserve from digestion + -- Example: 2 preserves today and yesterday + $preserve_past_days INT DEFAULT 2 +) PUBLIC RETURNS TABLE( + processed_days INT, + total_deleted_rows INT, + -- has_more_to_delete indicates that there are still pending batches to process + has_more_to_delete BOOL +) { + + -- Validate expected_records_per_stream to prevent divide by zero + if $expected_records_per_stream IS NULL OR $expected_records_per_stream < 1 { + ERROR('expected_records_per_stream must be a positive integer (minimum 1), got: ' || COALESCE($expected_records_per_stream::TEXT, 'NULL')); + } + + -- Calculate batch size dynamically + -- Formula: floor((delete_cap * 3) / (expected_records_per_stream * 2)) + -- Equivalent to: (delete_cap / expected_records_per_stream) * 1.5 (but ensuring we don't use float like types) + $batch_size := GREATEST(1, (($delete_cap * 3) / ($expected_records_per_stream * 2))); + $batch_size_plus_one := $batch_size + 1; + -- Leader authorization check using dedicated helper function + check_leader_authorization(); + + -- Allow preserve_past_days to be zero (process including current day); only validate non-null + if $preserve_past_days IS NULL { + ERROR('preserve_past_days must not be NULL'); + } + + -- Compute current day and cutoff day to preserve most recent days + $current_day INT := (@block_timestamp / 86400)::INT; + $cutoff_day INT := $current_day - $preserve_past_days; + + -- Get candidates using efficient ARRAY_AGG batch collection, excluding the last $preserve_past_days days + $stream_refs INT[]; + $day_indexes INT[]; + -- will help our user determine if they need to call auto_digest again + $has_more BOOL := false; + + -- Get batch_size + 1 items to check if there are more available + -- Use ARRAY_AGG for efficient batch collection in a single query + for $result in + WITH candidates AS ( + SELECT stream_ref, day_index + FROM pending_prune_days + WHERE day_index <= $cutoff_day + ORDER BY day_index ASC, stream_ref ASC + LIMIT $batch_size_plus_one + ), + aggregated AS ( + SELECT + ARRAY_AGG(stream_ref ORDER BY day_index ASC, stream_ref ASC) AS all_stream_refs, + ARRAY_AGG(day_index ORDER BY day_index ASC, stream_ref ASC) AS all_day_indexes, + COUNT(*) AS total_count + FROM candidates + ) + SELECT + CASE WHEN total_count > $batch_size + THEN all_stream_refs[1:$batch_size] + ELSE all_stream_refs + END AS stream_refs, + CASE WHEN total_count > $batch_size + THEN all_day_indexes[1:$batch_size] + ELSE all_day_indexes + END AS day_indexes, + CASE WHEN total_count > $batch_size + THEN true + ELSE false + END AS has_more_flag + FROM aggregated + { + $stream_refs := $result.stream_refs; + $day_indexes := $result.day_indexes; + $has_more := $result.has_more_flag; + } + + -- Handle empty result case + if $stream_refs IS NULL OR COALESCE(array_length($stream_refs), 0) = 0 { + emit_auto_digest_notice(0, 0, $has_more); + RETURN 0, 0, $has_more; + } + + -- Process using optimized batch_digest + $processed := 0; + $total_deleted := 0; + + for $result in batch_digest($stream_refs, $day_indexes, $delete_cap, $preserve_past_days) { + $processed := $result.processed_days; + $total_deleted := $result.total_deleted_rows; + + if $result.has_more_to_delete { + emit_auto_digest_notice($processed, $total_deleted, true); + $has_more := true; + RETURN $processed, $total_deleted, $has_more; + } + } + emit_auto_digest_notice($processed, $total_deleted, $has_more); + RETURN $processed, $total_deleted, $has_more; +}; + +-- private helper to emit structured NOTICE logs for parsing +-- we use this because there's no way to get this returned from action executions on sdks +CREATE OR REPLACE ACTION emit_auto_digest_notice( + $processed INT, + $deleted INT, + $has_more BOOL +) PRIVATE VIEW { + $has_more_text := 'false'; + if $has_more { + $has_more_text := 'true'; + } + NOTICE('auto_digest:' || '{"processed_days":' || $processed::TEXT || ',"total_deleted_rows":' || $deleted::TEXT || ',"has_more_to_delete":' || $has_more_text || '}'); +}; + +-- private helper for leader authorization with detailed diagnostics +CREATE OR REPLACE ACTION check_leader_authorization() PRIVATE { + -- RETURN; + -- @signer and @leader are already BYTEA, so we can compare them directly + -- For debugging, encode to hex for readable output + IF @leader_sender != @signer { + $leader_sender_hex := encode(@leader_sender, 'hex'); + $signer_hex := encode(@signer, 'hex'); + ERROR('Only the current block leader can execute this operation: leader_sender: ' || $leader_sender_hex::TEXT || ' signer: ' || $signer_hex::TEXT); + } +}; + +-- Helper function to derive Ethereum address from secp256k1 public key +-- This implements the Ethereum address derivation algorithm using available PostgreSQL functions +-- NOTE: This is an approximation since we can't decompress the public key without elliptic curve ops +-- Removed derive_ethereum_address_from_pubkey: inaccurate without EC decompression + +/** + * get_daily_ohlc: Query daily OHLC data + * + * Returns OHLC values for a specific day and stream + */ +CREATE OR REPLACE ACTION get_daily_ohlc( + $stream_ref INT, + $day INT +) PUBLIC VIEW RETURNS TABLE( + open_value NUMERIC(36,18), + high_value NUMERIC(36,18), + low_value NUMERIC(36,18), + close_value NUMERIC(36,18) +) { + $day_start := $day * 86400; + $day_end := $day_start + 86400; + + -- Check if this day has been digested (ensure markers correspond to existing events) + $is_digested BOOL := false; + for $unused in + SELECT 1 + FROM primitive_event_type t + JOIN primitive_events p + ON p.stream_ref = t.stream_ref + AND p.event_time = t.event_time + WHERE t.stream_ref = $stream_ref + AND t.event_time >= $day_start AND t.event_time < $day_end + LIMIT 1 { + $is_digested := true; + } + + -- Declare variables to store the OHLC values + $open_value NUMERIC(36,18); + $high_value NUMERIC(36,18); + $low_value NUMERIC(36,18); + $close_value NUMERIC(36,18); + + if $is_digested { + -- Calculate from digested data using type markers with robust selection + -- to handle potential stale markers and ensure correct OHLC values + for $result in + SELECT + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND (t.type % 2) = 1 + ORDER BY p.event_time ASC, p.created_at DESC + LIMIT 1) AS open_value, + + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND ((t.type / 2) % 2) = 1 + ORDER BY p.value DESC, p.event_time ASC, p.created_at DESC + LIMIT 1) AS high_value, + + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND ((t.type / 4) % 2) = 1 + ORDER BY p.value ASC, p.event_time ASC, p.created_at DESC + LIMIT 1) AS low_value, + + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND ((t.type / 8) % 2) = 1 + ORDER BY p.event_time DESC, p.created_at DESC + LIMIT 1) AS close_value + { + $open_value := $result.open_value; + $high_value := $result.high_value; + $low_value := $result.low_value; + $close_value := $result.close_value; + } + } else { + -- Calculate from raw data (single query with window functions) + for $result in + SELECT + MAX(CASE WHEN rn_open = 1 THEN value END) AS open_value, + MAX(CASE WHEN rn_high = 1 THEN value END) AS high_value, + MAX(CASE WHEN rn_low = 1 THEN value END) AS low_value, + MAX(CASE WHEN rn_close = 1 THEN value END) AS close_value + FROM ( + SELECT value, + ROW_NUMBER() OVER (ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + ROW_NUMBER() OVER (ORDER BY event_time DESC, created_at DESC) AS rn_close + FROM primitive_events + WHERE stream_ref = $stream_ref + AND event_time >= $day_start AND event_time < $day_end + ) r { + $open_value := $result.open_value; + $high_value := $result.high_value; + $low_value := $result.low_value; + $close_value := $result.close_value; + } + } + + -- Return the calculated values + RETURN $open_value, $high_value, $low_value, $close_value; +}; diff --git a/scripts/ci-cleanup.sh b/scripts/ci-cleanup.sh new file mode 100644 index 000000000..e891e9029 --- /dev/null +++ b/scripts/ci-cleanup.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +set -euo pipefail + +echo "[ci-cleanup] Starting cleanup of Kwil DB resources..." + +# Stop and remove docker compose stack(s) +if [ -f "compose.yaml" ]; then + echo "[ci-cleanup] docker compose down for single stack" + docker compose -f compose.yaml down -v --remove-orphans || true +fi + +# Common container names/images +names=("tn-db" "kwil-postgres" "kwild" "postgres") +images=("kwildb/postgres" "tn-db:local" "kwildb/postgres:latest" "kwildb/postgres:16.8-1") + +echo "[ci-cleanup] Stopping/removing lingering containers by name..." +for n in "${names[@]}"; do + ids=$(docker ps -aq --filter "name=${n}") || true + if [ -n "${ids}" ]; then + echo "[ci-cleanup] Removing containers matching name ${n}: ${ids}" + docker rm -f ${ids} || true + fi +done + +echo "[ci-cleanup] Stopping/removing lingering containers by image..." +for img in "${images[@]}"; do + ids=$(docker ps -aq --filter "ancestor=${img}") || true + if [ -n "${ids}" ]; then + echo "[ci-cleanup] Removing containers for image ${img}: ${ids}" + docker rm -f ${ids} || true + fi +done + +# Kill leftover processes that might bind ports (8484, 5432) +ports=(8484 5432) +for p in "${ports[@]}"; do + if command -v lsof >/dev/null 2>&1; then + pids=$(lsof -t -i :${p}) || true + if [ -n "${pids}" ]; then + echo "[ci-cleanup] Killing processes on port ${p}: ${pids}" + kill -9 ${pids} || true + fi + fi +done + +echo "[ci-cleanup] Cleanup complete." + +# Best-effort: kill kwild process if it was started directly (not via docker) +if command -v pkill >/dev/null 2>&1; then + pkill -9 -f "\bkwild\b" 2>/dev/null || true +fi + + diff --git a/tests/streams/digest/digest_actions_test.go b/tests/streams/digest/digest_actions_test.go index 2c38c3eb6..5edd36216 100644 --- a/tests/streams/digest/digest_actions_test.go +++ b/tests/streams/digest/digest_actions_test.go @@ -11,6 +11,9 @@ import ( "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + extauth "github.com/trufnetwork/kwil-db/extensions/auth" kwilTesting "github.com/trufnetwork/kwil-db/testing" @@ -61,6 +64,75 @@ func TestDigestActions(t *testing.T) { }, testutils.GetTestOptionsWithCache().Options) } +// Verifies leader-only authorization on digest actions using BlockContext.Proposer. +func TestDigestActionsLeaderAuthorization(t *testing.T) { + kwilTesting.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "digest_actions_leader_authorization", + SeedScripts: migrations.GetSeedScriptPaths(), + FunctionTests: []kwilTesting.TestFunc{ + WithSignerAndProvider(func(ctx context.Context, platform *kwilTesting.Platform) error { + // Create a secp256k1 leader key + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + return errors.Wrap(err, "generate secp256k1 key") + } + pub, ok := pubGeneric.(*crypto.Secp256k1PublicKey) + if !ok { + return errors.New("unexpected pubkey type") + } + + // Helper to call action with explicit BlockContext.Proposer and signer/auth + callWithCtx := func(action string, args []any, signer []byte, authenticator string) (*common.CallResult, error) { + caller := "" + if ident, e := extauth.GetIdentifier(authenticator, signer); e == nil { + caller = ident + } + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{Height: 1, Proposer: pub}, + Signer: signer, + Caller: caller, + TxID: platform.Txid(), + Authenticator: authenticator, + } + eng := &common.EngineContext{TxContext: tx} + return platform.Engine.Call(eng, platform.DB, "", action, args, func(*common.Row) error { return nil }) + } + + // Non-leader: signer != leader_sender → expect leader-only error + if r, err := callWithCtx("batch_digest", []any{[]int{}, []int{}}, platform.Deployer, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "batch_digest non-leader call error") + } else if r == nil || r.Error == nil || !strings.Contains(r.Error.Error(), "Only the current block leader") { + return errors.New("expected leader-only error for batch_digest when not leader") + } + + if r, err := callWithCtx("auto_digest", []any{10, 24, 2}, platform.Deployer, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "auto_digest non-leader call error") + } else if r == nil || r.Error == nil || !strings.Contains(r.Error.Error(), "Only the current block leader") { + return errors.New("expected leader-only error for auto_digest when not leader") + } + + // Leader: signer equals derived leader_sender → expect success + signerGood := crypto.EthereumAddressFromPubKey(pub) + + if r, err := callWithCtx("batch_digest", []any{[]int{}, []int{}}, signerGood, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "batch_digest leader call error") + } else if r != nil && r.Error != nil { + return errors.Wrap(r.Error, "batch_digest leader call failed") + } + + if r, err := callWithCtx("auto_digest", []any{10, 24, 2}, signerGood, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "auto_digest leader call error") + } else if r != nil && r.Error != nil { + return errors.Wrap(r.Error, "auto_digest leader call failed") + } + + return nil + }), + }, + }, testutils.GetTestOptionsWithCache().Options) +} + // WithDigestTestSetup sets up test environment with digest-specific data func WithDigestTestSetup(testFn func(ctx context.Context, platform *kwilTesting.Platform) error) func(ctx context.Context, platform *kwilTesting.Platform) error { const md = ` diff --git a/tests/streams/query/metadata_test.go b/tests/streams/query/metadata_test.go index 22819cadb..44d869452 100644 --- a/tests/streams/query/metadata_test.go +++ b/tests/streams/query/metadata_test.go @@ -194,26 +194,26 @@ func testListMetadataByHeight(t *testing.T, contractInfo setup.StreamInfo) kwilT return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Height: 1, + Platform: platform, + Key: metadataKey, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") } expected := ` - | stream_ref | value_i | value_f | value_b | value_s | value_ref | created_at | - |---------------|-----------|---------------------|-----------------|--------|------------|----------------|------------|------------|------------| - | 1 | 0 | | | | | 1 | - | 1 | 1 | | | | | 1 |` + | value_i | value_f | value_b | value_s | value_ref | created_at | + |---------|---------|---------|---------|-----------|------------| + | 0 | | | | | 1 | + | 1 | | | | | 1 |` table.AssertResultRowsEqualMarkdownTable(t, table.AssertResultRowsEqualMarkdownTableInput{ - Actual: result, - Expected: expected, - ExcludedColumns: []int{1}, + Actual: result, + Expected: expected, + ExcludedColumns: []int{0, 1, 2}, }) return nil @@ -245,17 +245,17 @@ func testListMetadataByHeightNoKey(t *testing.T, contractInfo setup.StreamInfo) return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Height: 1, + Platform: platform, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") } - if (len(result) > 0) { // should return no rows - return errors.Wrapf(err, "expected empty results") + if len(result) > 0 { // should return no rows + return errors.New("expected empty results") } return nil @@ -288,15 +288,15 @@ func testListMetadataByHeightPagination(t *testing.T, contractInfo setup.StreamI return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + limit := 2 offset := 0 result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Limit: &limit, - Offset: &offset, - Height: 1, + Platform: platform, + Key: metadataKey, + Limit: &limit, + Offset: &offset, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") @@ -308,11 +308,11 @@ func testListMetadataByHeightPagination(t *testing.T, contractInfo setup.StreamI offset = 2 result, err = procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Limit: &limit, - Offset: &offset, - Height: 1, + Platform: platform, + Key: metadataKey, + Limit: &limit, + Offset: &offset, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") @@ -352,15 +352,15 @@ func testListMetadataByHeightInvalidRange(t *testing.T, contractInfo setup.Strea return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + fromHeight := int64(10) - toHeight := int64(5) + toHeight := int64(5) _, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, + Platform: platform, + Key: metadataKey, FromHeight: &fromHeight, - ToHeight: &toHeight, - Height: 1, + ToHeight: &toHeight, + Height: 1, }) if err == nil { @@ -402,14 +402,14 @@ func testListMetadataByHeightInvalidPagination(t *testing.T, contractInfo setup. return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + // negative limit limit := -10 result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Limit: &limit, - Height: 1, + Platform: platform, + Key: metadataKey, + Limit: &limit, + Height: 1, }) if err != nil { return errors.Wrap(err, "unexpected error with negative limit") @@ -420,11 +420,11 @@ func testListMetadataByHeightInvalidPagination(t *testing.T, contractInfo setup. } negativeOffset := -5 - result, err = procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Offset: &negativeOffset, - Height: 1, + _, err = procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ + Platform: platform, + Key: metadataKey, + Offset: &negativeOffset, + Height: 1, }) if err != nil { return errors.Wrap(err, "unexpected error with negative offset")