From 2fa0b06ee159aa3e9d2d23e2dcacbbfa1817ffef Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 11:04:46 -0300 Subject: [PATCH 1/9] feat: enhance auto_digest functionality with improved transaction handling and logging This commit introduces several key improvements to the `auto_digest` process, including: - Enhanced transaction broadcasting with error handling for transaction results. - Added structured logging for better monitoring of processed days and deleted rows. - Introduced a new `DigestTxResult` type to encapsulate results from the auto_digest transactions. - Implemented a new method for parsing digest results from transaction logs, improving clarity and reliability. These changes aim to optimize the digest operation's performance and provide better insights into its execution outcomes. --- extensions/tn_digest/broadcast.go | 15 +- .../tn_digest/engine_ops_integration_test.go | 4 +- extensions/tn_digest/internal/engine_ops.go | 191 +++++++++++++++++- .../tn_digest/internal/engine_ops_test.go | 45 +++++ extensions/tn_digest/scheduler/constants.go | 13 ++ extensions/tn_digest/scheduler/scheduler.go | 93 ++++++++- 6 files changed, 349 insertions(+), 12 deletions(-) create mode 100644 extensions/tn_digest/internal/engine_ops_test.go create mode 100644 extensions/tn_digest/scheduler/constants.go diff --git a/extensions/tn_digest/broadcast.go b/extensions/tn_digest/broadcast.go index 41d2ba086..413dc029a 100644 --- a/extensions/tn_digest/broadcast.go +++ b/extensions/tn_digest/broadcast.go @@ -50,7 +50,7 @@ func normalizeListenAddressForClient(listen string) (*url.URL, error) { func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { userClient := rpcuser.NewClient(u) return txBroadcasterFunc(func(ctx context.Context, tx *types.Transaction, sync uint8) (types.Hash, *types.TxResult, error) { - // Map sync flag to a broadcast mode; default to WaitAccept (mempool accept). + // Map sync flag to broadcast mode (callers should pass 1 for WaitCommit) mode := rpcclient.BroadcastWaitAccept if sync == uint8(rpcclient.BroadcastWaitCommit) || sync == 1 { mode = rpcclient.BroadcastWaitCommit @@ -59,6 +59,17 @@ func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { if err != nil { return types.Hash{}, nil, err } - return h, nil, nil + + // Query the transaction result to get the log output for parsing + txQueryResp, err := userClient.TxQuery(ctx, h) + if err != nil { + return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", err) + } + + if txQueryResp.Result == nil { + return types.Hash{}, nil, fmt.Errorf("transaction result is nil") + } + + return h, txQueryResp.Result, nil }) } diff --git a/extensions/tn_digest/engine_ops_integration_test.go b/extensions/tn_digest/engine_ops_integration_test.go index c9c50d3e5..2897a23d8 100644 --- a/extensions/tn_digest/engine_ops_integration_test.go +++ b/extensions/tn_digest/engine_ops_integration_test.go @@ -36,7 +36,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing // Create accounts service accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - + // Prepare EngineOperations ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) @@ -67,7 +67,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing _, execErr := platform.Engine.Call(engCtx, platform.DB, "main", "auto_digest", []any{}, func(_ *common.Row) error { return nil }) require.NoError(t, execErr) - return types.Hash{}, &types.TxResult{Code: uint32(types.CodeOk)}, nil + return types.Hash{}, &types.TxResult{Code: uint32(types.CodeOk), Log: "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":0,\"has_more_to_delete\":false}"}, nil } // Build and broadcast via ops diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 4faccab17..6177a8cfb 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "strconv" "strings" "github.com/trufnetwork/kwil-db/common" @@ -12,6 +13,13 @@ import ( sql "github.com/trufnetwork/kwil-db/node/types/sql" ) +// DigestTxResult represents the parsed result from an auto_digest transaction +type DigestTxResult struct { + ProcessedDays int + TotalDeletedRows int + HasMoreToDelete bool +} + // EngineOperations wraps engine calls needed by the digest extension. type EngineOperations struct { engine common.Engine @@ -98,8 +106,189 @@ func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, ch if err := tx.Sign(signer); err != nil { return fmt.Errorf("sign tx: %w", err) } - if _, _, err := broadcaster(ctx, tx, 0); err != nil { + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { return fmt.Errorf("broadcast tx: %w", err) } + + // Check transaction result code before parsing logs + if txResult.Code != uint32(ktypes.CodeOk) { + return fmt.Errorf("transaction failed with code %d (expected %d): %s", + txResult.Code, uint32(ktypes.CodeOk), txResult.Log) + } + + // Parse the digest result from the transaction log + result, err := parseDigestResultFromTxLog(txResult.Log) + if err != nil { + return fmt.Errorf("failed to parse digest result: %w", err) + } + + e.logger.Info("auto_digest completed", + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "tx_hash", hash.String()) + return nil } + +// BroadcastAutoDigestWithArgsAndParse builds an ActionExecution tx for auto_digest with args and broadcasts it, +// then parses the result from the transaction log +func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + deleteCap, expectedRecords, preserveDays int, +) (*DigestTxResult, error) { + // Get the signer account ID + signerAccountID, err := ktypes.GetSignerAccount(signer) + if err != nil { + return nil, fmt.Errorf("failed to get signer account: %w", err) + } + + // Get account information using the accounts service directly on database + account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + var nextNonce uint64 + if err != nil { + // Account doesn't exist yet - use nonce 1 for first transaction + e.logger.Info("DEBUG: Account not found, using nonce 1 for first transaction", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "error", err) + nextNonce = uint64(1) + } else { + // Account exists - use next nonce + nextNonce = uint64(account.Nonce + 1) + e.logger.Info("DEBUG: Account found, using next nonce", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "currentNonce", account.Nonce, "nextNonce", nextNonce, "balance", account.Balance) + } + + // Encode arguments + deleteCapArg, err := ktypes.EncodeValue(int64(deleteCap)) + if err != nil { + return nil, fmt.Errorf("encode deleteCap: %w", err) + } + expectedRecordsArg, err := ktypes.EncodeValue(int64(expectedRecords)) + if err != nil { + return nil, fmt.Errorf("encode expectedRecords: %w", err) + } + preserveDaysArg, err := ktypes.EncodeValue(int64(preserveDays)) + if err != nil { + return nil, fmt.Errorf("encode preserveDays: %w", err) + } + + payload := &ktypes.ActionExecution{ + Namespace: "main", + Action: "auto_digest", + Arguments: [][]*ktypes.EncodedValue{{ + deleteCapArg, expectedRecordsArg, preserveDaysArg, + }}, + } + + // Create transaction with proper nonce + tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce) + if err != nil { + return nil, fmt.Errorf("create tx: %w", err) + } + if err := tx.Sign(signer); err != nil { + return nil, fmt.Errorf("sign tx: %w", err) + } + + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { + return nil, fmt.Errorf("broadcast tx: %w", err) + } + + // Check transaction result code before parsing logs + if txResult.Code != uint32(ktypes.CodeOk) { + return nil, fmt.Errorf("transaction failed with code %d (expected %d): %s", + txResult.Code, uint32(ktypes.CodeOk), txResult.Log) + } + + // Parse the digest result from the transaction log + result, err := parseDigestResultFromTxLog(txResult.Log) + if err != nil { + return nil, fmt.Errorf("failed to parse digest result: %w", err) + } + + e.logger.Info("auto_digest with args completed", + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "tx_hash", hash.String(), + "delete_cap", deleteCap, + "expected_records", expectedRecords, + "preserve_days", preserveDays) + + return result, nil +} + +// parseDigestResultFromTxLog parses the digest result from transaction log output +// The digest action outputs log entries like: "auto_digest:{...json...}" +func parseDigestResultFromTxLog(logOutput string) (*DigestTxResult, error) { + if logOutput == "" { + return nil, fmt.Errorf("empty log output") + } + + // Split log into lines and look for auto_digest entries + lines := strings.Split(logOutput, "\n") + var digestJSON string + + // Find the last auto_digest line + for _, line := range lines { + if strings.Contains(line, "auto_digest:") { + // Extract JSON part after "auto_digest:" + parts := strings.SplitN(line, "auto_digest:", 2) + if len(parts) == 2 { + digestJSON = strings.TrimSpace(parts[1]) + } + } + } + + if digestJSON == "" { + return nil, fmt.Errorf("no auto_digest log entry found") + } + + // Parse the JSON (simple parsing since we know the expected structure) + result := &DigestTxResult{} + + // Remove surrounding quotes if present + digestJSON = strings.Trim(digestJSON, `"`) + + // Extract processed_days + if start := strings.Index(digestJSON, `"processed_days":`); start != -1 { + start += len(`"processed_days":`) + if end := strings.Index(digestJSON[start:], ","); end != -1 { + if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { + result.ProcessedDays = val + } + } else if end := strings.Index(digestJSON[start:], "}"); end != -1 { + if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { + result.ProcessedDays = val + } + } + } + + // Extract total_deleted_rows + if start := strings.Index(digestJSON, `"total_deleted_rows":`); start != -1 { + start += len(`"total_deleted_rows":`) + if end := strings.Index(digestJSON[start:], ","); end != -1 { + if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { + result.TotalDeletedRows = val + } + } else if end := strings.Index(digestJSON[start:], "}"); end != -1 { + if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { + result.TotalDeletedRows = val + } + } + } + + // Extract has_more_to_delete + if start := strings.Index(digestJSON, `"has_more_to_delete":`); start != -1 { + start += len(`"has_more_to_delete":`) + if end := strings.Index(digestJSON[start:], ","); end != -1 { + result.HasMoreToDelete = digestJSON[start:start+end] == "true" + } else if end := strings.Index(digestJSON[start:], "}"); end != -1 { + result.HasMoreToDelete = digestJSON[start:start+end] == "true" + } + } + + return result, nil +} diff --git a/extensions/tn_digest/internal/engine_ops_test.go b/extensions/tn_digest/internal/engine_ops_test.go new file mode 100644 index 000000000..06ed878be --- /dev/null +++ b/extensions/tn_digest/internal/engine_ops_test.go @@ -0,0 +1,45 @@ +package internal + +import "testing" + +func TestParseDigestResultFromTxLog_HasMoreTrue(t *testing.T) { + log := "INFO something\nNOTICE: auto_digest:{\"processed_days\":2,\"total_deleted_rows\":500,\"has_more_to_delete\":true}\nother" + res, err := parseDigestResultFromTxLog(log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if res.ProcessedDays != 2 { + t.Fatalf("processed_days: want 2, got %d", res.ProcessedDays) + } + if res.TotalDeletedRows != 500 { + t.Fatalf("total_deleted_rows: want 500, got %d", res.TotalDeletedRows) + } + if !res.HasMoreToDelete { + t.Fatalf("has_more_to_delete: want true, got false") + } +} + +func TestParseDigestResultFromTxLog_HasMoreFalse(t *testing.T) { + log := "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":1234,\"has_more_to_delete\":false}" + res, err := parseDigestResultFromTxLog(log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if res.ProcessedDays != 1 { + t.Fatalf("processed_days: want 1, got %d", res.ProcessedDays) + } + if res.TotalDeletedRows != 1234 { + t.Fatalf("total_deleted_rows: want 1234, got %d", res.TotalDeletedRows) + } + if res.HasMoreToDelete { + t.Fatalf("has_more_to_delete: want false, got true") + } +} + +func TestParseDigestResultFromTxLog_NoEntry(t *testing.T) { + log := "INFO: nothing relevant here\nNOTICE: something else" + _, err := parseDigestResultFromTxLog(log) + if err == nil { + t.Fatalf("expected error for missing auto_digest entry, got nil") + } +} diff --git a/extensions/tn_digest/scheduler/constants.go b/extensions/tn_digest/scheduler/constants.go new file mode 100644 index 000000000..a573b30f0 --- /dev/null +++ b/extensions/tn_digest/scheduler/constants.go @@ -0,0 +1,13 @@ +package scheduler + +import "time" + +const ( + // Digest drain mode constants (scheduler-scoped to avoid import cycles) + DigestDeleteCap = 100_000 + DigestExpectedRecordsPerStream = 24 + DigestPreservePastDays = 2 + DrainRunDelay = 60 * time.Second // 1 minute + DrainMaxRuns = 100 + DrainMaxConsecutiveFailures = 5 +) diff --git a/extensions/tn_digest/scheduler/scheduler.go b/extensions/tn_digest/scheduler/scheduler.go index 2b37c51df..79764c29b 100644 --- a/extensions/tn_digest/scheduler/scheduler.go +++ b/extensions/tn_digest/scheduler/scheduler.go @@ -72,7 +72,7 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { // Clear any existing jobs to avoid duplicates on (re)start s.cron.Clear() - // Use background context for job execution to avoid cancellation issues + // Use scheduler context for job execution to enable cancellation on leadership loss jobFunc := func() { defer func() { if r := recover(); r != nil { @@ -80,8 +80,8 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { } }() - // Create a fresh context for each job execution - jobCtx := context.Background() + // Use the scheduler's context so Stop() cancels the drain loop + jobCtx := s.ctx // Snapshot dependencies under lock to avoid races with setters. s.mu.Lock() @@ -96,11 +96,90 @@ func (s *DigestScheduler) Start(ctx context.Context, cronExpr string) error { return } chainID := kwilService.GenesisConfig.ChainID - if err := engineOps.BuildAndBroadcastAutoDigestTx(jobCtx, chainID, signer, broadcaster.BroadcastTx); err != nil { - s.logger.Warn("auto_digest broadcast failed", "error", err) - return + + // Implement drain mode: run auto_digest repeatedly until has_more=false + s.logger.Info("starting digest drain mode", + "delete_cap", DigestDeleteCap, + "expected_records", DigestExpectedRecordsPerStream, + "preserve_days", DigestPreservePastDays, + "max_runs", DrainMaxRuns) + + runs := 0 + consecutiveFailures := 0 + totalProcessedDays := 0 + totalDeletedRows := 0 + + for runs < DrainMaxRuns { + // Check for cancellation + select { + case <-jobCtx.Done(): + s.logger.Info("digest drain canceled", "runs_completed", runs) + return + default: + } + + runs++ + + result, err := engineOps.BroadcastAutoDigestWithArgsAndParse( + jobCtx, + chainID, + signer, + broadcaster.BroadcastTx, + DigestDeleteCap, + DigestExpectedRecordsPerStream, + DigestPreservePastDays, + ) + + if err != nil { + consecutiveFailures++ + s.logger.Warn("auto_digest broadcast failed", + "run", runs, + "consecutive_failures", consecutiveFailures, + "error", err) + + if consecutiveFailures >= DrainMaxConsecutiveFailures { + s.logger.Error("too many consecutive failures, aborting drain", + "consecutive_failures", consecutiveFailures, + "max_allowed", DrainMaxConsecutiveFailures) + return + } + } else { + consecutiveFailures = 0 + // Update cumulative totals + totalProcessedDays += result.ProcessedDays + totalDeletedRows += result.TotalDeletedRows + + s.logger.Info("digest run completed", + "run", runs, + "processed_days", result.ProcessedDays, + "deleted_rows", result.TotalDeletedRows, + "has_more", result.HasMoreToDelete, + "cumulative_processed", totalProcessedDays, + "cumulative_deleted", totalDeletedRows) + + // Check if we're done + if !result.HasMoreToDelete { + s.logger.Info("digest drain completed successfully", + "total_runs", runs, + "total_processed_days", totalProcessedDays, + "total_deleted_rows", totalDeletedRows) + return + } + } + + // Sleep between runs, but allow cancellation + select { + case <-jobCtx.Done(): + s.logger.Info("digest drain canceled during sleep", "runs_completed", runs) + return + case <-time.After(DrainRunDelay): + // Continue to next run + } } - s.logger.Info("auto_digest tx broadcasted") + + s.logger.Info("digest drain reached max runs", + "max_runs", DrainMaxRuns, + "runs_completed", runs) } if j, err := s.cron.Cron(cronExpr).Do(jobFunc); err != nil { From 50f47d9aea33b6f6f1fcfd77ea77f7d8aa778ad1 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 11:05:06 -0300 Subject: [PATCH 2/9] refactor: enhance digest actions with leader authorization and improved processing This commit introduces significant updates to the digest actions, focusing on leader-only authorization and optimizations in the `batch_digest` and `auto_digest` processes. Key changes include: - Added leader authorization checks to ensure only the current block leader can execute digest operations, enhancing security and control. - Refined the `batch_digest` action to improve performance and clarity, utilizing optimized SQL queries for bulk processing. - Enhanced the `auto_digest` action to leverage the improved `batch_digest`, streamlining the processing of multiple pending days. - Introduced comprehensive tests to validate leader authorization functionality, ensuring robust error handling and expected behavior. These changes aim to improve the reliability and efficiency of the digest system while maintaining strict access controls for critical operations. --- internal/migrations/020-digest-actions.sql | 1543 ++++++++++--------- tests/streams/digest/digest_actions_test.go | 72 + 2 files changed, 850 insertions(+), 765 deletions(-) diff --git a/internal/migrations/020-digest-actions.sql b/internal/migrations/020-digest-actions.sql index 36f535d38..0d9d06750 100644 --- a/internal/migrations/020-digest-actions.sql +++ b/internal/migrations/020-digest-actions.sql @@ -1,765 +1,778 @@ -/* - * DIGEST ACTIONS MIGRATION - * - * Implements optimized digest system with UNNEST batch processing: - * - batch_digest: Direct OHLC processing with bulk operations (replaces digest_daily) - * - auto_digest: Batch process multiple pending days using optimized batch_digest - * - get_daily_ohlc: Query daily OHLC data from raw or digested sources - */ - --- ============================================================================= --- CORE DIGEST ACTIONS --- ============================================================================= - -/** - * batch_digest: Single-statement bulk processing with CTEs for maximum performance - * - * Implements complete bulk operations in a single SQL statement using CTEs. - * Eliminates array ↔︎ SQL round-trips and multiple statement overhead. - * - * Performance benefits: - * - Single SQL statement with all operations - * - Zero array conversions and UNNEST overhead - * - Relation-based joins instead of expensive anti-joins - * - Reuses intermediate results efficiently - * - Planner can optimize the entire pipeline - */ -CREATE OR REPLACE ACTION batch_digest( - $stream_refs INT[], - $day_indexes INT[], - $delete_cap INT DEFAULT 10000, - $preserve_past_days INT DEFAULT 2 -) PUBLIC RETURNS TABLE( - processed_days INT, - total_deleted_rows INT, - total_preserved_rows INT, - has_more_to_delete BOOL -) { - -- Leader authorization check, keep it commented out for now so test passing and until we can inject how leader is - -- if @caller != @leader { - -- ERROR('Only the leader node can execute batch digest operations'); - -- } - - -- Validate input arrays have same length - if COALESCE(array_length($stream_refs), 0) != COALESCE(array_length($day_indexes), 0) { - ERROR('stream_refs and day_indexes arrays must have the same length'); - } - - if COALESCE(array_length($stream_refs), 0) = 0 { - RETURN 0, 0, 0, false; - } - - $total_processed := 0; - $total_deleted := 0; - $total_preserved := 0; - $has_more_to_delete := false; - $events_probe_count := 0; - $events_deleted_count := 0; - $marker_cap := 0; - $markers_probe_count := 0; - $markers_deleted_count := 0; - - -- Kwil-compatible execution with separate statements and EXISTS - -- Check if we have any targets to process - $has_targets := false; - for $result in - WITH targets AS ( - SELECT - ord, - sr AS stream_ref, - di AS day_index, - (di * 86400) AS day_start, - (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) - WITH ORDINALITY AS u(sr, di, ord) - ) - SELECT COUNT(*) AS target_count FROM targets - { - if $result.target_count > 0 { - $has_targets := true; - } - } - - -- Only proceed if we have targets - if $has_targets { - -- Step 2: Derive day events using windowed ranges (fewer scans) - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ) - SELECT COUNT(*) AS processed_days FROM ohlc_rows - { - $total_processed := $result.processed_days; - } - - -- Step 3: Build keep-set relation - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - keep_rows AS ( - SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL - ) - SELECT COUNT(*) AS preserved_count FROM (SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows) k - { - $total_preserved := $result.preserved_count; - } - - -- Step 4: Probe events to delete (cap+1 to detect leftovers) - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - keep_rows AS ( - SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL - ), - keep_rows_dedup AS ( - SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows - ), - events_probe AS ( - SELECT de.stream_ref, de.event_time, de.created_at - FROM day_events de - LEFT JOIN keep_rows_dedup k - ON k.stream_ref = de.stream_ref - AND k.event_time = de.event_time - AND k.created_at = de.created_at - WHERE k.stream_ref IS NULL - LIMIT $delete_cap + 1 - ) - SELECT COUNT(*) AS probe_count FROM events_probe - { - $events_probe_count := $result.probe_count; - } - - -- Step 5: Delete events (capped) - if $events_probe_count > 0 { - -- Delete with cap - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - keep_rows AS ( - SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL - ), - keep_rows_dedup AS ( - SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows - ), - events_to_delete AS ( - SELECT de.stream_ref, de.event_time, de.created_at - FROM day_events de - LEFT JOIN keep_rows_dedup k - ON k.stream_ref = de.stream_ref - AND k.event_time = de.event_time - AND k.created_at = de.created_at - WHERE k.stream_ref IS NULL - LIMIT $delete_cap - ) - DELETE FROM primitive_events - WHERE EXISTS ( - SELECT 1 FROM events_to_delete d - WHERE d.stream_ref = primitive_events.stream_ref - AND d.event_time = primitive_events.event_time - AND d.created_at = primitive_events.created_at - ); - -- Set deleted events count from pre-delete probe - $events_deleted_count := LEAST($delete_cap, $events_probe_count); - } - - -- Step 6: Calculate marker cap and probe - $marker_cap := GREATEST(0, $delete_cap - $events_deleted_count); - for $result in - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - markers_probe AS ( - SELECT pet.stream_ref, pet.event_time - FROM primitive_event_type pet - JOIN windows w - ON pet.stream_ref = w.stream_ref - AND pet.event_time >= w.lo * 86400 - AND pet.event_time < (w.hi + 1) * 86400 - LIMIT $marker_cap + 1 - ) - SELECT COUNT(*) AS probe_count FROM markers_probe - { - $markers_probe_count := $result.probe_count; - } - - -- Step 7: Delete markers (capped) - if $markers_probe_count > 0 { - -- Delete with cap - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - markers_to_delete AS ( - SELECT pet.stream_ref, pet.event_time - FROM primitive_event_type pet - JOIN windows w - ON pet.stream_ref = w.stream_ref - AND pet.event_time >= w.lo * 86400 - AND pet.event_time < (w.hi + 1) * 86400 - LIMIT $marker_cap - ) - DELETE FROM primitive_event_type - WHERE EXISTS ( - SELECT 1 FROM markers_to_delete m - WHERE m.stream_ref = primitive_event_type.stream_ref - AND m.event_time = primitive_event_type.event_time - ); - -- Set deleted markers count from pre-delete probe - $markers_deleted_count := LEAST($marker_cap, $markers_probe_count); - } - - -- Step 8: Upsert markers - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ), - windows AS ( - SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi - FROM ( - SELECT stream_ref, day_index, - day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp - FROM targets - ) s - GROUP BY stream_ref, grp - ), - day_events AS ( - SELECT pe.stream_ref, - (pe.event_time / 86400)::INT AS day_index, - pe.event_time, pe.created_at, pe.value - FROM primitive_events pe - JOIN windows w - ON pe.stream_ref = w.stream_ref - AND pe.event_time >= w.lo * 86400 - AND pe.event_time < (w.hi + 1) * 86400 - ), - ranked AS ( - SELECT de.*, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY event_time DESC, created_at DESC) AS rn_close, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index - ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low - FROM day_events de - ), - ohlc_rows AS ( - SELECT stream_ref, day_index, - MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, - MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, - MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, - MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, - MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, - MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, - MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, - MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at - FROM ranked - GROUP BY stream_ref, day_index - HAVING COUNT(*) >= 1 - ), - ohlc_markers AS ( - SELECT stream_ref, open_time AS event_time, 1 AS type FROM ohlc_rows WHERE open_time IS NOT NULL - UNION ALL - SELECT stream_ref, close_time AS event_time, 8 AS type FROM ohlc_rows WHERE close_time IS NOT NULL - UNION ALL - SELECT stream_ref, high_time AS event_time, 2 AS type FROM ohlc_rows WHERE high_time IS NOT NULL - UNION ALL - SELECT stream_ref, low_time AS event_time, 4 AS type FROM ohlc_rows WHERE low_time IS NOT NULL - ), - aggregated_markers AS ( - SELECT stream_ref, event_time, SUM(type)::INT AS type - FROM (SELECT DISTINCT stream_ref, event_time, type FROM ohlc_markers) d - GROUP BY stream_ref, event_time - ) - INSERT INTO primitive_event_type (stream_ref, event_time, type) - SELECT stream_ref, event_time, type FROM aggregated_markers - ON CONFLICT (stream_ref, event_time) DO UPDATE SET type = EXCLUDED.type; - - -- Step 9: Calculate totals - $total_deleted := $events_deleted_count + $markers_deleted_count; - $has_more_to_delete := ($events_probe_count > $delete_cap) OR ($markers_probe_count > $marker_cap); - - -- Step 10: Cleanup pending_prune_days only when no leftovers - if NOT $has_more_to_delete { - WITH targets AS ( - SELECT ord, sr AS stream_ref, di AS day_index, - (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end - FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) - ) - DELETE FROM pending_prune_days - WHERE EXISTS ( - SELECT 1 FROM targets t - WHERE t.stream_ref = pending_prune_days.stream_ref - AND t.day_index = pending_prune_days.day_index - ); - } - } - - -- Uncomment for debugging - -- NOTICE('Batch digest completed: Processed '|| $total_processed::TEXT ||' days, Deleted '|| $total_deleted::TEXT ||' rows, Preserved '|| $total_preserved::TEXT ||' rows'); - - RETURN $total_processed, $total_deleted, $total_preserved, $has_more_to_delete; -}; - -/** - * auto_digest: Batch process multiple pending days (optimized version) - * - * Now uses the efficient batch_digest internally for better performance - */ -CREATE OR REPLACE ACTION auto_digest( - $delete_cap INT DEFAULT 10000, - -- Expected records per stream per day, used to calculate optimal batch size - -- Default of 24 represents typical hourly data collection (24 hours per day) - $expected_records_per_stream INT DEFAULT 24, - -- Number of most recent full/partial days to preserve from digestion - -- Example: 2 preserves today and yesterday - $preserve_past_days INT DEFAULT 2 -) PUBLIC RETURNS TABLE( - processed_days INT, - total_deleted_rows INT, - -- has_more_to_delete indicates that there are still pending batches to process - has_more_to_delete BOOL -) { - - -- Validate expected_records_per_stream to prevent divide by zero - if $expected_records_per_stream IS NULL OR $expected_records_per_stream < 1 { - ERROR('expected_records_per_stream must be a positive integer (minimum 1), got: ' || COALESCE($expected_records_per_stream::TEXT, 'NULL')); - } - - -- Calculate batch size dynamically - -- Formula: floor((delete_cap * 3) / (expected_records_per_stream * 2)) - -- Equivalent to: (delete_cap / expected_records_per_stream) * 1.5 (but ensuring we don't use float like types) - $batch_size := GREATEST(1, (($delete_cap * 3) / ($expected_records_per_stream * 2))); - $batch_size_plus_one := $batch_size + 1; - -- Leader authorization check, keep it commented out for now so test passing and until we can inject how leader is - -- if @caller != @leader { - -- ERROR('Only the leader node can execute auto digest operations'); - -- } - - -- Allow preserve_past_days to be zero (process including current day); only validate non-null - if $preserve_past_days IS NULL { - ERROR('preserve_past_days must not be NULL'); - } - - -- Compute current day and cutoff day to preserve most recent days - $current_day INT := (@block_timestamp / 86400)::INT; - $cutoff_day INT := $current_day - $preserve_past_days; - - -- Get candidates using efficient ARRAY_AGG batch collection, excluding the last $preserve_past_days days - $stream_refs INT[]; - $day_indexes INT[]; - -- will help our user determine if they need to call auto_digest again - $has_more BOOL := false; - - -- Get batch_size + 1 items to check if there are more available - -- Use ARRAY_AGG for efficient batch collection in a single query - for $result in - WITH candidates AS ( - SELECT stream_ref, day_index - FROM pending_prune_days - WHERE day_index <= $cutoff_day - ORDER BY day_index ASC, stream_ref ASC - LIMIT $batch_size_plus_one - ), - aggregated AS ( - SELECT - ARRAY_AGG(stream_ref ORDER BY day_index ASC, stream_ref ASC) AS all_stream_refs, - ARRAY_AGG(day_index ORDER BY day_index ASC, stream_ref ASC) AS all_day_indexes, - COUNT(*) AS total_count - FROM candidates - ) - SELECT - CASE WHEN total_count > $batch_size - THEN all_stream_refs[1:$batch_size] - ELSE all_stream_refs - END AS stream_refs, - CASE WHEN total_count > $batch_size - THEN all_day_indexes[1:$batch_size] - ELSE all_day_indexes - END AS day_indexes, - CASE WHEN total_count > $batch_size - THEN true - ELSE false - END AS has_more_flag - FROM aggregated - { - $stream_refs := $result.stream_refs; - $day_indexes := $result.day_indexes; - $has_more := $result.has_more_flag; - } - - -- Handle empty result case - if $stream_refs IS NULL OR COALESCE(array_length($stream_refs), 0) = 0 { - emit_auto_digest_notice(0, 0, $has_more); - RETURN 0, 0, $has_more; - } - - -- Process using optimized batch_digest - $processed := 0; - $total_deleted := 0; - - for $result in batch_digest($stream_refs, $day_indexes, $delete_cap, $preserve_past_days) { - $processed := $result.processed_days; - $total_deleted := $result.total_deleted_rows; - - if $result.has_more_to_delete { - emit_auto_digest_notice($processed, $total_deleted, true); - $has_more := true; - RETURN $processed, $total_deleted, $has_more; - } - } - emit_auto_digest_notice($processed, $total_deleted, $has_more); - RETURN $processed, $total_deleted, $has_more; -}; - --- private helper to emit structured NOTICE logs for parsing --- we use this because there's no way to get this returned from action executions on sdks -CREATE OR REPLACE ACTION emit_auto_digest_notice( - $processed INT, - $deleted INT, - $has_more BOOL -) PRIVATE VIEW { - $has_more_text := 'false'; - if $has_more { - $has_more_text := 'true'; - } - NOTICE('auto_digest:' || '{"processed_days":' || $processed::TEXT || ',"total_deleted_rows":' || $deleted::TEXT || ',"has_more_to_delete":' || $has_more_text || '}'); -}; - -/** - * get_daily_ohlc: Query daily OHLC data - * - * Returns OHLC values for a specific day and stream - */ -CREATE OR REPLACE ACTION get_daily_ohlc( - $stream_ref INT, - $day INT -) PUBLIC VIEW RETURNS TABLE( - open_value NUMERIC(36,18), - high_value NUMERIC(36,18), - low_value NUMERIC(36,18), - close_value NUMERIC(36,18) -) { - $day_start := $day * 86400; - $day_end := $day_start + 86400; - - -- Check if this day has been digested (ensure markers correspond to existing events) - $is_digested BOOL := false; - for $unused in - SELECT 1 - FROM primitive_event_type t - JOIN primitive_events p - ON p.stream_ref = t.stream_ref - AND p.event_time = t.event_time - WHERE t.stream_ref = $stream_ref - AND t.event_time >= $day_start AND t.event_time < $day_end - LIMIT 1 { - $is_digested := true; - } - - -- Declare variables to store the OHLC values - $open_value NUMERIC(36,18); - $high_value NUMERIC(36,18); - $low_value NUMERIC(36,18); - $close_value NUMERIC(36,18); - - if $is_digested { - -- Calculate from digested data using type markers with robust selection - -- to handle potential stale markers and ensure correct OHLC values - for $result in - SELECT - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND (t.type % 2) = 1 - ORDER BY p.event_time ASC, p.created_at DESC - LIMIT 1) AS open_value, - - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND ((t.type / 2) % 2) = 1 - ORDER BY p.value DESC, p.event_time ASC, p.created_at DESC - LIMIT 1) AS high_value, - - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND ((t.type / 4) % 2) = 1 - ORDER BY p.value ASC, p.event_time ASC, p.created_at DESC - LIMIT 1) AS low_value, - - (SELECT p.value FROM primitive_events p - JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time - WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end - AND ((t.type / 8) % 2) = 1 - ORDER BY p.event_time DESC, p.created_at DESC - LIMIT 1) AS close_value - { - $open_value := $result.open_value; - $high_value := $result.high_value; - $low_value := $result.low_value; - $close_value := $result.close_value; - } - } else { - -- Calculate from raw data (single query with window functions) - for $result in - SELECT - MAX(CASE WHEN rn_open = 1 THEN value END) AS open_value, - MAX(CASE WHEN rn_high = 1 THEN value END) AS high_value, - MAX(CASE WHEN rn_low = 1 THEN value END) AS low_value, - MAX(CASE WHEN rn_close = 1 THEN value END) AS close_value - FROM ( - SELECT value, - ROW_NUMBER() OVER (ORDER BY event_time ASC, created_at DESC) AS rn_open, - ROW_NUMBER() OVER (ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, - ROW_NUMBER() OVER (ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, - ROW_NUMBER() OVER (ORDER BY event_time DESC, created_at DESC) AS rn_close - FROM primitive_events - WHERE stream_ref = $stream_ref - AND event_time >= $day_start AND event_time < $day_end - ) r { - $open_value := $result.open_value; - $high_value := $result.high_value; - $low_value := $result.low_value; - $close_value := $result.close_value; - } - } - - -- Return the calculated values - RETURN $open_value, $high_value, $low_value, $close_value; -}; +/* + * DIGEST ACTIONS MIGRATION + * + * Implements optimized digest system with UNNEST batch processing: + * - batch_digest: Direct OHLC processing with bulk operations (replaces digest_daily) + * - auto_digest: Batch process multiple pending days using optimized batch_digest + * - get_daily_ohlc: Query daily OHLC data from raw or digested sources + */ + +-- ============================================================================= +-- CORE DIGEST ACTIONS +-- ============================================================================= + +/** + * batch_digest: Single-statement bulk processing with CTEs for maximum performance + * + * Implements complete bulk operations in a single SQL statement using CTEs. + * Eliminates array ↔︎ SQL round-trips and multiple statement overhead. + * + * Performance benefits: + * - Single SQL statement with all operations + * - Zero array conversions and UNNEST overhead + * - Relation-based joins instead of expensive anti-joins + * - Reuses intermediate results efficiently + * - Planner can optimize the entire pipeline + */ +CREATE OR REPLACE ACTION batch_digest( + $stream_refs INT[], + $day_indexes INT[], + $delete_cap INT DEFAULT 10000, + $preserve_past_days INT DEFAULT 2 +) PUBLIC RETURNS TABLE( + processed_days INT, + total_deleted_rows INT, + total_preserved_rows INT, + has_more_to_delete BOOL +) { + -- Leader authorization check using dedicated helper function + check_leader_authorization(); + + -- Validate input arrays have same length + if COALESCE(array_length($stream_refs), 0) != COALESCE(array_length($day_indexes), 0) { + ERROR('stream_refs and day_indexes arrays must have the same length'); + } + + if COALESCE(array_length($stream_refs), 0) = 0 { + RETURN 0, 0, 0, false; + } + + $total_processed := 0; + $total_deleted := 0; + $total_preserved := 0; + $has_more_to_delete := false; + $events_probe_count := 0; + $events_deleted_count := 0; + $marker_cap := 0; + $markers_probe_count := 0; + $markers_deleted_count := 0; + + -- Kwil-compatible execution with separate statements and EXISTS + -- Check if we have any targets to process + $has_targets := false; + for $result in + WITH targets AS ( + SELECT + ord, + sr AS stream_ref, + di AS day_index, + (di * 86400) AS day_start, + (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) + WITH ORDINALITY AS u(sr, di, ord) + ) + SELECT COUNT(*) AS target_count FROM targets + { + if $result.target_count > 0 { + $has_targets := true; + } + } + + -- Only proceed if we have targets + if $has_targets { + -- Step 2: Derive day events using windowed ranges (fewer scans) + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ) + SELECT COUNT(*) AS processed_days FROM ohlc_rows + { + $total_processed := $result.processed_days; + } + + -- Step 3: Build keep-set relation + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + keep_rows AS ( + SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL + ) + SELECT COUNT(*) AS preserved_count FROM (SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows) k + { + $total_preserved := $result.preserved_count; + } + + -- Step 4: Probe events to delete (cap+1 to detect leftovers) + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + keep_rows AS ( + SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL + ), + keep_rows_dedup AS ( + SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows + ), + events_probe AS ( + SELECT de.stream_ref, de.event_time, de.created_at + FROM day_events de + LEFT JOIN keep_rows_dedup k + ON k.stream_ref = de.stream_ref + AND k.event_time = de.event_time + AND k.created_at = de.created_at + WHERE k.stream_ref IS NULL + LIMIT $delete_cap + 1 + ) + SELECT COUNT(*) AS probe_count FROM events_probe + { + $events_probe_count := $result.probe_count; + } + + -- Step 5: Delete events (capped) + if $events_probe_count > 0 { + -- Delete with cap + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + MAX(created_at) OVER (PARTITION BY stream_ref, day_index, event_time) AS created_at_max + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at_max END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at_max END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at_max END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at_max END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + keep_rows AS ( + SELECT stream_ref, open_time AS event_time, open_created_at AS created_at FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, close_created_at AS created_at FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, high_created_at AS created_at FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, low_created_at AS created_at FROM ohlc_rows WHERE low_time IS NOT NULL + ), + keep_rows_dedup AS ( + SELECT DISTINCT stream_ref, event_time, created_at FROM keep_rows + ), + events_to_delete AS ( + SELECT de.stream_ref, de.event_time, de.created_at + FROM day_events de + LEFT JOIN keep_rows_dedup k + ON k.stream_ref = de.stream_ref + AND k.event_time = de.event_time + AND k.created_at = de.created_at + WHERE k.stream_ref IS NULL + LIMIT $delete_cap + ) + DELETE FROM primitive_events + WHERE EXISTS ( + SELECT 1 FROM events_to_delete d + WHERE d.stream_ref = primitive_events.stream_ref + AND d.event_time = primitive_events.event_time + AND d.created_at = primitive_events.created_at + ); + -- Set deleted events count from pre-delete probe + $events_deleted_count := LEAST($delete_cap, $events_probe_count); + } + + -- Step 6: Calculate marker cap and probe + $marker_cap := GREATEST(0, $delete_cap - $events_deleted_count); + for $result in + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + markers_probe AS ( + SELECT pet.stream_ref, pet.event_time + FROM primitive_event_type pet + JOIN windows w + ON pet.stream_ref = w.stream_ref + AND pet.event_time >= w.lo * 86400 + AND pet.event_time < (w.hi + 1) * 86400 + LIMIT $marker_cap + 1 + ) + SELECT COUNT(*) AS probe_count FROM markers_probe + { + $markers_probe_count := $result.probe_count; + } + + -- Step 7: Delete markers (capped) + if $markers_probe_count > 0 { + -- Delete with cap + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + markers_to_delete AS ( + SELECT pet.stream_ref, pet.event_time + FROM primitive_event_type pet + JOIN windows w + ON pet.stream_ref = w.stream_ref + AND pet.event_time >= w.lo * 86400 + AND pet.event_time < (w.hi + 1) * 86400 + LIMIT $marker_cap + ) + DELETE FROM primitive_event_type + WHERE EXISTS ( + SELECT 1 FROM markers_to_delete m + WHERE m.stream_ref = primitive_event_type.stream_ref + AND m.event_time = primitive_event_type.event_time + ); + -- Set deleted markers count from pre-delete probe + $markers_deleted_count := LEAST($marker_cap, $markers_probe_count); + } + + -- Step 8: Upsert markers + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ), + windows AS ( + SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi + FROM ( + SELECT stream_ref, day_index, + day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp + FROM targets + ) s + GROUP BY stream_ref, grp + ), + day_events AS ( + SELECT pe.stream_ref, + (pe.event_time / 86400)::INT AS day_index, + pe.event_time, pe.created_at, pe.value + FROM primitive_events pe + JOIN windows w + ON pe.stream_ref = w.stream_ref + AND pe.event_time >= w.lo * 86400 + AND pe.event_time < (w.hi + 1) * 86400 + ), + ranked AS ( + SELECT de.*, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY event_time DESC, created_at DESC) AS rn_close, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (PARTITION BY stream_ref, day_index + ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low + FROM day_events de + ), + ohlc_rows AS ( + SELECT stream_ref, day_index, + MAX(CASE WHEN rn_open = 1 THEN event_time END) AS open_time, + MAX(CASE WHEN rn_open = 1 THEN created_at END) AS open_created_at, + MAX(CASE WHEN rn_close = 1 THEN event_time END) AS close_time, + MAX(CASE WHEN rn_close = 1 THEN created_at END) AS close_created_at, + MAX(CASE WHEN rn_high = 1 THEN event_time END) AS high_time, + MAX(CASE WHEN rn_high = 1 THEN created_at END) AS high_created_at, + MAX(CASE WHEN rn_low = 1 THEN event_time END) AS low_time, + MAX(CASE WHEN rn_low = 1 THEN created_at END) AS low_created_at + FROM ranked + GROUP BY stream_ref, day_index + HAVING COUNT(*) >= 1 + ), + ohlc_markers AS ( + SELECT stream_ref, open_time AS event_time, 1 AS type FROM ohlc_rows WHERE open_time IS NOT NULL + UNION ALL + SELECT stream_ref, close_time AS event_time, 8 AS type FROM ohlc_rows WHERE close_time IS NOT NULL + UNION ALL + SELECT stream_ref, high_time AS event_time, 2 AS type FROM ohlc_rows WHERE high_time IS NOT NULL + UNION ALL + SELECT stream_ref, low_time AS event_time, 4 AS type FROM ohlc_rows WHERE low_time IS NOT NULL + ), + aggregated_markers AS ( + SELECT stream_ref, event_time, SUM(type)::INT AS type + FROM (SELECT DISTINCT stream_ref, event_time, type FROM ohlc_markers) d + GROUP BY stream_ref, event_time + ) + INSERT INTO primitive_event_type (stream_ref, event_time, type) + SELECT stream_ref, event_time, type FROM aggregated_markers + ON CONFLICT (stream_ref, event_time) DO UPDATE SET type = EXCLUDED.type; + + -- Step 9: Calculate totals + $total_deleted := $events_deleted_count + $markers_deleted_count; + $has_more_to_delete := ($events_probe_count > $delete_cap) OR ($markers_probe_count > $marker_cap); + + -- Step 10: Cleanup pending_prune_days only when no leftovers + if NOT $has_more_to_delete { + WITH targets AS ( + SELECT ord, sr AS stream_ref, di AS day_index, + (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end + FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord) + ) + DELETE FROM pending_prune_days + WHERE EXISTS ( + SELECT 1 FROM targets t + WHERE t.stream_ref = pending_prune_days.stream_ref + AND t.day_index = pending_prune_days.day_index + ); + } + } + + -- Uncomment for debugging + -- NOTICE('Batch digest completed: Processed '|| $total_processed::TEXT ||' days, Deleted '|| $total_deleted::TEXT ||' rows, Preserved '|| $total_preserved::TEXT ||' rows'); + + RETURN $total_processed, $total_deleted, $total_preserved, $has_more_to_delete; +}; + +/** + * auto_digest: Batch process multiple pending days (optimized version) + * + * Now uses the efficient batch_digest internally for better performance + */ +CREATE OR REPLACE ACTION auto_digest( + $delete_cap INT DEFAULT 10000, + -- Expected records per stream per day, used to calculate optimal batch size + -- Default of 24 represents typical hourly data collection (24 hours per day) + $expected_records_per_stream INT DEFAULT 24, + -- Number of most recent full/partial days to preserve from digestion + -- Example: 2 preserves today and yesterday + $preserve_past_days INT DEFAULT 2 +) PUBLIC RETURNS TABLE( + processed_days INT, + total_deleted_rows INT, + -- has_more_to_delete indicates that there are still pending batches to process + has_more_to_delete BOOL +) { + + -- Validate expected_records_per_stream to prevent divide by zero + if $expected_records_per_stream IS NULL OR $expected_records_per_stream < 1 { + ERROR('expected_records_per_stream must be a positive integer (minimum 1), got: ' || COALESCE($expected_records_per_stream::TEXT, 'NULL')); + } + + -- Calculate batch size dynamically + -- Formula: floor((delete_cap * 3) / (expected_records_per_stream * 2)) + -- Equivalent to: (delete_cap / expected_records_per_stream) * 1.5 (but ensuring we don't use float like types) + $batch_size := GREATEST(1, (($delete_cap * 3) / ($expected_records_per_stream * 2))); + $batch_size_plus_one := $batch_size + 1; + -- Leader authorization check using dedicated helper function + check_leader_authorization(); + + -- Allow preserve_past_days to be zero (process including current day); only validate non-null + if $preserve_past_days IS NULL { + ERROR('preserve_past_days must not be NULL'); + } + + -- Compute current day and cutoff day to preserve most recent days + $current_day INT := (@block_timestamp / 86400)::INT; + $cutoff_day INT := $current_day - $preserve_past_days; + + -- Get candidates using efficient ARRAY_AGG batch collection, excluding the last $preserve_past_days days + $stream_refs INT[]; + $day_indexes INT[]; + -- will help our user determine if they need to call auto_digest again + $has_more BOOL := false; + + -- Get batch_size + 1 items to check if there are more available + -- Use ARRAY_AGG for efficient batch collection in a single query + for $result in + WITH candidates AS ( + SELECT stream_ref, day_index + FROM pending_prune_days + WHERE day_index <= $cutoff_day + ORDER BY day_index ASC, stream_ref ASC + LIMIT $batch_size_plus_one + ), + aggregated AS ( + SELECT + ARRAY_AGG(stream_ref ORDER BY day_index ASC, stream_ref ASC) AS all_stream_refs, + ARRAY_AGG(day_index ORDER BY day_index ASC, stream_ref ASC) AS all_day_indexes, + COUNT(*) AS total_count + FROM candidates + ) + SELECT + CASE WHEN total_count > $batch_size + THEN all_stream_refs[1:$batch_size] + ELSE all_stream_refs + END AS stream_refs, + CASE WHEN total_count > $batch_size + THEN all_day_indexes[1:$batch_size] + ELSE all_day_indexes + END AS day_indexes, + CASE WHEN total_count > $batch_size + THEN true + ELSE false + END AS has_more_flag + FROM aggregated + { + $stream_refs := $result.stream_refs; + $day_indexes := $result.day_indexes; + $has_more := $result.has_more_flag; + } + + -- Handle empty result case + if $stream_refs IS NULL OR COALESCE(array_length($stream_refs), 0) = 0 { + emit_auto_digest_notice(0, 0, $has_more); + RETURN 0, 0, $has_more; + } + + -- Process using optimized batch_digest + $processed := 0; + $total_deleted := 0; + + for $result in batch_digest($stream_refs, $day_indexes, $delete_cap, $preserve_past_days) { + $processed := $result.processed_days; + $total_deleted := $result.total_deleted_rows; + + if $result.has_more_to_delete { + emit_auto_digest_notice($processed, $total_deleted, true); + $has_more := true; + RETURN $processed, $total_deleted, $has_more; + } + } + emit_auto_digest_notice($processed, $total_deleted, $has_more); + RETURN $processed, $total_deleted, $has_more; +}; + +-- private helper to emit structured NOTICE logs for parsing +-- we use this because there's no way to get this returned from action executions on sdks +CREATE OR REPLACE ACTION emit_auto_digest_notice( + $processed INT, + $deleted INT, + $has_more BOOL +) PRIVATE VIEW { + $has_more_text := 'false'; + if $has_more { + $has_more_text := 'true'; + } + NOTICE('auto_digest:' || '{"processed_days":' || $processed::TEXT || ',"total_deleted_rows":' || $deleted::TEXT || ',"has_more_to_delete":' || $has_more_text || '}'); +}; + +-- private helper for leader authorization with detailed diagnostics +CREATE OR REPLACE ACTION check_leader_authorization() PRIVATE { + -- RETURN; + -- @signer and @leader are already BYTEA, so we can compare them directly + -- For debugging, encode to hex for readable output + IF @leader_sender != @signer { + $leader_sender_hex := encode(@leader_sender, 'hex'); + $signer_hex := encode(@signer, 'hex'); + ERROR('Only the current block leader can execute this operation: leader_sender: ' || $leader_sender_hex::TEXT || ' signer: ' || $signer_hex::TEXT); + } +}; + +-- Helper function to derive Ethereum address from secp256k1 public key +-- This implements the Ethereum address derivation algorithm using available PostgreSQL functions +-- NOTE: This is an approximation since we can't decompress the public key without elliptic curve ops +-- Removed derive_ethereum_address_from_pubkey: inaccurate without EC decompression + +/** + * get_daily_ohlc: Query daily OHLC data + * + * Returns OHLC values for a specific day and stream + */ +CREATE OR REPLACE ACTION get_daily_ohlc( + $stream_ref INT, + $day INT +) PUBLIC VIEW RETURNS TABLE( + open_value NUMERIC(36,18), + high_value NUMERIC(36,18), + low_value NUMERIC(36,18), + close_value NUMERIC(36,18) +) { + $day_start := $day * 86400; + $day_end := $day_start + 86400; + + -- Check if this day has been digested (ensure markers correspond to existing events) + $is_digested BOOL := false; + for $unused in + SELECT 1 + FROM primitive_event_type t + JOIN primitive_events p + ON p.stream_ref = t.stream_ref + AND p.event_time = t.event_time + WHERE t.stream_ref = $stream_ref + AND t.event_time >= $day_start AND t.event_time < $day_end + LIMIT 1 { + $is_digested := true; + } + + -- Declare variables to store the OHLC values + $open_value NUMERIC(36,18); + $high_value NUMERIC(36,18); + $low_value NUMERIC(36,18); + $close_value NUMERIC(36,18); + + if $is_digested { + -- Calculate from digested data using type markers with robust selection + -- to handle potential stale markers and ensure correct OHLC values + for $result in + SELECT + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND (t.type % 2) = 1 + ORDER BY p.event_time ASC, p.created_at DESC + LIMIT 1) AS open_value, + + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND ((t.type / 2) % 2) = 1 + ORDER BY p.value DESC, p.event_time ASC, p.created_at DESC + LIMIT 1) AS high_value, + + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND ((t.type / 4) % 2) = 1 + ORDER BY p.value ASC, p.event_time ASC, p.created_at DESC + LIMIT 1) AS low_value, + + (SELECT p.value FROM primitive_events p + JOIN primitive_event_type t ON t.stream_ref = p.stream_ref AND t.event_time = p.event_time + WHERE t.stream_ref = $stream_ref AND t.event_time >= $day_start AND t.event_time < $day_end + AND ((t.type / 8) % 2) = 1 + ORDER BY p.event_time DESC, p.created_at DESC + LIMIT 1) AS close_value + { + $open_value := $result.open_value; + $high_value := $result.high_value; + $low_value := $result.low_value; + $close_value := $result.close_value; + } + } else { + -- Calculate from raw data (single query with window functions) + for $result in + SELECT + MAX(CASE WHEN rn_open = 1 THEN value END) AS open_value, + MAX(CASE WHEN rn_high = 1 THEN value END) AS high_value, + MAX(CASE WHEN rn_low = 1 THEN value END) AS low_value, + MAX(CASE WHEN rn_close = 1 THEN value END) AS close_value + FROM ( + SELECT value, + ROW_NUMBER() OVER (ORDER BY event_time ASC, created_at DESC) AS rn_open, + ROW_NUMBER() OVER (ORDER BY value DESC, event_time ASC, created_at DESC) AS rn_high, + ROW_NUMBER() OVER (ORDER BY value ASC, event_time ASC, created_at DESC) AS rn_low, + ROW_NUMBER() OVER (ORDER BY event_time DESC, created_at DESC) AS rn_close + FROM primitive_events + WHERE stream_ref = $stream_ref + AND event_time >= $day_start AND event_time < $day_end + ) r { + $open_value := $result.open_value; + $high_value := $result.high_value; + $low_value := $result.low_value; + $close_value := $result.close_value; + } + } + + -- Return the calculated values + RETURN $open_value, $high_value, $low_value, $close_value; +}; diff --git a/tests/streams/digest/digest_actions_test.go b/tests/streams/digest/digest_actions_test.go index 2c38c3eb6..5edd36216 100644 --- a/tests/streams/digest/digest_actions_test.go +++ b/tests/streams/digest/digest_actions_test.go @@ -11,6 +11,9 @@ import ( "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + extauth "github.com/trufnetwork/kwil-db/extensions/auth" kwilTesting "github.com/trufnetwork/kwil-db/testing" @@ -61,6 +64,75 @@ func TestDigestActions(t *testing.T) { }, testutils.GetTestOptionsWithCache().Options) } +// Verifies leader-only authorization on digest actions using BlockContext.Proposer. +func TestDigestActionsLeaderAuthorization(t *testing.T) { + kwilTesting.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "digest_actions_leader_authorization", + SeedScripts: migrations.GetSeedScriptPaths(), + FunctionTests: []kwilTesting.TestFunc{ + WithSignerAndProvider(func(ctx context.Context, platform *kwilTesting.Platform) error { + // Create a secp256k1 leader key + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + return errors.Wrap(err, "generate secp256k1 key") + } + pub, ok := pubGeneric.(*crypto.Secp256k1PublicKey) + if !ok { + return errors.New("unexpected pubkey type") + } + + // Helper to call action with explicit BlockContext.Proposer and signer/auth + callWithCtx := func(action string, args []any, signer []byte, authenticator string) (*common.CallResult, error) { + caller := "" + if ident, e := extauth.GetIdentifier(authenticator, signer); e == nil { + caller = ident + } + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{Height: 1, Proposer: pub}, + Signer: signer, + Caller: caller, + TxID: platform.Txid(), + Authenticator: authenticator, + } + eng := &common.EngineContext{TxContext: tx} + return platform.Engine.Call(eng, platform.DB, "", action, args, func(*common.Row) error { return nil }) + } + + // Non-leader: signer != leader_sender → expect leader-only error + if r, err := callWithCtx("batch_digest", []any{[]int{}, []int{}}, platform.Deployer, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "batch_digest non-leader call error") + } else if r == nil || r.Error == nil || !strings.Contains(r.Error.Error(), "Only the current block leader") { + return errors.New("expected leader-only error for batch_digest when not leader") + } + + if r, err := callWithCtx("auto_digest", []any{10, 24, 2}, platform.Deployer, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "auto_digest non-leader call error") + } else if r == nil || r.Error == nil || !strings.Contains(r.Error.Error(), "Only the current block leader") { + return errors.New("expected leader-only error for auto_digest when not leader") + } + + // Leader: signer equals derived leader_sender → expect success + signerGood := crypto.EthereumAddressFromPubKey(pub) + + if r, err := callWithCtx("batch_digest", []any{[]int{}, []int{}}, signerGood, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "batch_digest leader call error") + } else if r != nil && r.Error != nil { + return errors.Wrap(r.Error, "batch_digest leader call failed") + } + + if r, err := callWithCtx("auto_digest", []any{10, 24, 2}, signerGood, coreauth.EthPersonalSignAuth); err != nil { + return errors.Wrap(err, "auto_digest leader call error") + } else if r != nil && r.Error != nil { + return errors.Wrap(r.Error, "auto_digest leader call failed") + } + + return nil + }), + }, + }, testutils.GetTestOptionsWithCache().Options) +} + // WithDigestTestSetup sets up test environment with digest-specific data func WithDigestTestSetup(testFn func(ctx context.Context, platform *kwilTesting.Platform) error) func(ctx context.Context, platform *kwilTesting.Platform) error { const md = ` From 165382b598b0ee06af3b38a15fb90718e591bb5e Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 14:42:25 -0300 Subject: [PATCH 3/9] chore: update kwil-db and sdk-go dependencies to latest versions This commit updates the `kwil-db`, `kwil-db/core`, and `sdk-go` dependencies in the `go.mod` and `go.sum` files to their latest versions, ensuring compatibility and access to the latest features and fixes. The updated versions are: - `kwil-db` updated to v0.10.3-0.20250904210327-3e5f363152f3 - `kwil-db/core` updated to v0.4.3-0.20250904210327-3e5f363152f3 - `sdk-go` updated to v0.3.2-0.20250630062504-841b40cdb709 These changes aim to maintain the project's dependency health and improve overall stability. --- go.mod | 6 +++--- go.sum | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index f1faf3564..70ff152d0 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,9 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e - github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e - github.com/trufnetwork/sdk-go v0.4.3-0.20250821041427-ae33ba5dcd2c + github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3 + github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3 + github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa golang.org/x/sync v0.15.0 diff --git a/go.sum b/go.sum index 95380158b..80d2aff3e 100644 --- a/go.sum +++ b/go.sum @@ -1214,8 +1214,44 @@ github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e h1:pfdGR7fw1v7C5A5DQc8CdR+ROV2yKaRpWLuT4W86zx8= github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904131329-4447aa11a7d4 h1:3UvujJcFA5ICJxAbyB0mguN3SEZKEMf030crPc8bf1Q= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904131329-4447aa11a7d4/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904132138-3572b373277e h1:cyakjztlSTeSps5KPO/8QR3+1C926RAIIup/gujVjFo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904132138-3572b373277e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904135904-8e1d9fb56031 h1:2VLjw1VJnoBiSvdtlZuXPSIhULgL0T+tj+YESZ0ZK2s= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904135904-8e1d9fb56031/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904141114-d3152d5af76e h1:xcgRUe5nQ7TGLiZ4DEUKU1iBJAV9SVg5eOQprBsiE1s= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904141114-d3152d5af76e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904142049-5dd93120cf5b h1:4UGeh19hDIKP4QKQpikeX8/zW6lXrNVO3lPFiIY/Q70= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904142049-5dd93120cf5b/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904144248-83120eb2ec69 h1:XGTMt2p73pJrS+tECXHwyto6NGik2sEIw074of7koMA= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904144248-83120eb2ec69/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904181745-8eb811b1ab3c h1:R9dWNtqY0p0Aeb2pnsekrS/xxXu6PZFz8u9Xm6NaHY0= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904181745-8eb811b1ab3c/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904202635-b08d4fca2009 h1:0X05d42YqgEiS8gvboS4IxCpbl+PIHgQmizlS0h80NE= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904202635-b08d4fca2009/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3 h1:m9S1HEA81z8Vl0VAC5Q3RUmk435pEJfIWXy5vE9A/K0= +github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e h1:k0LKW2SdfBcJE2rpMrgWMR3xJojnS4wJPBOGEHB1Il4= github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904131329-4447aa11a7d4 h1:TS3ArPF8wZpNPIpHqln7PrUNFVuZzgG4ylJt9x4k9E4= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904131329-4447aa11a7d4/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904132138-3572b373277e h1:9Yf2QMwBq4cjpR5fOXLBrkZxuU562dLp/Zzm2H3HJuo= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904132138-3572b373277e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904135904-8e1d9fb56031 h1:FKdNMhIJjfOuYzUZufncTPAZFIFq6um6CrjAZlrLJAs= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904135904-8e1d9fb56031/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904141114-d3152d5af76e h1:3Ugf6TTGD1Ly22+Ll4C5u9JSSI2A8IZMKd9cauy+I/c= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904141114-d3152d5af76e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904142049-5dd93120cf5b h1:4nZI56K04jge1JynQn5jPX2tOVv24FFga7yaDirxHpA= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904142049-5dd93120cf5b/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904144248-83120eb2ec69 h1:/O40EZQL2QTdfooGUMT0XmIEGSBh7p6sX82Gq5E/zio= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904144248-83120eb2ec69/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904181745-8eb811b1ab3c h1:BrIm7Jeqxr36FpIcgOQ1SMoHZaRqXKBe3EI6IUTtbks= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904181745-8eb811b1ab3c/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904202635-b08d4fca2009 h1:H5q7rOn3YdamdENT+aIR9WvoDUuW9l5XUF6soHK6xFs= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904202635-b08d4fca2009/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3 h1:9aVsC9UZJIXGOPWBQttXX0wycAD0RopY/zJEza7lj+w= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 h1:d9EqPXIjbq/atzEncK5dM3Z9oStx1BxCGuL/sjefeCw= From 88b857f4369462438dc537002bfc170abe4473e8 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 14:46:22 -0300 Subject: [PATCH 4/9] refactor: improve transaction handling and JSON parsing in digest operations This commit introduces several enhancements to the digest operations, focusing on transaction handling and JSON parsing. Key changes include: - Implemented short polling for transaction results in `makeBroadcasterFromURL`, improving reliability in `BroadcastWaitAccept` mode. - Enhanced error handling for transaction queries, ensuring robust responses in case of failures. - Refactored JSON parsing in `parseDigestResultFromTxLog` to utilize `json.Unmarshal`, simplifying the extraction of `processed_days`, `total_deleted_rows`, and `has_more_to_delete` values. These changes aim to optimize the digest operation's performance and provide clearer insights into transaction outcomes. --- extensions/tn_digest/broadcast.go | 30 +++++++++++-- extensions/tn_digest/internal/engine_ops.go | 50 ++++++--------------- extensions/tn_digest/scheduler_lifecycle.go | 4 +- 3 files changed, 41 insertions(+), 43 deletions(-) diff --git a/extensions/tn_digest/broadcast.go b/extensions/tn_digest/broadcast.go index 413dc029a..3b790a6c3 100644 --- a/extensions/tn_digest/broadcast.go +++ b/extensions/tn_digest/broadcast.go @@ -6,6 +6,7 @@ import ( "net" "net/url" "strings" + "time" rpcclient "github.com/trufnetwork/kwil-db/core/rpc/client" rpcuser "github.com/trufnetwork/kwil-db/core/rpc/client/user/jsonrpc" @@ -61,12 +62,33 @@ func makeBroadcasterFromURL(u *url.URL) TxBroadcaster { } // Query the transaction result to get the log output for parsing - txQueryResp, err := userClient.TxQuery(ctx, h) - if err != nil { - return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", err) + var txQueryResp *types.TxQueryResponse + var queryErr error + if mode == rpcclient.BroadcastWaitAccept { + // In Accept mode, commit may not be immediate: perform short polling. + for tries := 0; tries < 10; tries++ { + txQueryResp, queryErr = userClient.TxQuery(ctx, h) + if queryErr == nil && txQueryResp != nil && txQueryResp.Result != nil { + break + } + // brief backoff (non-blocking if ctx is canceled) + select { + case <-ctx.Done(): + return types.Hash{}, nil, ctx.Err() + case <-time.After(200 * time.Millisecond): + } + } + if queryErr != nil { + return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr) + } + } else { + txQueryResp, queryErr = userClient.TxQuery(ctx, h) + if queryErr != nil { + return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr) + } } - if txQueryResp.Result == nil { + if txQueryResp == nil || txQueryResp.Result == nil { return types.Hash{}, nil, fmt.Errorf("transaction result is nil") } diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 6177a8cfb..05351a30d 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -2,8 +2,8 @@ package internal import ( "context" + "encoding/json" "fmt" - "strconv" "strings" "github.com/trufnetwork/kwil-db/common" @@ -246,48 +246,24 @@ func parseDigestResultFromTxLog(logOutput string) (*DigestTxResult, error) { return nil, fmt.Errorf("no auto_digest log entry found") } - // Parse the JSON (simple parsing since we know the expected structure) - result := &DigestTxResult{} - - // Remove surrounding quotes if present + // Remove surrounding quotes if present (in case the JSON is quoted in the log) digestJSON = strings.Trim(digestJSON, `"`) - // Extract processed_days - if start := strings.Index(digestJSON, `"processed_days":`); start != -1 { - start += len(`"processed_days":`) - if end := strings.Index(digestJSON[start:], ","); end != -1 { - if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { - result.ProcessedDays = val - } - } else if end := strings.Index(digestJSON[start:], "}"); end != -1 { - if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { - result.ProcessedDays = val - } - } + // Parse JSON properly + var jsonResult struct { + ProcessedDays int `json:"processed_days"` + TotalDeletedRows int `json:"total_deleted_rows"` + HasMoreToDelete bool `json:"has_more_to_delete"` } - // Extract total_deleted_rows - if start := strings.Index(digestJSON, `"total_deleted_rows":`); start != -1 { - start += len(`"total_deleted_rows":`) - if end := strings.Index(digestJSON[start:], ","); end != -1 { - if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { - result.TotalDeletedRows = val - } - } else if end := strings.Index(digestJSON[start:], "}"); end != -1 { - if val, err := strconv.Atoi(digestJSON[start : start+end]); err == nil { - result.TotalDeletedRows = val - } - } + if err := json.Unmarshal([]byte(digestJSON), &jsonResult); err != nil { + return nil, fmt.Errorf("failed to parse digest JSON: %w", err) } - // Extract has_more_to_delete - if start := strings.Index(digestJSON, `"has_more_to_delete":`); start != -1 { - start += len(`"has_more_to_delete":`) - if end := strings.Index(digestJSON[start:], ","); end != -1 { - result.HasMoreToDelete = digestJSON[start:start+end] == "true" - } else if end := strings.Index(digestJSON[start:], "}"); end != -1 { - result.HasMoreToDelete = digestJSON[start:start+end] == "true" - } + result := &DigestTxResult{ + ProcessedDays: jsonResult.ProcessedDays, + TotalDeletedRows: jsonResult.TotalDeletedRows, + HasMoreToDelete: jsonResult.HasMoreToDelete, } return result, nil diff --git a/extensions/tn_digest/scheduler_lifecycle.go b/extensions/tn_digest/scheduler_lifecycle.go index 58c833356..e59e833a2 100644 --- a/extensions/tn_digest/scheduler_lifecycle.go +++ b/extensions/tn_digest/scheduler_lifecycle.go @@ -40,8 +40,8 @@ func (e *Extension) ensureSchedulerWithService(service *common.Service) bool { return true } -func (e *Extension) startScheduler(ctx context.Context) error { - return e.Scheduler().Start(ctx, e.Schedule()) +func (e *Extension) startScheduler(_ context.Context) error { + return e.Scheduler().Start(context.Background(), e.Schedule()) } func (e *Extension) stopSchedulerIfRunning() { From b59ca2041318eb0edb5dc529ce7a184f1c77527a Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 14:50:06 -0300 Subject: [PATCH 5/9] chore: update golang.org/x/text dependency in go.mod and go.sum This commit modifies the `go.mod` and `go.sum` files to ensure the `golang.org/x/text` dependency is correctly specified without the indirect comment. This change aims to maintain clarity and consistency in the dependency management of the project. --- go.mod | 2 +- go.sum | 38 -------------------------------------- 2 files changed, 1 insertion(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 70ff152d0..5179530ea 100644 --- a/go.mod +++ b/go.mod @@ -268,7 +268,7 @@ require ( golang.org/x/net v0.40.0 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/term v0.32.0 // indirect - golang.org/x/text v0.25.0 // indirect + golang.org/x/text v0.25.0 golang.org/x/time v0.10.0 // indirect golang.org/x/tools v0.30.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect diff --git a/go.sum b/go.sum index 80d2aff3e..970c92534 100644 --- a/go.sum +++ b/go.sum @@ -1212,52 +1212,14 @@ github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZ github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= -github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e h1:pfdGR7fw1v7C5A5DQc8CdR+ROV2yKaRpWLuT4W86zx8= -github.com/trufnetwork/kwil-db v0.10.3-0.20250829040222-799cf48f758e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904131329-4447aa11a7d4 h1:3UvujJcFA5ICJxAbyB0mguN3SEZKEMf030crPc8bf1Q= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904131329-4447aa11a7d4/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904132138-3572b373277e h1:cyakjztlSTeSps5KPO/8QR3+1C926RAIIup/gujVjFo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904132138-3572b373277e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904135904-8e1d9fb56031 h1:2VLjw1VJnoBiSvdtlZuXPSIhULgL0T+tj+YESZ0ZK2s= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904135904-8e1d9fb56031/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904141114-d3152d5af76e h1:xcgRUe5nQ7TGLiZ4DEUKU1iBJAV9SVg5eOQprBsiE1s= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904141114-d3152d5af76e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904142049-5dd93120cf5b h1:4UGeh19hDIKP4QKQpikeX8/zW6lXrNVO3lPFiIY/Q70= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904142049-5dd93120cf5b/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904144248-83120eb2ec69 h1:XGTMt2p73pJrS+tECXHwyto6NGik2sEIw074of7koMA= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904144248-83120eb2ec69/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904181745-8eb811b1ab3c h1:R9dWNtqY0p0Aeb2pnsekrS/xxXu6PZFz8u9Xm6NaHY0= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904181745-8eb811b1ab3c/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904202635-b08d4fca2009 h1:0X05d42YqgEiS8gvboS4IxCpbl+PIHgQmizlS0h80NE= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904202635-b08d4fca2009/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3 h1:m9S1HEA81z8Vl0VAC5Q3RUmk435pEJfIWXy5vE9A/K0= github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e h1:k0LKW2SdfBcJE2rpMrgWMR3xJojnS4wJPBOGEHB1Il4= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250829040222-799cf48f758e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904131329-4447aa11a7d4 h1:TS3ArPF8wZpNPIpHqln7PrUNFVuZzgG4ylJt9x4k9E4= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904131329-4447aa11a7d4/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904132138-3572b373277e h1:9Yf2QMwBq4cjpR5fOXLBrkZxuU562dLp/Zzm2H3HJuo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904132138-3572b373277e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904135904-8e1d9fb56031 h1:FKdNMhIJjfOuYzUZufncTPAZFIFq6um6CrjAZlrLJAs= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904135904-8e1d9fb56031/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904141114-d3152d5af76e h1:3Ugf6TTGD1Ly22+Ll4C5u9JSSI2A8IZMKd9cauy+I/c= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904141114-d3152d5af76e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904142049-5dd93120cf5b h1:4nZI56K04jge1JynQn5jPX2tOVv24FFga7yaDirxHpA= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904142049-5dd93120cf5b/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904144248-83120eb2ec69 h1:/O40EZQL2QTdfooGUMT0XmIEGSBh7p6sX82Gq5E/zio= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904144248-83120eb2ec69/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904181745-8eb811b1ab3c h1:BrIm7Jeqxr36FpIcgOQ1SMoHZaRqXKBe3EI6IUTtbks= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904181745-8eb811b1ab3c/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904202635-b08d4fca2009 h1:H5q7rOn3YdamdENT+aIR9WvoDUuW9l5XUF6soHK6xFs= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904202635-b08d4fca2009/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3 h1:9aVsC9UZJIXGOPWBQttXX0wycAD0RopY/zJEza7lj+w= github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 h1:d9EqPXIjbq/atzEncK5dM3Z9oStx1BxCGuL/sjefeCw= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709/go.mod h1:lMhUy1bin5eNoDVkeuvG8kNfJuGDmXLpNPicR9vh4eA= -github.com/trufnetwork/sdk-go v0.4.3-0.20250821041427-ae33ba5dcd2c h1:8S/w2IHvHMD613knvfwbV7/AiW7wMH9gNQzxpEqk/9s= -github.com/trufnetwork/sdk-go v0.4.3-0.20250821041427-ae33ba5dcd2c/go.mod h1:Qt82Hye1L7QB09QNSXcuN6+ZgoeBLZFX5ErDqEO5DMs= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.10 h1:p8Fspmz3iTctJstry1PYS3HVdllxnEzTEsgIgtxTrCk= github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= From 6e396800e1ab442d00a7959957f2f877362de2e4 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 14:52:19 -0300 Subject: [PATCH 6/9] chore: update kwil-db and kwil-db/core dependencies to latest versions This commit updates the `kwil-db` and `kwil-db/core` dependencies in the `go.mod` and `go.sum` files to their latest versions, ensuring compatibility and access to the latest features and fixes. The updated versions are: - `kwil-db` updated to v0.10.3-0.20250905175054-602e824e33c2 - `kwil-db/core` updated to v0.4.3-0.20250905175054-602e824e33c2 These changes aim to maintain the project's dependency health and improve overall stability. --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 5179530ea..19350221b 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3 - github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3 + github.com/trufnetwork/kwil-db v0.10.3-0.20250905175054-602e824e33c2 + github.com/trufnetwork/kwil-db/core v0.4.3-0.20250905175054-602e824e33c2 github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa diff --git a/go.sum b/go.sum index 970c92534..666ae848f 100644 --- a/go.sum +++ b/go.sum @@ -1212,10 +1212,10 @@ github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZ github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3 h1:m9S1HEA81z8Vl0VAC5Q3RUmk435pEJfIWXy5vE9A/K0= -github.com/trufnetwork/kwil-db v0.10.3-0.20250904210327-3e5f363152f3/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3 h1:9aVsC9UZJIXGOPWBQttXX0wycAD0RopY/zJEza7lj+w= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20250904210327-3e5f363152f3/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db v0.10.3-0.20250905175054-602e824e33c2 h1:sWkNssIA46NoEYD8db4zTHbXJecPvlm8OWbYbyW21Es= +github.com/trufnetwork/kwil-db v0.10.3-0.20250905175054-602e824e33c2/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250905175054-602e824e33c2 h1:ojCdkwDCQ3/i79zaKQa9PO5l/dBERRZnmP0SFhPOv24= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250905175054-602e824e33c2/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 h1:d9EqPXIjbq/atzEncK5dM3Z9oStx1BxCGuL/sjefeCw= From 3027a6b6eee9ba153b90cdf901bd642110b78c39 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 15:25:02 -0300 Subject: [PATCH 7/9] refactor: clean up formatting and improve test assertions in metadata tests This commit refactors the `metadata_test.go` file by improving the formatting of the test cases and enhancing the assertions for expected results. Key changes include: - Removed unnecessary whitespace and aligned the input parameters for better readability. - Updated the expected result tables to ensure clarity and accuracy in the test assertions. - Adjusted the excluded columns in the assertions to reflect the correct indices. These changes aim to enhance the maintainability and clarity of the test suite, ensuring more reliable test outcomes. --- tests/streams/query/metadata_test.go | 84 ++++++++++++++-------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/tests/streams/query/metadata_test.go b/tests/streams/query/metadata_test.go index 22819cadb..c07a1b2ec 100644 --- a/tests/streams/query/metadata_test.go +++ b/tests/streams/query/metadata_test.go @@ -194,26 +194,26 @@ func testListMetadataByHeight(t *testing.T, contractInfo setup.StreamInfo) kwilT return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Height: 1, + Platform: platform, + Key: metadataKey, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") } expected := ` - | stream_ref | value_i | value_f | value_b | value_s | value_ref | created_at | - |---------------|-----------|---------------------|-----------------|--------|------------|----------------|------------|------------|------------| - | 1 | 0 | | | | | 1 | - | 1 | 1 | | | | | 1 |` + | value_i | value_f | value_b | value_s | value_ref | created_at | + |---------|---------|---------|---------|-----------|------------| + | 0 | | | | | 1 | + | 1 | | | | | 1 |` table.AssertResultRowsEqualMarkdownTable(t, table.AssertResultRowsEqualMarkdownTableInput{ - Actual: result, - Expected: expected, - ExcludedColumns: []int{1}, + Actual: result, + Expected: expected, + ExcludedColumns: []int{0, 1, 2}, }) return nil @@ -245,16 +245,16 @@ func testListMetadataByHeightNoKey(t *testing.T, contractInfo setup.StreamInfo) return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Height: 1, + Platform: platform, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") } - if (len(result) > 0) { // should return no rows + if len(result) > 0 { // should return no rows return errors.Wrapf(err, "expected empty results") } @@ -288,15 +288,15 @@ func testListMetadataByHeightPagination(t *testing.T, contractInfo setup.StreamI return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + limit := 2 offset := 0 result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Limit: &limit, - Offset: &offset, - Height: 1, + Platform: platform, + Key: metadataKey, + Limit: &limit, + Offset: &offset, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") @@ -308,11 +308,11 @@ func testListMetadataByHeightPagination(t *testing.T, contractInfo setup.StreamI offset = 2 result, err = procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Limit: &limit, - Offset: &offset, - Height: 1, + Platform: platform, + Key: metadataKey, + Limit: &limit, + Offset: &offset, + Height: 1, }) if err != nil { return errors.Wrapf(err, "error listing metadata") @@ -352,15 +352,15 @@ func testListMetadataByHeightInvalidRange(t *testing.T, contractInfo setup.Strea return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + fromHeight := int64(10) - toHeight := int64(5) + toHeight := int64(5) _, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, + Platform: platform, + Key: metadataKey, FromHeight: &fromHeight, - ToHeight: &toHeight, - Height: 1, + ToHeight: &toHeight, + Height: 1, }) if err == nil { @@ -402,14 +402,14 @@ func testListMetadataByHeightInvalidPagination(t *testing.T, contractInfo setup. return errors.Wrapf(err, "error inserting metadata with key %s", item.Key) } } - + // negative limit limit := -10 result, err := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Limit: &limit, - Height: 1, + Platform: platform, + Key: metadataKey, + Limit: &limit, + Height: 1, }) if err != nil { return errors.Wrap(err, "unexpected error with negative limit") @@ -420,11 +420,11 @@ func testListMetadataByHeightInvalidPagination(t *testing.T, contractInfo setup. } negativeOffset := -5 - result, err = procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ - Platform: platform, - Key: metadataKey, - Offset: &negativeOffset, - Height: 1, + _, err = procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{ + Platform: platform, + Key: metadataKey, + Offset: &negativeOffset, + Height: 1, }) if err != nil { return errors.Wrap(err, "unexpected error with negative offset") From fdf379550221148747e0fd509eee58d36e661d88 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 15:54:34 -0300 Subject: [PATCH 8/9] refactor: improve error handling and logging in account retrieval This commit enhances the `BroadcastAutoDigestWithArgsAndParse` function in `engine_ops.go` by refining the error handling for account retrieval. Key changes include: - Updated the error handling to only treat "not found" or "no rows" as missing accounts, failing fast on other errors. - Improved logging messages for both account not found and account found scenarios, enhancing clarity and consistency. Additionally, the `metadata_test.go` file was modified to replace a wrapped error with a simpler error message for expected empty results, improving test clarity. These changes aim to enhance the robustness and maintainability of the codebase. --- extensions/tn_digest/internal/engine_ops.go | 21 +++++++++++++++++---- tests/streams/query/metadata_test.go | 2 +- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 05351a30d..020ffc722 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -151,13 +151,26 @@ func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse( account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) var nextNonce uint64 if err != nil { - // Account doesn't exist yet - use nonce 1 for first transaction - e.logger.Info("DEBUG: Account not found, using nonce 1 for first transaction", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "error", err) - nextNonce = uint64(1) + // Only treat “not found” / “no rows” as missing-account; fail fast on any other error + msg := strings.ToLower(err.Error()) + if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") { + return nil, fmt.Errorf("get account: %w", err) + } + e.logger.Info( + "Account not found, using nonce 1 for first transaction", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + ) + nextNonce = 1 } else { // Account exists - use next nonce nextNonce = uint64(account.Nonce + 1) - e.logger.Info("DEBUG: Account found, using next nonce", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "currentNonce", account.Nonce, "nextNonce", nextNonce, "balance", account.Balance) + e.logger.Info( + "Account found, using next nonce", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + "currentNonce", account.Nonce, + "nextNonce", nextNonce, + "balance", account.Balance, + ) } // Encode arguments diff --git a/tests/streams/query/metadata_test.go b/tests/streams/query/metadata_test.go index c07a1b2ec..44d869452 100644 --- a/tests/streams/query/metadata_test.go +++ b/tests/streams/query/metadata_test.go @@ -255,7 +255,7 @@ func testListMetadataByHeightNoKey(t *testing.T, contractInfo setup.StreamInfo) } if len(result) > 0 { // should return no rows - return errors.Wrapf(err, "expected empty results") + return errors.New("expected empty results") } return nil From 0b1fbdd4843531dd9a1632062b0163f4e5bc1aa4 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Fri, 5 Sep 2025 16:05:44 -0300 Subject: [PATCH 9/9] chore: add CI cleanup task and script for managing Kwil DB resources This commit introduces a new task in the Taskfile for cleaning up lingering Kwil DB containers and processes, aimed at improving CI reliability. Additionally, a new script `ci-cleanup.sh` is added to handle the cleanup operations, including stopping and removing Docker containers and images associated with Kwil DB, as well as killing any processes that may bind to specific ports. The CI workflow is updated to invoke this cleanup script before each test attempt, ensuring a clean environment for retries. --- .github/workflows/ci.yaml | 12 ++++----- Taskfile.yml | 7 ++++++ scripts/ci-cleanup.sh | 53 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 scripts/ci-cleanup.sh diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ad1047119..46c445cc9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -79,13 +79,13 @@ jobs: while [ $attempt -le $max_attempts ]; do echo "Test attempt $attempt of $max_attempts" - # Clean dangling Docker state only on retries + # Always try to cleanup lingering Kwil DB resources before each attempt + bash node/scripts/ci-cleanup.sh || true + + # Additional cleanup only on retries if [ $attempt -gt 1 ]; then - if [ -f docker-compose.yml ]; then - docker compose down -v || true - else - docker system prune -af --volumes || true - fi + docker compose -f node/compose.yaml down -v || true + docker system prune -af --volumes || true sleep 5 fi diff --git a/Taskfile.yml b/Taskfile.yml index be6ff5073..7c6e6b488 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -235,6 +235,13 @@ tasks: - go install github.com/golangci/golangci-lint/cmd/golangci-lint@v2.2.1 - go install github.com/vektra/mockery/v2@v2.42.1 + # ─── ci ─────────────────────────────────────────────────────────────────────── + + ci:cleanup: + desc: Cleanup any lingering Kwil DB containers/processes (for CI retries) + cmds: + - bash ./scripts/ci-cleanup.sh + mockery: desc: Generate mocks cmds: diff --git a/scripts/ci-cleanup.sh b/scripts/ci-cleanup.sh new file mode 100644 index 000000000..e891e9029 --- /dev/null +++ b/scripts/ci-cleanup.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +set -euo pipefail + +echo "[ci-cleanup] Starting cleanup of Kwil DB resources..." + +# Stop and remove docker compose stack(s) +if [ -f "compose.yaml" ]; then + echo "[ci-cleanup] docker compose down for single stack" + docker compose -f compose.yaml down -v --remove-orphans || true +fi + +# Common container names/images +names=("tn-db" "kwil-postgres" "kwild" "postgres") +images=("kwildb/postgres" "tn-db:local" "kwildb/postgres:latest" "kwildb/postgres:16.8-1") + +echo "[ci-cleanup] Stopping/removing lingering containers by name..." +for n in "${names[@]}"; do + ids=$(docker ps -aq --filter "name=${n}") || true + if [ -n "${ids}" ]; then + echo "[ci-cleanup] Removing containers matching name ${n}: ${ids}" + docker rm -f ${ids} || true + fi +done + +echo "[ci-cleanup] Stopping/removing lingering containers by image..." +for img in "${images[@]}"; do + ids=$(docker ps -aq --filter "ancestor=${img}") || true + if [ -n "${ids}" ]; then + echo "[ci-cleanup] Removing containers for image ${img}: ${ids}" + docker rm -f ${ids} || true + fi +done + +# Kill leftover processes that might bind ports (8484, 5432) +ports=(8484 5432) +for p in "${ports[@]}"; do + if command -v lsof >/dev/null 2>&1; then + pids=$(lsof -t -i :${p}) || true + if [ -n "${pids}" ]; then + echo "[ci-cleanup] Killing processes on port ${p}: ${pids}" + kill -9 ${pids} || true + fi + fi +done + +echo "[ci-cleanup] Cleanup complete." + +# Best-effort: kill kwild process if it was started directly (not via docker) +if command -v pkill >/dev/null 2>&1; then + pkill -9 -f "\bkwild\b" 2>/dev/null || true +fi + +