From f37cbf9d769aab3aa6a35d116449250dc8234f18 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 5 Nov 2025 15:20:35 -0300 Subject: [PATCH 01/11] chore: add transaction ledger schema and helper actions Introduce a new SQL migration for transaction ledger management, including the creation of tables for transaction methods, events, and distributions. Implement helper actions for recording transaction events, appending fee distributions, retrieving transaction details, and listing transaction fees. This enhances the system's capability to manage transaction fees and their distribution effectively. --- .../migrations/026-transaction-schemas.sql | 51 +++++ .../migrations/027-transaction-actions.sql | 178 ++++++++++++++++++ 2 files changed, 229 insertions(+) create mode 100644 internal/migrations/026-transaction-schemas.sql create mode 100644 internal/migrations/027-transaction-actions.sql diff --git a/internal/migrations/026-transaction-schemas.sql b/internal/migrations/026-transaction-schemas.sql new file mode 100644 index 000000000..6d548a415 --- /dev/null +++ b/internal/migrations/026-transaction-schemas.sql @@ -0,0 +1,51 @@ +/** + * ============================================================================= + * Transaction Ledger Schema + * ============================================================================= + * + * Defines tables used to record transaction fee events and their per-recipient + * distributions. Helper actions live in 027-transaction-actions.sql. + */ + +CREATE TABLE IF NOT EXISTS transaction_methods ( + method_id INT PRIMARY KEY, + name TEXT NOT NULL UNIQUE +); + +INSERT INTO transaction_methods (method_id, name) VALUES + (1, 'deployStream'), + (2, 'insertRecords'), + (3, 'setTaxonomies'), + (4, 'transferTN'), + (5, 'withdrawTN'), + (6, 'requestAttestation'), + (7, 'setMetadata') +ON CONFLICT (method_id) DO NOTHING; + +CREATE TABLE IF NOT EXISTS transaction_events ( + tx_id TEXT PRIMARY KEY, + block_height INT8 NOT NULL, + method_id INT NOT NULL REFERENCES transaction_methods(method_id), + caller TEXT NOT NULL, + fee_amount NUMERIC(78, 0) NOT NULL DEFAULT 0, + fee_recipient TEXT, + metadata TEXT +); + +CREATE INDEX IF NOT EXISTS transaction_events_block_idx + ON transaction_events (block_height, tx_id); + +CREATE INDEX IF NOT EXISTS transaction_events_caller_idx + ON transaction_events (caller, block_height, tx_id); + +CREATE TABLE IF NOT EXISTS transaction_event_distributions ( + tx_id TEXT NOT NULL REFERENCES transaction_events(tx_id) ON DELETE CASCADE, + sequence INT NOT NULL, + recipient TEXT NOT NULL, + amount NUMERIC(78, 0) NOT NULL, + note TEXT, + PRIMARY KEY (tx_id, sequence) +); + +CREATE INDEX IF NOT EXISTS transaction_event_dist_recipient_idx + ON transaction_event_distributions (recipient, tx_id); diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql new file mode 100644 index 000000000..f5e24b432 --- /dev/null +++ b/internal/migrations/027-transaction-actions.sql @@ -0,0 +1,178 @@ +/** + * ============================================================================= + * Transaction Ledger Helper Actions + * ============================================================================= + */ + +CREATE OR REPLACE ACTION record_transaction_event( + $method_id INT, + $fee_amount NUMERIC(78, 0), + $fee_recipient TEXT, + $metadata TEXT +) PRIVATE { + IF $method_id IS NULL { + ERROR('record_transaction_event: method_id is required'); + } + + IF @txid IS NULL { + ERROR('record_transaction_event: missing transaction id'); + } + + $tx_hex TEXT := LOWER(@txid); + IF LENGTH($tx_hex) != 64 { + ERROR('record_transaction_event: txid must be 32-byte hex string'); + } + $tx_id TEXT := '0x' || $tx_hex; + + $caller_lower TEXT := LOWER(@caller); + IF NOT check_ethereum_address($caller_lower) { + ERROR('record_transaction_event: caller is not a valid address: ' || $caller_lower); + } + + $recipient_lower TEXT := NULL; + IF $fee_recipient IS NOT NULL AND $fee_recipient != '' { + $recipient_lower := LOWER($fee_recipient); + IF NOT check_ethereum_address($recipient_lower) { + ERROR('record_transaction_event: fee_recipient is not a valid address: ' || $fee_recipient); + } + } + + $amount NUMERIC(78, 0) := COALESCE($fee_amount, 0::NUMERIC(78, 0)); + + INSERT INTO transaction_events ( + tx_id, + block_height, + method_id, + caller, + fee_amount, + fee_recipient, + metadata + ) VALUES ( + $tx_id, + @height, + $method_id, + $caller_lower, + $amount, + $recipient_lower, + $metadata + ); + + IF $recipient_lower IS NOT NULL AND $amount > 0::NUMERIC(78, 0) { + INSERT INTO transaction_event_distributions ( + tx_id, + sequence, + recipient, + amount, + note + ) VALUES ( + $tx_id, + 1, + $recipient_lower, + $amount, + NULL + ); + } +}; + +/** + * append_fee_distribution: Adds an additional fee recipient for the current transaction. + */ +CREATE OR REPLACE ACTION append_fee_distribution( + $recipient TEXT, + $amount NUMERIC(78, 0) +) PRIVATE { + IF @txid IS NULL { + ERROR('append_fee_distribution: missing transaction id'); + } + + IF $recipient IS NULL { + ERROR('append_fee_distribution: recipient is required'); + } + + $recipient_lower TEXT := LOWER($recipient); + IF NOT check_ethereum_address($recipient_lower) { + ERROR('append_fee_distribution: invalid recipient address: ' || $recipient); + } + + $tx_hex TEXT := LOWER(@txid); + IF LENGTH($tx_hex) != 64 { + ERROR('append_fee_distribution: txid must be 32-byte hex string'); + } + $tx_id TEXT := '0x' || $tx_hex; + + $amount_sanitized NUMERIC(78, 0) := COALESCE($amount, 0::NUMERIC(78, 0)); + + $next_sequence INT := 1; + FOR $row IN + SELECT COALESCE(MAX(sequence), 0) + 1 AS seq + FROM transaction_event_distributions + WHERE tx_id = $tx_id + { + $next_sequence := $row.seq; + } + + INSERT INTO transaction_event_distributions ( + tx_id, + sequence, + recipient, + amount, + note + ) VALUES ( + $tx_id, + $next_sequence, + $recipient_lower, + $amount_sanitized, + NULL + ); +}; + +/** + * get_transaction_event: Fetches a single transaction ledger entry by tx hash. + * Accepts tx id with or without 0x prefix. + */ +CREATE OR REPLACE ACTION get_transaction_event( + $tx_id TEXT +) PUBLIC VIEW RETURNS ( + tx_id TEXT, + block_height INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + IF $tx_id IS NULL { + ERROR('tx_id is required'); + } + + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) != '0x' { + $tx_clean := '0x' || $tx_clean; + } + + RETURN + SELECT + te.tx_id, + te.block_height, + tm.name, + te.caller, + te.fee_amount, + te.fee_recipient, + te.metadata, + COALESCE(d.fee_distributions, '') AS fee_distributions + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + LEFT JOIN ( + SELECT + tx_id, + string_agg( + recipient || ':' || amount::TEXT, + ',' ORDER BY sequence ASC + ) AS fee_distributions + FROM transaction_event_distributions + GROUP BY tx_id + ) d ON d.tx_id = te.tx_id + WHERE te.tx_id = $tx_clean + LIMIT 1; +}; From 2f625cb9c467441ab5c7af07280efa08aab4e174 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 11:16:29 -0300 Subject: [PATCH 02/11] chore: enhance transaction event handling and fee recording This update introduces several improvements to transaction event management across various actions. Key changes include the addition of fee calculations and recipient tracking for stream creation, record insertion, and taxonomy insertion actions. The `record_transaction_event` function is now utilized to log transaction details, including fees and recipients, enhancing the system's ability to manage and audit transaction fees effectively. Additionally, new tests have been added to ensure the integrity of these features. --- internal/migrations/001-common-actions.sql | 26 +- .../migrations/003-primitive-insertion.sql | 22 +- internal/migrations/004-composed-taxonomy.sql | 20 +- .../010-get-latest-write-activity.sql | 254 +++++--- .../migrations/024-attestation-actions.sql | 11 +- .../migrations/026-transaction-schemas.sql | 4 +- .../migrations/027-transaction-actions.sql | 175 +++++- internal/migrations/901-utilities.sql | 1 - .../migrations/erc20-bridge/001-actions.sql | 32 +- .../002-public-transfer-actions.sql | 141 +++-- .../streams/transaction_events_ledger_test.go | 562 ++++++++++++++++++ 11 files changed, 1070 insertions(+), 178 deletions(-) create mode 100644 tests/streams/transaction_events_ledger_test.go diff --git a/internal/migrations/001-common-actions.sql b/internal/migrations/001-common-actions.sql index 87c0e9976..ddc700a19 100644 --- a/internal/migrations/001-common-actions.sql +++ b/internal/migrations/001-common-actions.sql @@ -59,6 +59,9 @@ CREATE OR REPLACE ACTION create_streams( ) PUBLIC { -- ===== FEE COLLECTION WITH ROLE EXEMPTION ===== $lower_caller TEXT := LOWER(@caller); + $fee_total NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $fee_recipient TEXT := NULL; + $leader_hex TEXT := NULL; -- Get stream count (used for both fee calculation and validation) $num_streams INT := array_length($stream_ids); @@ -77,6 +80,11 @@ CREATE OR REPLACE ACTION create_streams( $fee_per_stream := 2000000000000000000::NUMERIC(78, 0); -- 2 TRUF with 18 decimals $total_fee := $fee_per_stream * $num_streams::NUMERIC(78, 0); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex := encode(@leader_sender, 'hex')::TEXT; + $caller_balance := ethereum_bridge.balance(@caller); IF $caller_balance < $total_fee { @@ -84,8 +92,9 @@ CREATE OR REPLACE ACTION create_streams( ERROR('Insufficient balance for stream creation. Required: ' || ($total_fee / 1000000000000000000::NUMERIC(78, 0))::TEXT || ' TRUF for ' || $num_streams::TEXT || ' stream(s)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $total_fee); + ethereum_bridge.transfer($leader_hex, $total_fee); + $fee_total := $total_fee; + $fee_recipient := '0x' || $leader_hex; } -- ===== END FEE COLLECTION ===== @@ -288,6 +297,12 @@ CREATE OR REPLACE ACTION create_streams( JOIN data_providers dp ON dp.address = $data_provider JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id; + record_transaction_event( + 1, + $fee_total, + $fee_recipient, + NULL + ); }; @@ -376,6 +391,13 @@ CREATE OR REPLACE ACTION insert_metadata( $current_block, $stream_ref ); + + record_transaction_event( + 7, + 0::NUMERIC(78, 0), + NULL, + NULL + ); }; /** diff --git a/internal/migrations/003-primitive-insertion.sql b/internal/migrations/003-primitive-insertion.sql index ddcdc706e..a91f411b2 100644 --- a/internal/migrations/003-primitive-insertion.sql +++ b/internal/migrations/003-primitive-insertion.sql @@ -30,6 +30,9 @@ CREATE OR REPLACE ACTION insert_records( -- Use helper function to avoid expensive for-loop roundtrips $data_provider := helper_lowercase_array($data_provider); $lower_caller TEXT := LOWER(@caller); + $fee_total NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $fee_recipient TEXT := NULL; + $leader_hex TEXT := NULL; -- Get record count (used for both fee calculation and validation) $num_records INT := array_length($data_provider); @@ -49,6 +52,11 @@ CREATE OR REPLACE ACTION insert_records( $fee_per_record := 2000000000000000000::NUMERIC(78, 0); -- 2 TRUF with 18 decimals $total_fee := $fee_per_record * $num_records::NUMERIC(78, 0); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex := encode(@leader_sender, 'hex')::TEXT; + $caller_balance := ethereum_bridge.balance(@caller); IF $caller_balance < $total_fee { @@ -56,8 +64,9 @@ CREATE OR REPLACE ACTION insert_records( ERROR('Insufficient balance for write fee. Required: ' || ($total_fee / 1000000000000000000::NUMERIC(78, 0))::TEXT || ' TRUF for ' || $num_records::TEXT || ' record(s)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $total_fee); + ethereum_bridge.transfer($leader_hex, $total_fee); + $fee_total := $total_fee; + $fee_recipient := '0x' || $leader_hex; } -- ===== END FEE COLLECTION ===== if $num_records != array_length($stream_id) or $num_records != array_length($event_time) or $num_records != array_length($value) { @@ -102,4 +111,11 @@ CREATE OR REPLACE ACTION insert_records( $event_time, $value ); -}; \ No newline at end of file + + record_transaction_event( + 2, + $fee_total, + $fee_recipient, + NULL + ); +}; diff --git a/internal/migrations/004-composed-taxonomy.sql b/internal/migrations/004-composed-taxonomy.sql index ff7027e93..2306c7669 100644 --- a/internal/migrations/004-composed-taxonomy.sql +++ b/internal/migrations/004-composed-taxonomy.sql @@ -15,6 +15,9 @@ CREATE OR REPLACE ACTION insert_taxonomy( $child_data_providers[$i] := LOWER($child_data_providers[$i]); } $lower_caller := LOWER(@caller); + $fee_total NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $fee_recipient TEXT := NULL; + $leader_hex TEXT := NULL; -- ensure it's a composed stream if is_primitive_stream($data_provider, $stream_id) == true { @@ -56,6 +59,11 @@ CREATE OR REPLACE ACTION insert_taxonomy( $fee_per_stream := 2000000000000000000::NUMERIC(78, 0); -- 2 TRUF with 18 decimals $total_fee := $fee_per_stream * $num_children::NUMERIC(78, 0); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex := encode(@leader_sender, 'hex')::TEXT; + $caller_balance := ethereum_bridge.balance(@caller); IF $caller_balance < $total_fee { @@ -63,8 +71,9 @@ CREATE OR REPLACE ACTION insert_taxonomy( ERROR('Insufficient balance for taxonomies creation. Required: ' || ($total_fee / 1000000000000000000::NUMERIC(78, 0))::TEXT || ' TRUF for ' || $num_children::TEXT || ' child stream(s)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $total_fee); + ethereum_bridge.transfer($leader_hex, $total_fee); + $fee_total := $total_fee; + $fee_recipient := '0x' || $leader_hex; } -- ===== END FEE COLLECTION ===== @@ -113,6 +122,13 @@ CREATE OR REPLACE ACTION insert_taxonomy( $child_stream_ref ); } + + record_transaction_event( + 3, + $fee_total, + $fee_recipient, + NULL + ); }; /** diff --git a/internal/migrations/010-get-latest-write-activity.sql b/internal/migrations/010-get-latest-write-activity.sql index 287670289..e6fe19a64 100644 --- a/internal/migrations/010-get-latest-write-activity.sql +++ b/internal/migrations/010-get-latest-write-activity.sql @@ -1,71 +1,183 @@ -CREATE OR REPLACE ACTION get_last_transactions( - $data_provider TEXT, - $limit_size INT8 -) PUBLIC VIEW RETURNS TABLE( - created_at INT8, - method TEXT -) { - $data_provider := LOWER($data_provider); - IF $limit_size IS NULL { - $limit_size := 6; - } - IF $limit_size <= 0 { - $limit_size := 6; - } - - IF $limit_size > 100 { - ERROR('Limit size cannot exceed 100'); - } - - -- Fetch top limit_size DISTINCT blocks per table, then take top limit_size overall - -- This gets the most recent activity from each table type and combines them - RETURN SELECT created_at, method FROM ( - SELECT created_at, method, ROW_NUMBER() OVER (PARTITION BY created_at ORDER BY priority ASC) AS rn FROM ( - SELECT s.created_at, 'deployStream' AS method, 1 AS priority - FROM ( - SELECT DISTINCT s.created_at - FROM streams s - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY s.created_at DESC - LIMIT $limit_size - ) s - UNION ALL - SELECT pe.created_at, 'insertRecords', 2 - FROM ( - SELECT DISTINCT pe.created_at - FROM primitive_events pe - JOIN streams s ON pe.stream_ref = s.id - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY pe.created_at DESC - LIMIT $limit_size - ) pe - UNION ALL - SELECT t.created_at, 'setTaxonomies', 3 - FROM ( - SELECT DISTINCT t.created_at - FROM taxonomies t - JOIN streams s ON t.stream_ref = s.id - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY t.created_at DESC - LIMIT $limit_size - ) t - UNION ALL - SELECT m.created_at, 'setMetadata', 4 - FROM ( - SELECT DISTINCT m.created_at - FROM metadata m - JOIN streams s ON m.stream_ref = s.id - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY m.created_at DESC - LIMIT $limit_size - ) m - ) AS combined - ) AS ranked - WHERE rn = 1 - ORDER BY created_at DESC - LIMIT $limit_size; -} +/** + * Transaction history views + * + * get_last_transactions_v1 - legacy implementation (no fee/caller metadata) + * get_last_transactions_v2 - ledger-backed implementation with fee details + * get_last_transactions - temporary wrapper returning the v2 signature but + * still sourcing data from v1. This will be replaced + * with v2 once callers migrate. + */ + +CREATE OR REPLACE ACTION get_last_transactions_v1( + $data_provider TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + created_at INT8, + method TEXT +) { + $normalized_provider TEXT := NULL; + IF COALESCE($data_provider, '') != '' { + $normalized_provider := LOWER($data_provider); + } + + IF $limit_size IS NULL { + $limit_size := 6; + } + IF $limit_size <= 0 { + $limit_size := 6; + } + + IF $limit_size > 100 { + ERROR('Limit size cannot exceed 100'); + } + + RETURN SELECT created_at, method FROM ( + SELECT created_at, method, ROW_NUMBER() OVER (PARTITION BY created_at ORDER BY priority ASC) AS rn FROM ( + SELECT s.created_at, 'deployStream' AS method, 1 AS priority + FROM ( + SELECT DISTINCT s.created_at + FROM streams s + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY s.created_at DESC + LIMIT $limit_size + ) s + UNION ALL + SELECT pe.created_at, 'insertRecords', 2 + FROM ( + SELECT DISTINCT pe.created_at + FROM primitive_events pe + JOIN streams s ON pe.stream_ref = s.id + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY pe.created_at DESC + LIMIT $limit_size + ) pe + UNION ALL + SELECT t.created_at, 'setTaxonomies', 3 + FROM ( + SELECT DISTINCT t.created_at + FROM taxonomies t + JOIN streams s ON t.stream_ref = s.id + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY t.created_at DESC + LIMIT $limit_size + ) t + UNION ALL + SELECT m.created_at, 'setMetadata', 4 + FROM ( + SELECT DISTINCT m.created_at + FROM metadata m + JOIN streams s ON m.stream_ref = s.id + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY m.created_at DESC + LIMIT $limit_size + ) m + ) AS combined + ) AS ranked + WHERE rn = 1 + ORDER BY created_at DESC + LIMIT $limit_size; +}; + +CREATE OR REPLACE ACTION get_last_transactions_v2( + $data_provider TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + tx_id TEXT, + created_at INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + $normalized_provider TEXT := NULL; + IF COALESCE($data_provider, '') != '' { + $normalized_provider := LOWER($data_provider); + IF NOT check_ethereum_address($normalized_provider) { + ERROR('Invalid data provider address. Must be a valid Ethereum address: ' || $data_provider); + } + } + + IF $limit_size IS NULL OR $limit_size <= 0 { + $limit_size := 6; + } + + IF $limit_size > 100 { + ERROR('Limit size cannot exceed 100'); + } + + RETURN + WITH distributions AS ( + SELECT + tx_id, + string_agg( + recipient || ':' || amount::TEXT, + ',' ORDER BY sequence ASC + ) AS fee_distributions + FROM transaction_event_distributions + GROUP BY tx_id + ) + SELECT + te.tx_id, + te.block_height AS created_at, + tm.name AS method, + te.caller, + te.fee_amount, + te.fee_recipient, + te.metadata, + COALESCE(d.fee_distributions, '') AS fee_distributions + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + LEFT JOIN distributions d ON d.tx_id = te.tx_id + WHERE $normalized_provider IS NULL OR te.caller = $normalized_provider + ORDER BY te.block_height DESC, te.tx_id DESC + LIMIT $limit_size; +}; + +CREATE OR REPLACE ACTION get_last_transactions( + $data_provider TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + tx_id TEXT, + created_at INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + $normalized_provider TEXT := NULL; + IF COALESCE($data_provider, '') != '' { + $normalized_provider := LOWER($data_provider); + IF NOT check_ethereum_address($normalized_provider) { + ERROR('Invalid data provider address. Must be a valid Ethereum address: ' || $data_provider); + } + } + + $limit_val INT := COALESCE($limit_size, 6); + IF $limit_val <= 0 { + $limit_val := 6; + } + + IF $limit_val > 100 { + ERROR('Limit size cannot exceed 100'); + } + + RETURN + SELECT + NULL::TEXT AS tx_id, + lt.created_at, + lt.method, + NULL::TEXT AS caller, + NULL::NUMERIC(78, 0) AS fee_amount, + NULL::TEXT AS fee_recipient, + NULL::TEXT AS metadata, + ''::TEXT AS fee_distributions + FROM get_last_transactions_v1($normalized_provider, $limit_val) lt; +}; diff --git a/internal/migrations/024-attestation-actions.sql b/internal/migrations/024-attestation-actions.sql index c333f9e45..5f2b9418d 100644 --- a/internal/migrations/024-attestation-actions.sql +++ b/internal/migrations/024-attestation-actions.sql @@ -151,8 +151,15 @@ CREATE OR REPLACE ACTION request_attestation( $created_height, NULL, NULL, NULL ); --- Queue for signing (no-op on non-leader validators; handled by precompile) -tn_attestation.queue_for_signing(encode($attestation_hash, 'hex')); + -- Queue for signing (no-op on non-leader validators; handled by precompile) + tn_attestation.queue_for_signing(encode($attestation_hash, 'hex')); + + record_transaction_event( + 6, + $attestation_fee, + '0x' || $leader_addr, + NULL + ); RETURN $request_tx_id, $attestation_hash; }; diff --git a/internal/migrations/026-transaction-schemas.sql b/internal/migrations/026-transaction-schemas.sql index 6d548a415..d1e137d2d 100644 --- a/internal/migrations/026-transaction-schemas.sql +++ b/internal/migrations/026-transaction-schemas.sql @@ -29,7 +29,7 @@ CREATE TABLE IF NOT EXISTS transaction_events ( caller TEXT NOT NULL, fee_amount NUMERIC(78, 0) NOT NULL DEFAULT 0, fee_recipient TEXT, - metadata TEXT + metadata TEXT -- future could be JSONB when supported ); CREATE INDEX IF NOT EXISTS transaction_events_block_idx @@ -47,5 +47,5 @@ CREATE TABLE IF NOT EXISTS transaction_event_distributions ( PRIMARY KEY (tx_id, sequence) ); -CREATE INDEX IF NOT EXISTS transaction_event_dist_recipient_idx +CREATE INDEX IF NOT EXISTS tx_event_dist_rec_idx ON transaction_event_distributions (recipient, tx_id); diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql index f5e24b432..00f8374a2 100644 --- a/internal/migrations/027-transaction-actions.sql +++ b/internal/migrations/027-transaction-actions.sql @@ -5,6 +5,7 @@ */ CREATE OR REPLACE ACTION record_transaction_event( + -- check internal/migrations/026-transaction-schemas.sql for method_id values $method_id INT, $fee_amount NUMERIC(78, 0), $fee_recipient TEXT, @@ -39,44 +40,52 @@ CREATE OR REPLACE ACTION record_transaction_event( $amount NUMERIC(78, 0) := COALESCE($fee_amount, 0::NUMERIC(78, 0)); - INSERT INTO transaction_events ( - tx_id, - block_height, - method_id, - caller, - fee_amount, - fee_recipient, - metadata - ) VALUES ( - $tx_id, - @height, - $method_id, - $caller_lower, - $amount, - $recipient_lower, - $metadata - ); + $event_exists BOOL := FALSE; + FOR $row IN + SELECT 1 + FROM transaction_events + WHERE tx_id = $tx_id + LIMIT 1 + { + $event_exists := TRUE; + } - IF $recipient_lower IS NOT NULL AND $amount > 0::NUMERIC(78, 0) { - INSERT INTO transaction_event_distributions ( + IF !$event_exists { + INSERT INTO transaction_events ( tx_id, - sequence, - recipient, - amount, - note + block_height, + method_id, + caller, + fee_amount, + fee_recipient, + metadata ) VALUES ( $tx_id, - 1, - $recipient_lower, + @height, + $method_id, + $caller_lower, $amount, - NULL + $recipient_lower, + $metadata ); + IF $recipient_lower IS NOT NULL AND $amount > 0::NUMERIC(78, 0) { + INSERT INTO transaction_event_distributions ( + tx_id, + sequence, + recipient, + amount, + note + ) VALUES ( + $tx_id, + 1, + $recipient_lower, + $amount, + NULL + ); + } } }; -/** - * append_fee_distribution: Adds an additional fee recipient for the current transaction. - */ CREATE OR REPLACE ACTION append_fee_distribution( $recipient TEXT, $amount NUMERIC(78, 0) @@ -124,12 +133,9 @@ CREATE OR REPLACE ACTION append_fee_distribution( $amount_sanitized, NULL ); + }; -/** - * get_transaction_event: Fetches a single transaction ledger entry by tx hash. - * Accepts tx id with or without 0x prefix. - */ CREATE OR REPLACE ACTION get_transaction_event( $tx_id TEXT ) PUBLIC VIEW RETURNS ( @@ -176,3 +182,106 @@ CREATE OR REPLACE ACTION get_transaction_event( WHERE te.tx_id = $tx_clean LIMIT 1; }; + +CREATE OR REPLACE ACTION list_transaction_fees( + $wallet TEXT, + $mode TEXT DEFAULT 'paid', + $limit INT DEFAULT 20, + $offset INT DEFAULT 0 +) PUBLIC VIEW RETURNS TABLE ( + tx_id TEXT, + block_height INT8, + method TEXT, + caller TEXT, + total_fee NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + IF $wallet IS NULL OR trim($wallet) = '' { + ERROR('wallet is required'); + } + + $wallet_lower TEXT := LOWER($wallet); + IF NOT check_ethereum_address($wallet_lower) { + ERROR('wallet must be a valid Ethereum address: ' || $wallet); + } + + $mode_normalized TEXT := LOWER(COALESCE($mode, 'paid')); + IF $mode_normalized != 'paid' AND $mode_normalized != 'received' AND $mode_normalized != 'both' { + ERROR('mode must be one of paid, received, or both'); + } + + $limit_val INT := COALESCE($limit, 20); + IF $limit_val <= 0 { + $limit_val := 20; + } + IF $limit_val > 1000 { + ERROR('limit cannot exceed 1000'); + } + + $offset_val INT := COALESCE($offset, 0); + IF $offset_val < 0 { + $offset_val := 0; + } + + RETURN + WITH distributions AS ( + SELECT + tx_id, + string_agg( + recipient || ':' || amount::TEXT, + ',' ORDER BY sequence ASC + ) AS fee_distributions + FROM transaction_event_distributions + GROUP BY tx_id + ), + filtered AS ( + SELECT + te.tx_id, + te.block_height, + tm.name AS method, + te.caller, + te.fee_amount, + te.fee_recipient, + te.metadata, + COALESCE(d.fee_distributions, '') AS fee_distributions + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + LEFT JOIN distributions d ON d.tx_id = te.tx_id + WHERE + ($mode_normalized = 'paid' AND te.caller = $wallet_lower) + OR ($mode_normalized = 'received' AND ( + te.fee_recipient = $wallet_lower + OR EXISTS ( + SELECT 1 + FROM transaction_event_distributions ted + WHERE ted.tx_id = te.tx_id + AND ted.recipient = $wallet_lower + ) + )) + OR ($mode_normalized = 'both' AND ( + te.caller = $wallet_lower + OR te.fee_recipient = $wallet_lower + OR EXISTS ( + SELECT 1 + FROM transaction_event_distributions ted + WHERE ted.tx_id = te.tx_id + AND ted.recipient = $wallet_lower + ) + )) + ) + SELECT + tx_id, + block_height, + method, + caller, + fee_amount AS total_fee, + fee_recipient, + metadata, + fee_distributions + FROM filtered + ORDER BY block_height DESC, tx_id DESC + LIMIT $limit_val + OFFSET $offset_val; +}; diff --git a/internal/migrations/901-utilities.sql b/internal/migrations/901-utilities.sql index 1531a8c51..487b02eda 100644 --- a/internal/migrations/901-utilities.sql +++ b/internal/migrations/901-utilities.sql @@ -107,7 +107,6 @@ CREATE OR REPLACE ACTION helper_lowercase_array( ) t; }; - CREATE OR REPLACE ACTION helper_check_cache( $data_provider TEXT, $stream_id TEXT, diff --git a/internal/migrations/erc20-bridge/001-actions.sql b/internal/migrations/erc20-bridge/001-actions.sql index 6239ea94c..8801b43d4 100644 --- a/internal/migrations/erc20-bridge/001-actions.sql +++ b/internal/migrations/erc20-bridge/001-actions.sql @@ -36,12 +36,24 @@ CREATE OR REPLACE ACTION sepolia_bridge_tokens($recipient TEXT DEFAULT NULL, $am ($withdrawal_fee / '1000000000000000000'::NUMERIC(78, 0))::TEXT || ' TRUF fee)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $withdrawal_fee); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + ethereum_bridge.transfer($leader_hex, $withdrawal_fee); -- ===== END FEE COLLECTION ===== + $bridge_recipient TEXT := LOWER(COALESCE($recipient, @caller)); + -- Execute withdrawal using the bridge extension sepolia_bridge.bridge(COALESCE($recipient, @caller), $withdrawal_amount); + + record_transaction_event( + 5, + $withdrawal_fee, + '0x' || $leader_hex, + NULL + ); }; -- MAINNET @@ -82,10 +94,22 @@ CREATE OR REPLACE ACTION ethereum_bridge_tokens($recipient TEXT DEFAULT NULL, $a ($withdrawal_fee / '1000000000000000000'::NUMERIC(78, 0))::TEXT || ' TRUF fee)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $withdrawal_fee); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + ethereum_bridge.transfer($leader_hex, $withdrawal_fee); -- ===== END FEE COLLECTION ===== + $bridge_recipient TEXT := LOWER(COALESCE($recipient, @caller)); + -- Execute withdrawal using the bridge extension ethereum_bridge.bridge(COALESCE($recipient, @caller), $withdrawal_amount); + + record_transaction_event( + 5, + $withdrawal_fee, + '0x' || $leader_hex, + NULL + ); }; diff --git a/internal/migrations/erc20-bridge/002-public-transfer-actions.sql b/internal/migrations/erc20-bridge/002-public-transfer-actions.sql index 7a55b715b..4bbc037d7 100644 --- a/internal/migrations/erc20-bridge/002-public-transfer-actions.sql +++ b/internal/migrations/erc20-bridge/002-public-transfer-actions.sql @@ -1,58 +1,83 @@ --- Public Transfer Actions for TRUF Token Operations --- Enables users to transfer TRUF tokens between addresses on TN - --- SEPOLIA TESTNET TRANSFERS -CREATE OR REPLACE ACTION sepolia_transfer($to_address TEXT, $amount TEXT) PUBLIC { - -- Validate Ethereum address format - if NOT check_ethereum_address($to_address) { - ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); - } - - -- Validate amount is positive - if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { - ERROR('Transfer amount must be positive'); - } - - -- Fee - $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals - $caller_balance := COALESCE(sepolia_bridge.balance(@caller), 0::NUMERIC(78, 0)); - - IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { - ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); - } - - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - sepolia_bridge.transfer($leader_addr, $fee); - - -- Execute transfer using the bridge extension - sepolia_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); -}; - - --- MAINNET TRANSFERS -CREATE OR REPLACE ACTION ethereum_transfer($to_address TEXT, $amount TEXT) PUBLIC { - -- Validate Ethereum address format - if NOT check_ethereum_address($to_address) { - ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); - } - - -- Validate amount is positive - if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { - ERROR('Transfer amount must be positive'); - } - - -- Fee - $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals - $caller_balance := COALESCE(ethereum_bridge.balance(@caller), 0::NUMERIC(78, 0)); - - IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { - ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); - } - - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $fee); - - -- Execute transfer using the bridge extension - ethereum_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); -}; - +-- Public Transfer Actions for TRUF Token Operations +-- Enables users to transfer TRUF tokens between addresses on TN + +-- SEPOLIA TESTNET TRANSFERS +CREATE OR REPLACE ACTION sepolia_transfer($to_address TEXT, $amount TEXT) PUBLIC { + $recipient_lower TEXT := LOWER($to_address); + + -- Validate Ethereum address format + if NOT check_ethereum_address($recipient_lower) { + ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); + } + + -- Validate amount is positive + if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { + ERROR('Transfer amount must be positive'); + } + + -- Fee + $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals + $caller_balance := COALESCE(sepolia_bridge.balance(@caller), 0::NUMERIC(78, 0)); + + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + + IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { + ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); + } + + sepolia_bridge.transfer($leader_hex, $fee); + + -- Execute transfer using the bridge extension + sepolia_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); + + record_transaction_event( + 4, + $fee, + '0x' || $leader_hex, + NULL + ); +}; + + +-- MAINNET TRANSFERS +CREATE OR REPLACE ACTION ethereum_transfer($to_address TEXT, $amount TEXT) PUBLIC { + $recipient_lower TEXT := LOWER($to_address); + + -- Validate Ethereum address format + if NOT check_ethereum_address($recipient_lower) { + ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); + } + + -- Validate amount is positive + if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { + ERROR('Transfer amount must be positive'); + } + + -- Fee + $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals + $caller_balance := COALESCE(ethereum_bridge.balance(@caller), 0::NUMERIC(78, 0)); + + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + + IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { + ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); + } + + ethereum_bridge.transfer($leader_hex, $fee); + + -- Execute transfer using the bridge extension + ethereum_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); + + record_transaction_event( + 4, + $fee, + '0x' || $leader_hex, + NULL + ); +}; diff --git a/tests/streams/transaction_events_ledger_test.go b/tests/streams/transaction_events_ledger_test.go new file mode 100644 index 000000000..954986ad8 --- /dev/null +++ b/tests/streams/transaction_events_ledger_test.go @@ -0,0 +1,562 @@ +//go:build kwiltest + +package tests + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwilTypes "github.com/trufnetwork/kwil-db/core/types" + erc20shim "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/extensions/tn_utils" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + testerc20 "github.com/trufnetwork/node/tests/streams/utils/erc20" + "github.com/trufnetwork/node/tests/streams/utils/setup" + sdkTypes "github.com/trufnetwork/sdk-go/core/types" + "github.com/trufnetwork/sdk-go/core/util" +) + +const ( + ledgerChain = "sepolia" + ledgerEscrow = "0x502430eD0BbE0f230215870c9C2853e126eE5Ae3" + ledgerERC20 = "0x2222222222222222222222222222222222222222" + ledgerExtensionAlias = "sepolia_bridge" + + feeOneTRUF = "1000000000000000000" + feeTwoTRUF = "2000000000000000000" + feeFourTRUF = "4000000000000000000" + feeFortyTRUF = "40000000000000000000" + transferAmount = "5000000000000000000" + withdrawAmount = "10000000000000000000" + initialUserFunds = "200000000000000000000" +) + +var ledgerPointCounter int64 = 20000 + +func TestTransactionEventsLedger(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "LEDGER_FEE01_TransactionEventsLedger", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + runTransactionEventsLedgerScenario(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + systemAdmin := util.Unsafe_NewEthereumAddressFromString("0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf") + platform.Deployer = systemAdmin.Bytes() + + require.NoError(t, setup.AddMemberToRoleBypass(ctx, platform, "system", "network_writers_manager", systemAdmin.Address())) + require.NoError(t, setup.CreateDataProvider(ctx, platform, systemAdmin.Address())) + + err := erc20shim.ForTestingSeedAndActivateInstance(ctx, platform, ledgerChain, ledgerEscrow, ledgerERC20, 18, 60, ledgerExtensionAlias) + if err != nil { + if !strings.Contains(err.Error(), "alias \"sepolia_bridge\" already exists") { + require.NoError(t, err, "failed to seed ERC-20 bridge instance") + } + require.NoError(t, erc20shim.ForTestingInitializeExtension(ctx, platform), "failed to reinitialize ERC-20 bridge extension") + } + require.NoError(t, erc20shim.ForTestingInitializeExtension(ctx, platform)) + + actorVal := util.Unsafe_NewEthereumAddressFromString("0x9999999999999999999999999999999999999999") + actor := &actorVal + require.NoError(t, setup.CreateDataProviderWithoutRole(ctx, platform, actor.Address())) + require.NoError(t, ledgerGiveBalance(ctx, platform, actor.Address(), initialUserFunds)) + + receiverVal := util.Unsafe_NewEthereumAddressFromString("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + receiver := &receiverVal + + attestationStream := util.GenerateStreamId("ledger_attestation_stream") + require.NoError(t, setup.CreateStream(ctx, platform, setup.StreamInfo{ + Type: setup.ContractTypePrimitive, + Locator: sdkTypes.StreamLocator{ + StreamId: attestationStream, + DataProvider: systemAdmin, + }, + })) + + userLower := strings.ToLower(actor.Address()) + height := int64(5) + + primitiveStream := util.GenerateStreamId("ledger_primitive_stream") + composedStream := util.GenerateStreamId("ledger_composed_stream") + + createLeaderPub, createLeaderAddr := newLeader(t) + createTx, err := callActionWithLeader(ctx, platform, actor, createLeaderPub, height, "create_streams", []any{ + []string{primitiveStream.String(), composedStream.String()}, + []string{"primitive", "composed"}, + }) + require.NoError(t, err) + height++ + + insertLeaderPub, insertLeaderAddr := newLeader(t) + insertValue, err := kwilTypes.ParseDecimalExplicit("10.5", 36, 18) + require.NoError(t, err) + insertTx, err := callActionWithLeader(ctx, platform, actor, insertLeaderPub, height, "insert_records", []any{ + []string{userLower}, + []string{primitiveStream.String()}, + []int64{1000}, + []*kwilTypes.Decimal{insertValue}, + }) + require.NoError(t, err) + height++ + + taxLeaderPub, taxLeaderAddr := newLeader(t) + weight, err := kwilTypes.ParseDecimalExplicit("1.0", 36, 18) + require.NoError(t, err) + taxTx, err := callActionWithLeader(ctx, platform, actor, taxLeaderPub, height, "insert_taxonomy", []any{ + userLower, + composedStream.String(), + []string{userLower}, + []string{primitiveStream.String()}, + []*kwilTypes.Decimal{weight}, + nil, + }) + require.NoError(t, err) + height++ + + transferLeaderPub, transferLeaderAddr := newLeader(t) + transferTx, err := callActionWithLeader(ctx, platform, actor, transferLeaderPub, height, "sepolia_transfer", []any{ + receiver.Address(), + transferAmount, + }) + require.NoError(t, err) + height++ + + withdrawLeaderPub, withdrawLeaderAddr := newLeader(t) + withdrawTx, err := callActionWithLeader(ctx, platform, actor, withdrawLeaderPub, height, "sepolia_bridge_tokens", []any{ + actor.Address(), + withdrawAmount, + }) + require.NoError(t, err) + height++ + + require.NoError(t, setup.AddMemberToRoleBypass(ctx, platform, "system", "network_writer", actor.Address())) + + argsBytes, err := tn_utils.EncodeActionArgs([]any{ + strings.ToLower(systemAdmin.Address()), + attestationStream.String(), + int64(0), + int64(999), + int64(99999), + false, + }) + require.NoError(t, err) + + attLeaderPub, attLeaderAddr := newLeader(t) + attTx, err := callActionWithLeader(ctx, platform, actor, attLeaderPub, height, "request_attestation", []any{ + strings.ToLower(systemAdmin.Address()), + attestationStream.String(), + "get_record", + argsBytes, + false, + nil, + }) + require.NoError(t, err) + + assertNoMetadata := func(meta metadataMap) { + require.Empty(t, meta, "expected no metadata for ledger event") + } + + expected := map[string]ledgerExpectation{ + createTx: { + method: "deployStream", + fee: feeFourTRUF, + feeRecipient: createLeaderAddr, + feeDistribution: buildDistribution(createLeaderAddr, feeFourTRUF), + assertMetadata: assertNoMetadata, + }, + insertTx: { + method: "insertRecords", + fee: feeTwoTRUF, + feeRecipient: insertLeaderAddr, + feeDistribution: buildDistribution(insertLeaderAddr, feeTwoTRUF), + assertMetadata: assertNoMetadata, + }, + taxTx: { + method: "setTaxonomies", + fee: feeTwoTRUF, + feeRecipient: taxLeaderAddr, + feeDistribution: buildDistribution(taxLeaderAddr, feeTwoTRUF), + assertMetadata: assertNoMetadata, + }, + transferTx: { + method: "transferTN", + fee: feeOneTRUF, + feeRecipient: transferLeaderAddr, + feeDistribution: buildDistribution(transferLeaderAddr, feeOneTRUF), + assertMetadata: assertNoMetadata, + }, + withdrawTx: { + method: "withdrawTN", + fee: feeFortyTRUF, + feeRecipient: withdrawLeaderAddr, + feeDistribution: buildDistribution(withdrawLeaderAddr, feeFortyTRUF), + assertMetadata: assertNoMetadata, + }, + attTx: { + method: "requestAttestation", + fee: feeFortyTRUF, + feeRecipient: attLeaderAddr, + feeDistribution: buildDistribution(attLeaderAddr, feeFortyTRUF), + assertMetadata: assertNoMetadata, + }, + } + + paidRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "paid") + require.NoError(t, err) + require.Len(t, paidRows, len(expected)) + for _, row := range paidRows { + exp, ok := expected[row.TxID] + require.True(t, ok, "unexpected tx in paid results: %s", row.TxID) + require.Equal(t, exp.method, row.Method) + require.Equal(t, userLower, row.Caller) + require.Equal(t, exp.fee, row.FeeAmount) + require.Equal(t, exp.feeRecipient, row.FeeRecipient) + require.Equal(t, exp.feeDistribution, row.FeeDistributions) + t.Logf("metadata for %s (%s): raw=%s parsed=%v", row.TxID, row.Method, row.RawMetadata, row.Metadata) + exp.assertMetadata(row.Metadata) + } + + bothRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "both") + require.NoError(t, err) + require.Len(t, bothRows, len(expected)) + + withdrawReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), withdrawLeaderAddr, "received") + require.NoError(t, err) + require.Len(t, withdrawReceivedRows, 1) + require.Equal(t, withdrawTx, withdrawReceivedRows[0].TxID) + require.Equal(t, withdrawLeaderAddr, withdrawReceivedRows[0].FeeRecipient) + require.Equal(t, userLower, withdrawReceivedRows[0].Caller) + + lastTxRows, err := fetchLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) + require.NoError(t, err) + require.Len(t, lastTxRows, len(expected)) + for _, row := range lastTxRows { + exp, ok := expected[row.TxID] + require.True(t, ok, "unexpected tx in last transactions: %s", row.TxID) + require.Equal(t, exp.method, row.Method) + require.Equal(t, exp.fee, row.FeeAmount) + require.Equal(t, exp.feeRecipient, row.FeeRecipient) + require.Equal(t, exp.feeDistribution, row.FeeDistributions) + exp.assertMetadata(row.Metadata) + } + + for txID, exp := range expected { + eventRow, err := fetchTransactionEvent(ctx, platform, actor.Address(), txID) + require.NoError(t, err, "failed to fetch transaction event for %s", txID) + require.Equal(t, exp.method, eventRow.Method) + require.Equal(t, exp.fee, eventRow.FeeAmount) + require.Equal(t, exp.feeRecipient, eventRow.FeeRecipient) + require.Equal(t, exp.feeDistribution, eventRow.FeeDistributions) + exp.assertMetadata(eventRow.Metadata) + } + + return nil + } +} + +type ledgerExpectation struct { + method string + fee string + feeRecipient string + feeDistribution string + assertMetadata func(meta metadataMap) +} + +type ledgerRow struct { + TxID string + Method string + Caller string + FeeAmount string + FeeRecipient string + Metadata metadataMap + FeeDistributions string + RawMetadata string +} + +func newLeader(t *testing.T) (*crypto.Secp256k1PublicKey, string) { + t.Helper() + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + require.NoError(t, err) + pub := pubGeneric.(*crypto.Secp256k1PublicKey) + return pub, addressFromPub(pub) +} + +func addressFromPub(pub *crypto.Secp256k1PublicKey) string { + return fmt.Sprintf("0x%x", crypto.EthereumAddressFromPubKey(pub)) +} + +func callActionWithLeader(ctx context.Context, platform *kwilTesting.Platform, signer *util.EthereumAddress, leaderPub *crypto.Secp256k1PublicKey, height int64, action string, args []any) (string, error) { + txID := strings.ToLower(platform.Txid()) + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Proposer: leaderPub, + }, + Signer: signer.Bytes(), + Caller: signer.Address(), + TxID: txID, + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + res, err := platform.Engine.Call(engineCtx, platform.DB, "", action, args, func(row *common.Row) error { return nil }) + if err != nil { + return "", err + } + if res != nil && res.Error != nil { + return "", res.Error + } + return "0x" + txID, nil +} + +func fetchTransactionFees(ctx context.Context, platform *kwilTesting.Platform, caller string, wallet string, mode string) ([]ledgerRow, error) { + rows := make([]ledgerRow, 0, 8) + args := []any{strings.ToLower(wallet), mode, 20, 0} + err := callView(ctx, platform, caller, "list_transaction_fees", args, func(row *common.Row) error { + feeRecipient := "" + if row.Values[5] != nil { + feeRecipient = strings.ToLower(row.Values[5].(string)) + } + + rawMetadata := stringOrEmpty(row.Values[6]) + meta := parseMetadata(row.Values[6]) + + rows = append(rows, ledgerRow{ + TxID: row.Values[0].(string), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: meta, + FeeDistributions: stringOrEmpty(row.Values[7]), + RawMetadata: rawMetadata, + }) + return nil + }) + return rows, err +} + +func fetchLastTransactions(ctx context.Context, platform *kwilTesting.Platform, caller string, dataProvider string, limit int64) ([]ledgerRow, error) { + rows := make([]ledgerRow, 0, 8) + args := []any{dataProvider, limit} + err := callView(ctx, platform, caller, "get_last_transactions_v2", args, func(row *common.Row) error { + feeRecipient := "" + if row.Values[5] != nil { + feeRecipient = strings.ToLower(row.Values[5].(string)) + } + + rawMetadata := stringOrEmpty(row.Values[6]) + rows = append(rows, ledgerRow{ + TxID: row.Values[0].(string), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: parseMetadata(row.Values[6]), + FeeDistributions: stringOrEmpty(row.Values[7]), + RawMetadata: rawMetadata, + }) + return nil + }) + return rows, err +} + +func fetchTransactionEvent(ctx context.Context, platform *kwilTesting.Platform, caller string, txID string) (ledgerRow, error) { + var result ledgerRow + err := callView(ctx, platform, caller, "get_transaction_event", []any{txID}, func(row *common.Row) error { + feeRecipient := "" + if row.Values[5] != nil { + feeRecipient = strings.ToLower(row.Values[5].(string)) + } + + rawMetadata := stringOrEmpty(row.Values[6]) + result = ledgerRow{ + TxID: row.Values[0].(string), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: parseMetadata(row.Values[6]), + FeeDistributions: stringOrEmpty(row.Values[7]), + RawMetadata: rawMetadata, + } + return nil + }) + return result, err +} + +func callView(ctx context.Context, platform *kwilTesting.Platform, caller string, action string, args []any, fn func(*common.Row) error) error { + address, err := util.NewEthereumAddressFromBytes(platform.Deployer) + if err != nil { + return err + } + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 100, + }, + Signer: address.Bytes(), + Caller: strings.ToLower(caller), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + res, err := platform.Engine.Call(engineCtx, platform.DB, "", action, args, fn) + if err != nil { + return err + } + if res != nil && res.Error != nil { + return res.Error + } + return nil +} + +func decimalToString(val any) string { + if val == nil { + return "" + } + dec, ok := val.(*kwilTypes.Decimal) + if !ok { + panic(fmt.Sprintf("expected *types.Decimal, got %T", val)) + } + return dec.String() +} + +type metadataMap map[string]any + +func parseMetadata(val any) metadataMap { + if val == nil { + return metadataMap{} + } + + switch v := val.(type) { + case string: + if v == "" { + return metadataMap{} + } + var m metadataMap + if err := json.Unmarshal([]byte(v), &m); err != nil { + panic(fmt.Sprintf("failed to parse metadata %q: %v", v, err)) + } + return m + default: + panic(fmt.Sprintf("unexpected metadata type %T", v)) + } +} + +func (m metadataMap) String(key string) string { + val, ok := m[key] + if !ok { + return "" + } + str, ok := val.(string) + if !ok { + panic(fmt.Sprintf("metadata %s is not string: %T", key, val)) + } + return strings.ToLower(str) +} + +func (m metadataMap) Bool(key string) bool { + val, ok := m[key] + if !ok { + return false + } + b, ok := val.(bool) + if !ok { + panic(fmt.Sprintf("metadata %s is not bool: %T", key, val)) + } + return b +} + +func (m metadataMap) Int(key string) int64 { + val, ok := m[key] + if !ok { + return 0 + } + switch num := val.(type) { + case float64: + return int64(num) + case json.Number: + i, err := num.Int64() + if err != nil { + panic(fmt.Sprintf("metadata %s invalid json number: %v", key, err)) + } + return i + default: + panic(fmt.Sprintf("metadata %s is not numeric: %T", key, val)) + } +} + +func (m metadataMap) StringSlice(key string) []string { + val, ok := m[key] + if !ok { + return nil + } + arr, ok := val.([]any) + if !ok { + panic(fmt.Sprintf("metadata %s is not array: %T", key, val)) + } + out := make([]string, 0, len(arr)) + for _, v := range arr { + str, ok := v.(string) + if !ok { + panic(fmt.Sprintf("metadata array %s contains non-string %T", key, v)) + } + out = append(out, strings.ToLower(str)) + } + return out +} + +func (m metadataMap) Optional(key string) any { + val, ok := m[key] + if !ok { + return nil + } + if val == nil { + return nil + } + return val +} + +func stringOrEmpty(val any) string { + if val == nil { + return "" + } + return val.(string) +} + +func buildDistribution(recipient, amount string) string { + if amount == "" || recipient == "" { + return "" + } + return strings.ToLower(recipient) + ":" + amount +} + +func ledgerGiveBalance(ctx context.Context, platform *kwilTesting.Platform, wallet string, amount string) error { + ledgerPointCounter++ + return testerc20.InjectERC20Transfer( + ctx, + platform, + ledgerChain, + ledgerEscrow, + ledgerERC20, + wallet, + wallet, + amount, + ledgerPointCounter, + nil, + ) +} From 33866022d1cfe6edc274193ce678076355017909 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 15:03:00 -0300 Subject: [PATCH 03/11] chore: update transaction event views and enhance fee distribution handling This commit refines the `get_last_transactions_v2` action to improve the retrieval of transaction details, including fee distributions. The implementation now aggregates fee distribution data from the `transaction_event_distributions` table, ensuring accurate reporting of fees associated with each transaction. Additionally, the `transaction_events_ledger_test.go` file has been updated to reflect these changes, enhancing test coverage for the new distribution logic and ensuring the integrity of transaction data retrieval. --- .../010-get-latest-write-activity.sql | 52 +--- .../migrations/026-transaction-schemas.sql | 2 + .../migrations/027-transaction-actions.sql | 273 ++++++++++++++---- .../streams/transaction_events_ledger_test.go | 143 ++++++--- 4 files changed, 342 insertions(+), 128 deletions(-) diff --git a/internal/migrations/010-get-latest-write-activity.sql b/internal/migrations/010-get-latest-write-activity.sql index e6fe19a64..33ebd4b74 100644 --- a/internal/migrations/010-get-latest-write-activity.sql +++ b/internal/migrations/010-get-latest-write-activity.sql @@ -2,7 +2,7 @@ * Transaction history views * * get_last_transactions_v1 - legacy implementation (no fee/caller metadata) - * get_last_transactions_v2 - ledger-backed implementation with fee details + * get_last_transactions_v2 - ledger-backed implementation (redefined later in migration 027) * get_last_transactions - temporary wrapper returning the v2 signature but * still sourcing data from v1. This will be replaced * with v2 once callers migrate. @@ -95,48 +95,18 @@ CREATE OR REPLACE ACTION get_last_transactions_v2( metadata TEXT, fee_distributions TEXT ) { - $normalized_provider TEXT := NULL; - IF COALESCE($data_provider, '') != '' { - $normalized_provider := LOWER($data_provider); - IF NOT check_ethereum_address($normalized_provider) { - ERROR('Invalid data provider address. Must be a valid Ethereum address: ' || $data_provider); - } - } - - IF $limit_size IS NULL OR $limit_size <= 0 { - $limit_size := 6; - } - - IF $limit_size > 100 { - ERROR('Limit size cannot exceed 100'); - } - + -- Placeholder implementation: will be replaced by ledger-backed view in migration 027. RETURN - WITH distributions AS ( - SELECT - tx_id, - string_agg( - recipient || ':' || amount::TEXT, - ',' ORDER BY sequence ASC - ) AS fee_distributions - FROM transaction_event_distributions - GROUP BY tx_id - ) SELECT - te.tx_id, - te.block_height AS created_at, - tm.name AS method, - te.caller, - te.fee_amount, - te.fee_recipient, - te.metadata, - COALESCE(d.fee_distributions, '') AS fee_distributions - FROM transaction_events te - JOIN transaction_methods tm ON tm.method_id = te.method_id - LEFT JOIN distributions d ON d.tx_id = te.tx_id - WHERE $normalized_provider IS NULL OR te.caller = $normalized_provider - ORDER BY te.block_height DESC, te.tx_id DESC - LIMIT $limit_size; + NULL::TEXT AS tx_id, + lt.created_at, + lt.method, + NULL::TEXT AS caller, + NULL::NUMERIC(78, 0) AS fee_amount, + NULL::TEXT AS fee_recipient, + NULL::TEXT AS metadata, + ''::TEXT AS fee_distributions + FROM get_last_transactions_v1($data_provider, $limit_size) lt; }; CREATE OR REPLACE ACTION get_last_transactions( diff --git a/internal/migrations/026-transaction-schemas.sql b/internal/migrations/026-transaction-schemas.sql index d1e137d2d..eff9fc4d1 100644 --- a/internal/migrations/026-transaction-schemas.sql +++ b/internal/migrations/026-transaction-schemas.sql @@ -38,6 +38,8 @@ CREATE INDEX IF NOT EXISTS transaction_events_block_idx CREATE INDEX IF NOT EXISTS transaction_events_caller_idx ON transaction_events (caller, block_height, tx_id); +-- this table is the single source of truth for per-recipient breakdowns; downstream readers aggregate from it +-- E.g. attestations will soon pay a slice of the fee to each data provider that contributed to the data CREATE TABLE IF NOT EXISTS transaction_event_distributions ( tx_id TEXT NOT NULL REFERENCES transaction_events(tx_id) ON DELETE CASCADE, sequence INT NOT NULL, diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql index 00f8374a2..de75c4419 100644 --- a/internal/migrations/027-transaction-actions.sql +++ b/internal/migrations/027-transaction-actions.sql @@ -51,6 +51,8 @@ CREATE OR REPLACE ACTION record_transaction_event( } IF !$event_exists { + -- Ledger rows are idempotent: only create the parent record once even if + -- record_transaction_event is invoked multiple times within the same tx. INSERT INTO transaction_events ( tx_id, block_height, @@ -68,7 +70,10 @@ CREATE OR REPLACE ACTION record_transaction_event( $recipient_lower, $metadata ); + $initial_summary TEXT := ''; IF $recipient_lower IS NOT NULL AND $amount > 0::NUMERIC(78, 0) { + -- transaction_event_distributions is the single source of truth for + -- per-recipient breakdowns; downstream readers aggregate from it. INSERT INTO transaction_event_distributions ( tx_id, sequence, @@ -82,7 +87,12 @@ CREATE OR REPLACE ACTION record_transaction_event( $amount, NULL ); + $initial_summary := $recipient_lower || ':' || $amount::TEXT; } + + INSERT INTO transaction_event_summaries (tx_id, fee_distributions) + VALUES ($tx_id, $initial_summary) + ON CONFLICT (tx_id) DO NOTHING; } }; @@ -134,6 +144,25 @@ CREATE OR REPLACE ACTION append_fee_distribution( NULL ); + $entry TEXT := $recipient_lower || ':' || $amount_sanitized::TEXT; + $updated BOOL := FALSE; + FOR $summary_row IN + UPDATE transaction_event_summaries + SET fee_distributions = CASE + WHEN fee_distributions = '' THEN $entry + ELSE fee_distributions || ',' || $entry + END + WHERE tx_id = $tx_id + RETURNING tx_id + { + $updated := TRUE; + } + + IF !$updated { + INSERT INTO transaction_event_summaries (tx_id, fee_distributions) + VALUES ($tx_id, $entry) + ON CONFLICT (tx_id) DO NOTHING; + } }; CREATE OR REPLACE ACTION get_transaction_event( @@ -157,32 +186,68 @@ CREATE OR REPLACE ACTION get_transaction_event( $tx_clean := '0x' || $tx_clean; } - RETURN - SELECT - te.tx_id, - te.block_height, - tm.name, - te.caller, - te.fee_amount, - te.fee_recipient, - te.metadata, - COALESCE(d.fee_distributions, '') AS fee_distributions - FROM transaction_events te - JOIN transaction_methods tm ON tm.method_id = te.method_id - LEFT JOIN ( + $out_tx_id TEXT := NULL; + $out_block_height INT8 := 0; + $out_method TEXT := NULL; + $out_caller TEXT := NULL; + $out_fee_amount NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $out_fee_recipient TEXT := NULL; + $out_metadata TEXT := NULL; + + FOR $row IN SELECT - tx_id, - string_agg( - recipient || ':' || amount::TEXT, - ',' ORDER BY sequence ASC - ) AS fee_distributions + te.tx_id, + te.block_height, + tm.name AS method, + te.caller, + te.fee_amount, + te.fee_recipient, + te.metadata + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + WHERE te.tx_id = $tx_clean + LIMIT 1 + { + $out_tx_id := $row.tx_id; + $out_block_height := $row.block_height; + $out_method := $row.method; + $out_caller := $row.caller; + $out_fee_amount := $row.fee_amount; + $out_fee_recipient := $row.fee_recipient; + $out_metadata := $row.metadata; + } + + IF $out_tx_id IS NULL { + RETURN; + } + + $distribution TEXT := ''; + FOR $dist IN + SELECT recipient, amount FROM transaction_event_distributions - GROUP BY tx_id - ) d ON d.tx_id = te.tx_id - WHERE te.tx_id = $tx_clean - LIMIT 1; + WHERE tx_id = $tx_clean + ORDER BY sequence ASC + { + $entry TEXT := $dist.recipient || ':' || $dist.amount::TEXT; + IF $distribution = '' { + $distribution := $entry; + } ELSE { + $distribution := $distribution || ',' || $entry; + } + } + + RETURN + $out_tx_id, + $out_block_height, + $out_method, + $out_caller, + $out_fee_amount, + $out_fee_recipient, + $out_metadata, + $distribution; }; + CREATE OR REPLACE ACTION list_transaction_fees( $wallet TEXT, $mode TEXT DEFAULT 'paid', @@ -196,6 +261,9 @@ CREATE OR REPLACE ACTION list_transaction_fees( total_fee NUMERIC(78, 0), fee_recipient TEXT, metadata TEXT, + distribution_sequence INT, + distribution_recipient TEXT, + distribution_amount NUMERIC(78, 0), fee_distributions TEXT ) { IF $wallet IS NULL OR trim($wallet) = '' { @@ -225,39 +293,38 @@ CREATE OR REPLACE ACTION list_transaction_fees( $offset_val := 0; } - RETURN - WITH distributions AS ( - SELECT - tx_id, - string_agg( - recipient || ':' || amount::TEXT, - ',' ORDER BY sequence ASC - ) AS fee_distributions - FROM transaction_event_distributions - GROUP BY tx_id - ), - filtered AS ( + RETURN SELECT + fe.tx_id, + fe.block_height, + fe.method, + fe.caller, + fe.fee_amount, + fe.fee_recipient, + fe.metadata, + COALESCE(ted.sequence, 0) AS distribution_sequence, + ted.recipient AS distribution_recipient, + ted.amount AS distribution_amount, + COALESCE(ts.fee_distributions, '') AS fee_distributions + FROM ( SELECT te.tx_id, te.block_height, tm.name AS method, - te.caller, + LOWER(te.caller) AS caller, te.fee_amount, te.fee_recipient, - te.metadata, - COALESCE(d.fee_distributions, '') AS fee_distributions + te.metadata FROM transaction_events te JOIN transaction_methods tm ON tm.method_id = te.method_id - LEFT JOIN distributions d ON d.tx_id = te.tx_id WHERE ($mode_normalized = 'paid' AND te.caller = $wallet_lower) OR ($mode_normalized = 'received' AND ( te.fee_recipient = $wallet_lower OR EXISTS ( SELECT 1 - FROM transaction_event_distributions ted - WHERE ted.tx_id = te.tx_id - AND ted.recipient = $wallet_lower + FROM transaction_event_distributions ted_inner + WHERE ted_inner.tx_id = te.tx_id + AND ted_inner.recipient = $wallet_lower ) )) OR ($mode_normalized = 'both' AND ( @@ -265,23 +332,123 @@ CREATE OR REPLACE ACTION list_transaction_fees( OR te.fee_recipient = $wallet_lower OR EXISTS ( SELECT 1 - FROM transaction_event_distributions ted - WHERE ted.tx_id = te.tx_id - AND ted.recipient = $wallet_lower + FROM transaction_event_distributions ted_inner + WHERE ted_inner.tx_id = te.tx_id + AND ted_inner.recipient = $wallet_lower ) )) - ) + ORDER BY te.block_height DESC, te.tx_id DESC + LIMIT $limit_val + OFFSET $offset_val + ) fe + LEFT JOIN transaction_event_distributions ted + ON ted.tx_id = fe.tx_id + LEFT JOIN transaction_event_summaries ts + ON ts.tx_id = fe.tx_id + ORDER BY fe.block_height DESC, + fe.tx_id DESC, + COALESCE(ted.sequence, 0) ASC; +}; + + + +CREATE OR REPLACE ACTION get_last_transactions_v2( + $data_provider TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + tx_id TEXT, + created_at INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + $normalized_provider TEXT := NULL; + IF COALESCE($data_provider, '') != '' { + $normalized_provider := LOWER($data_provider); + IF NOT check_ethereum_address($normalized_provider) { + ERROR('Invalid data provider address. Must be a valid Ethereum address: ' || $data_provider); + } + } + + $limit_val INT := COALESCE($limit_size, 6); + IF $limit_val <= 0 { + $limit_val := 6; + } + + IF $limit_val > 100 { + ERROR('Limit size cannot exceed 100'); + } + + $tx_ids TEXT[] := '{}'::TEXT[]; + $block_heights TEXT[] := '{}'::TEXT[]; + $methods TEXT[] := '{}'::TEXT[]; + $callers TEXT[] := '{}'::TEXT[]; + $fee_amounts TEXT[] := '{}'::TEXT[]; + $fee_recipients TEXT[] := '{}'::TEXT[]; + $metadata_values TEXT[] := '{}'::TEXT[]; + $fee_summaries TEXT[] := '{}'::TEXT[]; + + FOR $row IN + SELECT + te.tx_id, + te.block_height, + tm.name AS method, + te.caller, + te.fee_amount, + te.fee_recipient, + te.metadata + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + WHERE $normalized_provider IS NULL OR te.caller = $normalized_provider + ORDER BY te.block_height DESC, te.tx_id DESC + LIMIT $limit_val + { + $summary TEXT := ''; + FOR $dist IN + SELECT recipient, amount + FROM transaction_event_distributions + WHERE tx_id = $row.tx_id + ORDER BY sequence ASC + { + $entry TEXT := $dist.recipient || ':' || $dist.amount::TEXT; + IF $summary = '' { + $summary := $entry; + } ELSE { + $summary := $summary || ',' || $entry; + } + } + + $tx_ids := array_append($tx_ids, $row.tx_id); + $block_heights := array_append($block_heights, $row.block_height::TEXT); + $methods := array_append($methods, $row.method); + $callers := array_append($callers, LOWER($row.caller)); + $fee_amounts := array_append($fee_amounts, $row.fee_amount::TEXT); + $fee_recipients := array_append($fee_recipients, COALESCE($row.fee_recipient, '')); + $metadata_values := array_append($metadata_values, COALESCE($row.metadata, '')); + $fee_summaries := array_append($fee_summaries, $summary); + } + + RETURN SELECT tx_id, - block_height, + created_at::INT8, method, caller, - fee_amount AS total_fee, - fee_recipient, - metadata, + fee_amount::NUMERIC(78, 0), + NULLIF(fee_recipient, ''), + NULLIF(metadata, ''), fee_distributions - FROM filtered - ORDER BY block_height DESC, tx_id DESC - LIMIT $limit_val - OFFSET $offset_val; + FROM unnest( + $tx_ids, + $block_heights, + $methods, + $callers, + $fee_amounts, + $fee_recipients, + $metadata_values, + $fee_summaries + ) AS entries(tx_id, created_at, method, caller, fee_amount, fee_recipient, metadata, fee_distributions); }; diff --git a/tests/streams/transaction_events_ledger_test.go b/tests/streams/transaction_events_ledger_test.go index 954986ad8..6c7236408 100644 --- a/tests/streams/transaction_events_ledger_test.go +++ b/tests/streams/transaction_events_ledger_test.go @@ -224,13 +224,22 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, require.Equal(t, userLower, row.Caller) require.Equal(t, exp.fee, row.FeeAmount) require.Equal(t, exp.feeRecipient, row.FeeRecipient) - require.Equal(t, exp.feeDistribution, row.FeeDistributions) + require.Equal(t, exp.feeDistribution, buildDistribution(row.DistributionRecipient, row.DistributionAmount)) t.Logf("metadata for %s (%s): raw=%s parsed=%v", row.TxID, row.Method, row.RawMetadata, row.Metadata) exp.assertMetadata(row.Metadata) } bothRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "both") require.NoError(t, err) + debugBothCount := 0 + err = callView(ctx, platform, actor.Address(), "list_transaction_fees", []any{userLower, "both", 20, 0}, func(row *common.Row) error { + debugBothCount++ + t.Logf("both row raw: %+v", row.Values) + return nil + }) + require.NoError(t, err) + t.Logf("both count raw: %d", debugBothCount) + t.Logf("bothRows len (helper): %d", len(bothRows)) require.Len(t, bothRows, len(expected)) withdrawReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), withdrawLeaderAddr, "received") @@ -242,6 +251,20 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, lastTxRows, err := fetchLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) require.NoError(t, err) + debugLastCount := 0 + err = callView(ctx, platform, actor.Address(), "get_last_transactions_v2", []any{userLower, int64(len(expected))}, func(row *common.Row) error { + debugLastCount++ + t.Logf("last row raw: %+v", row.Values) + return nil + }) + require.NoError(t, err) + t.Logf("lastRows count raw: %d", debugLastCount) + err = callView(ctx, platform, actor.Address(), "get_last_transactions_v2", []any{"", int64(len(expected))}, func(row *common.Row) error { + t.Logf("last row (no filter): %+v", row.Values) + return nil + }) + require.NoError(t, err) + t.Logf("lastRows len (helper): %d", len(lastTxRows)) require.Len(t, lastTxRows, len(expected)) for _, row := range lastTxRows { exp, ok := expected[row.TxID] @@ -249,10 +272,25 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, require.Equal(t, exp.method, row.Method) require.Equal(t, exp.fee, row.FeeAmount) require.Equal(t, exp.feeRecipient, row.FeeRecipient) - require.Equal(t, exp.feeDistribution, row.FeeDistributions) + require.Equal(t, exp.feeDistribution, buildDistribution(row.DistributionRecipient, row.DistributionAmount)) exp.assertMetadata(row.Metadata) } + legacyRows, err := fetchLegacyLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) + require.NoError(t, err) + require.NotEmpty(t, legacyRows) + + methodSet := make(map[string]struct{}) + for _, exp := range expected { + methodSet[exp.method] = struct{}{} + } + + for _, row := range legacyRows { + require.Greater(t, row.CreatedAt, int64(0)) + _, ok := methodSet[row.Method] + require.True(t, ok, "unexpected method in legacy view: %s", row.Method) + } + for txID, exp := range expected { eventRow, err := fetchTransactionEvent(ctx, platform, actor.Address(), txID) require.NoError(t, err, "failed to fetch transaction event for %s", txID) @@ -275,15 +313,24 @@ type ledgerExpectation struct { assertMetadata func(meta metadataMap) } +type legacyRow struct { + CreatedAt int64 + Method string +} + type ledgerRow struct { - TxID string - Method string - Caller string - FeeAmount string - FeeRecipient string - Metadata metadataMap - FeeDistributions string - RawMetadata string + TxID string + CreatedAt int64 + Method string + Caller string + FeeAmount string + FeeRecipient string + Metadata metadataMap + FeeDistributions string + DistributionSequence int + DistributionRecipient string + DistributionAmount string + RawMetadata string } func newLeader(t *testing.T) (*crypto.Secp256k1PublicKey, string) { @@ -334,15 +381,24 @@ func fetchTransactionFees(ctx context.Context, platform *kwilTesting.Platform, c rawMetadata := stringOrEmpty(row.Values[6]) meta := parseMetadata(row.Values[6]) + seq := int(row.Values[7].(int64)) + distRecipient := "" + if row.Values[8] != nil { + distRecipient = strings.ToLower(row.Values[8].(string)) + } + rows = append(rows, ledgerRow{ - TxID: row.Values[0].(string), - Method: row.Values[2].(string), - Caller: strings.ToLower(row.Values[3].(string)), - FeeAmount: decimalToString(row.Values[4]), - FeeRecipient: feeRecipient, - Metadata: meta, - FeeDistributions: stringOrEmpty(row.Values[7]), - RawMetadata: rawMetadata, + TxID: row.Values[0].(string), + CreatedAt: row.Values[1].(int64), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: meta, + DistributionSequence: seq, + DistributionRecipient: distRecipient, + DistributionAmount: decimalToString(row.Values[9]), + RawMetadata: rawMetadata, }) return nil }) @@ -360,14 +416,30 @@ func fetchLastTransactions(ctx context.Context, platform *kwilTesting.Platform, rawMetadata := stringOrEmpty(row.Values[6]) rows = append(rows, ledgerRow{ - TxID: row.Values[0].(string), - Method: row.Values[2].(string), - Caller: strings.ToLower(row.Values[3].(string)), - FeeAmount: decimalToString(row.Values[4]), - FeeRecipient: feeRecipient, - Metadata: parseMetadata(row.Values[6]), - FeeDistributions: stringOrEmpty(row.Values[7]), - RawMetadata: rawMetadata, + TxID: row.Values[0].(string), + CreatedAt: row.Values[1].(int64), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: parseMetadata(row.Values[6]), + DistributionSequence: int(row.Values[7].(int64)), + DistributionRecipient: strings.ToLower(stringOrEmpty(row.Values[8])), + DistributionAmount: decimalToString(row.Values[9]), + RawMetadata: rawMetadata, + }) + return nil + }) + return rows, err +} + +func fetchLegacyLastTransactions(ctx context.Context, platform *kwilTesting.Platform, caller string, dataProvider string, limit int64) ([]legacyRow, error) { + rows := make([]legacyRow, 0, 8) + args := []any{dataProvider, limit} + err := callView(ctx, platform, caller, "get_last_transactions_v1", args, func(row *common.Row) error { + rows = append(rows, legacyRow{ + CreatedAt: row.Values[0].(int64), + Method: row.Values[1].(string), }) return nil }) @@ -384,14 +456,17 @@ func fetchTransactionEvent(ctx context.Context, platform *kwilTesting.Platform, rawMetadata := stringOrEmpty(row.Values[6]) result = ledgerRow{ - TxID: row.Values[0].(string), - Method: row.Values[2].(string), - Caller: strings.ToLower(row.Values[3].(string)), - FeeAmount: decimalToString(row.Values[4]), - FeeRecipient: feeRecipient, - Metadata: parseMetadata(row.Values[6]), - FeeDistributions: stringOrEmpty(row.Values[7]), - RawMetadata: rawMetadata, + TxID: row.Values[0].(string), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: parseMetadata(row.Values[6]), + FeeDistributions: stringOrEmpty(row.Values[7]), + DistributionSequence: 0, + DistributionRecipient: "", + DistributionAmount: "", + RawMetadata: rawMetadata, } return nil }) From 3fb00029505ba9a8177c8256cd9a23a240ad2bc0 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 15:21:24 -0300 Subject: [PATCH 04/11] refactor: streamline transaction event actions and remove redundant fee handling This commit simplifies the `record_transaction_event` and `append_fee_distribution` actions by removing unnecessary variables and redundant logic related to fee summaries. The `list_transaction_fees` and `get_last_transactions_v2` actions have been updated to enhance clarity and efficiency in retrieving transaction details, including fee distributions. Additionally, the test suite has been cleaned up to remove outdated debug logging, ensuring a more focused and efficient testing process. --- .../migrations/027-transaction-actions.sql | 132 +++++------------- .../streams/transaction_events_ledger_test.go | 23 --- 2 files changed, 33 insertions(+), 122 deletions(-) diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql index de75c4419..4a4ec1337 100644 --- a/internal/migrations/027-transaction-actions.sql +++ b/internal/migrations/027-transaction-actions.sql @@ -1,9 +1,3 @@ -/** - * ============================================================================= - * Transaction Ledger Helper Actions - * ============================================================================= - */ - CREATE OR REPLACE ACTION record_transaction_event( -- check internal/migrations/026-transaction-schemas.sql for method_id values $method_id INT, @@ -70,7 +64,6 @@ CREATE OR REPLACE ACTION record_transaction_event( $recipient_lower, $metadata ); - $initial_summary TEXT := ''; IF $recipient_lower IS NOT NULL AND $amount > 0::NUMERIC(78, 0) { -- transaction_event_distributions is the single source of truth for -- per-recipient breakdowns; downstream readers aggregate from it. @@ -87,12 +80,7 @@ CREATE OR REPLACE ACTION record_transaction_event( $amount, NULL ); - $initial_summary := $recipient_lower || ':' || $amount::TEXT; } - - INSERT INTO transaction_event_summaries (tx_id, fee_distributions) - VALUES ($tx_id, $initial_summary) - ON CONFLICT (tx_id) DO NOTHING; } }; @@ -143,26 +131,6 @@ CREATE OR REPLACE ACTION append_fee_distribution( $amount_sanitized, NULL ); - - $entry TEXT := $recipient_lower || ':' || $amount_sanitized::TEXT; - $updated BOOL := FALSE; - FOR $summary_row IN - UPDATE transaction_event_summaries - SET fee_distributions = CASE - WHEN fee_distributions = '' THEN $entry - ELSE fee_distributions || ',' || $entry - END - WHERE tx_id = $tx_id - RETURNING tx_id - { - $updated := TRUE; - } - - IF !$updated { - INSERT INTO transaction_event_summaries (tx_id, fee_distributions) - VALUES ($tx_id, $entry) - ON CONFLICT (tx_id) DO NOTHING; - } }; CREATE OR REPLACE ACTION get_transaction_event( @@ -263,8 +231,7 @@ CREATE OR REPLACE ACTION list_transaction_fees( metadata TEXT, distribution_sequence INT, distribution_recipient TEXT, - distribution_amount NUMERIC(78, 0), - fee_distributions TEXT + distribution_amount NUMERIC(78, 0) ) { IF $wallet IS NULL OR trim($wallet) = '' { ERROR('wallet is required'); @@ -293,6 +260,10 @@ CREATE OR REPLACE ACTION list_transaction_fees( $offset_val := 0; } + $mode_is_paid BOOL := $mode_normalized = 'paid'; + $mode_is_received BOOL := $mode_normalized = 'received'; + $mode_is_both BOOL := $mode_normalized = 'both'; + RETURN SELECT fe.tx_id, fe.block_height, @@ -303,8 +274,7 @@ CREATE OR REPLACE ACTION list_transaction_fees( fe.metadata, COALESCE(ted.sequence, 0) AS distribution_sequence, ted.recipient AS distribution_recipient, - ted.amount AS distribution_amount, - COALESCE(ts.fee_distributions, '') AS fee_distributions + ted.amount AS distribution_amount FROM ( SELECT te.tx_id, @@ -317,8 +287,8 @@ CREATE OR REPLACE ACTION list_transaction_fees( FROM transaction_events te JOIN transaction_methods tm ON tm.method_id = te.method_id WHERE - ($mode_normalized = 'paid' AND te.caller = $wallet_lower) - OR ($mode_normalized = 'received' AND ( + ($mode_is_paid AND te.caller = $wallet_lower) + OR ($mode_is_received AND ( te.fee_recipient = $wallet_lower OR EXISTS ( SELECT 1 @@ -327,7 +297,7 @@ CREATE OR REPLACE ACTION list_transaction_fees( AND ted_inner.recipient = $wallet_lower ) )) - OR ($mode_normalized = 'both' AND ( + OR ($mode_is_both AND ( te.caller = $wallet_lower OR te.fee_recipient = $wallet_lower OR EXISTS ( @@ -343,8 +313,6 @@ CREATE OR REPLACE ACTION list_transaction_fees( ) fe LEFT JOIN transaction_event_distributions ted ON ted.tx_id = fe.tx_id - LEFT JOIN transaction_event_summaries ts - ON ts.tx_id = fe.tx_id ORDER BY fe.block_height DESC, fe.tx_id DESC, COALESCE(ted.sequence, 0) ASC; @@ -363,7 +331,9 @@ CREATE OR REPLACE ACTION get_last_transactions_v2( fee_amount NUMERIC(78, 0), fee_recipient TEXT, metadata TEXT, - fee_distributions TEXT + distribution_sequence INT, + distribution_recipient TEXT, + distribution_amount NUMERIC(78, 0) ) { $normalized_provider TEXT := NULL; IF COALESCE($data_provider, '') != '' { @@ -382,73 +352,37 @@ CREATE OR REPLACE ACTION get_last_transactions_v2( ERROR('Limit size cannot exceed 100'); } - $tx_ids TEXT[] := '{}'::TEXT[]; - $block_heights TEXT[] := '{}'::TEXT[]; - $methods TEXT[] := '{}'::TEXT[]; - $callers TEXT[] := '{}'::TEXT[]; - $fee_amounts TEXT[] := '{}'::TEXT[]; - $fee_recipients TEXT[] := '{}'::TEXT[]; - $metadata_values TEXT[] := '{}'::TEXT[]; - $fee_summaries TEXT[] := '{}'::TEXT[]; - - FOR $row IN + RETURN + WITH limited_events AS ( SELECT te.tx_id, te.block_height, tm.name AS method, - te.caller, + LOWER(te.caller) AS caller, te.fee_amount, te.fee_recipient, te.metadata FROM transaction_events te JOIN transaction_methods tm ON tm.method_id = te.method_id - WHERE $normalized_provider IS NULL OR te.caller = $normalized_provider + WHERE $normalized_provider IS NULL OR LOWER(te.caller) = $normalized_provider ORDER BY te.block_height DESC, te.tx_id DESC LIMIT $limit_val - { - $summary TEXT := ''; - FOR $dist IN - SELECT recipient, amount - FROM transaction_event_distributions - WHERE tx_id = $row.tx_id - ORDER BY sequence ASC - { - $entry TEXT := $dist.recipient || ':' || $dist.amount::TEXT; - IF $summary = '' { - $summary := $entry; - } ELSE { - $summary := $summary || ',' || $entry; - } - } - - $tx_ids := array_append($tx_ids, $row.tx_id); - $block_heights := array_append($block_heights, $row.block_height::TEXT); - $methods := array_append($methods, $row.method); - $callers := array_append($callers, LOWER($row.caller)); - $fee_amounts := array_append($fee_amounts, $row.fee_amount::TEXT); - $fee_recipients := array_append($fee_recipients, COALESCE($row.fee_recipient, '')); - $metadata_values := array_append($metadata_values, COALESCE($row.metadata, '')); - $fee_summaries := array_append($fee_summaries, $summary); - } - - RETURN + ) SELECT - tx_id, - created_at::INT8, - method, - caller, - fee_amount::NUMERIC(78, 0), - NULLIF(fee_recipient, ''), - NULLIF(metadata, ''), - fee_distributions - FROM unnest( - $tx_ids, - $block_heights, - $methods, - $callers, - $fee_amounts, - $fee_recipients, - $metadata_values, - $fee_summaries - ) AS entries(tx_id, created_at, method, caller, fee_amount, fee_recipient, metadata, fee_distributions); + le.tx_id, + le.block_height, + le.method, + le.caller, + le.fee_amount, + le.fee_recipient, + le.metadata, + COALESCE(ted.sequence, 0) AS distribution_sequence, + ted.recipient AS distribution_recipient, + ted.amount AS distribution_amount + FROM limited_events le + LEFT JOIN transaction_event_distributions ted + ON ted.tx_id = le.tx_id + ORDER BY le.block_height DESC, + le.tx_id DESC, + COALESCE(ted.sequence, 0) ASC; }; diff --git a/tests/streams/transaction_events_ledger_test.go b/tests/streams/transaction_events_ledger_test.go index 6c7236408..7225d4200 100644 --- a/tests/streams/transaction_events_ledger_test.go +++ b/tests/streams/transaction_events_ledger_test.go @@ -231,15 +231,6 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, bothRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "both") require.NoError(t, err) - debugBothCount := 0 - err = callView(ctx, platform, actor.Address(), "list_transaction_fees", []any{userLower, "both", 20, 0}, func(row *common.Row) error { - debugBothCount++ - t.Logf("both row raw: %+v", row.Values) - return nil - }) - require.NoError(t, err) - t.Logf("both count raw: %d", debugBothCount) - t.Logf("bothRows len (helper): %d", len(bothRows)) require.Len(t, bothRows, len(expected)) withdrawReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), withdrawLeaderAddr, "received") @@ -251,20 +242,6 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, lastTxRows, err := fetchLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) require.NoError(t, err) - debugLastCount := 0 - err = callView(ctx, platform, actor.Address(), "get_last_transactions_v2", []any{userLower, int64(len(expected))}, func(row *common.Row) error { - debugLastCount++ - t.Logf("last row raw: %+v", row.Values) - return nil - }) - require.NoError(t, err) - t.Logf("lastRows count raw: %d", debugLastCount) - err = callView(ctx, platform, actor.Address(), "get_last_transactions_v2", []any{"", int64(len(expected))}, func(row *common.Row) error { - t.Logf("last row (no filter): %+v", row.Values) - return nil - }) - require.NoError(t, err) - t.Logf("lastRows len (helper): %d", len(lastTxRows)) require.Len(t, lastTxRows, len(expected)) for _, row := range lastTxRows { exp, ok := expected[row.TxID] From 54d787a6fde8a55dd889e77bfdc63108e1d9653d Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 15:59:39 -0300 Subject: [PATCH 05/11] refactor: enhance transaction fee handling and test coverage This commit updates the `list_transaction_fees` action to improve the handling of transaction event distributions by introducing a subquery for filtering recipients based on the transaction mode. Additionally, the test suite for transaction events has been expanded to include checks for bonus fee distributions and ensure the integrity of transaction data retrieval. These changes enhance clarity and accuracy in fee reporting across various transaction scenarios. --- .../migrations/027-transaction-actions.sql | 12 +- .../streams/transaction_events_ledger_test.go | 211 +++++++++++++----- 2 files changed, 163 insertions(+), 60 deletions(-) diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql index 4a4ec1337..cd7d1c949 100644 --- a/internal/migrations/027-transaction-actions.sql +++ b/internal/migrations/027-transaction-actions.sql @@ -311,7 +311,17 @@ CREATE OR REPLACE ACTION list_transaction_fees( LIMIT $limit_val OFFSET $offset_val ) fe - LEFT JOIN transaction_event_distributions ted + LEFT JOIN ( + SELECT + tx_id, + sequence, + recipient, + amount + FROM transaction_event_distributions + WHERE + NOT $mode_is_received + OR recipient = $wallet_lower + ) ted ON ted.tx_id = fe.tx_id ORDER BY fe.block_height DESC, fe.tx_id DESC, diff --git a/tests/streams/transaction_events_ledger_test.go b/tests/streams/transaction_events_ledger_test.go index 7225d4200..619e5bb7c 100644 --- a/tests/streams/transaction_events_ledger_test.go +++ b/tests/streams/transaction_events_ledger_test.go @@ -76,6 +76,8 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, receiverVal := util.Unsafe_NewEthereumAddressFromString("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") receiver := &receiverVal + bonusRecipientVal := util.Unsafe_NewEthereumAddressFromString("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB") + bonusRecipientLower := strings.ToLower(bonusRecipientVal.Address()) attestationStream := util.GenerateStreamId("ledger_attestation_stream") require.NoError(t, setup.CreateStream(ctx, platform, setup.StreamInfo{ @@ -112,6 +114,41 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, require.NoError(t, err) height++ + tableCheck, err := platform.DB.Execute(ctx, + `SELECT table_schema::TEXT + FROM information_schema.tables + WHERE table_name = 'transaction_event_distributions'`, + ) + require.NoError(t, err) + require.NotEmpty(t, tableCheck.Rows, "transaction_event_distributions table missing") + + schemaName, ok := tableCheck.Rows[0][0].(string) + require.True(t, ok, "expected schema name as string") + t.Logf("ledger distribution table schema: %s", schemaName) + + updateSQL := fmt.Sprintf(` + UPDATE %s.transaction_event_distributions + SET amount = $1::NUMERIC(78, 0) + WHERE tx_id = $2 AND sequence = 1`, schemaName) + _, err = platform.DB.Execute(ctx, + updateSQL, + feeOneTRUF, + insertTx, + ) + require.NoError(t, err) + + insertSQL := fmt.Sprintf(` + INSERT INTO %s.transaction_event_distributions (tx_id, sequence, recipient, amount, note) + VALUES ($1, $2, $3, $4::NUMERIC(78, 0), NULL)`, schemaName) + _, err = platform.DB.Execute(ctx, + insertSQL, + insertTx, + 2, + bonusRecipientLower, + feeOneTRUF, + ) + require.NoError(t, err) + taxLeaderPub, taxLeaderAddr := newLeader(t) weight, err := kwilTypes.ParseDecimalExplicit("1.0", 36, 18) require.NoError(t, err) @@ -171,87 +208,142 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, expected := map[string]ledgerExpectation{ createTx: { - method: "deployStream", - fee: feeFourTRUF, - feeRecipient: createLeaderAddr, - feeDistribution: buildDistribution(createLeaderAddr, feeFourTRUF), - assertMetadata: assertNoMetadata, + method: "deployStream", + fee: feeFourTRUF, + feeRecipient: createLeaderAddr, + feeDistributions: []string{ + buildDistribution(createLeaderAddr, feeFourTRUF), + }, + assertMetadata: assertNoMetadata, }, insertTx: { - method: "insertRecords", - fee: feeTwoTRUF, - feeRecipient: insertLeaderAddr, - feeDistribution: buildDistribution(insertLeaderAddr, feeTwoTRUF), - assertMetadata: assertNoMetadata, + method: "insertRecords", + fee: feeTwoTRUF, + feeRecipient: insertLeaderAddr, + feeDistributions: []string{ + buildDistribution(insertLeaderAddr, feeOneTRUF), + buildDistribution(bonusRecipientLower, feeOneTRUF), + }, + assertMetadata: assertNoMetadata, }, taxTx: { - method: "setTaxonomies", - fee: feeTwoTRUF, - feeRecipient: taxLeaderAddr, - feeDistribution: buildDistribution(taxLeaderAddr, feeTwoTRUF), - assertMetadata: assertNoMetadata, + method: "setTaxonomies", + fee: feeTwoTRUF, + feeRecipient: taxLeaderAddr, + feeDistributions: []string{ + buildDistribution(taxLeaderAddr, feeTwoTRUF), + }, + assertMetadata: assertNoMetadata, }, transferTx: { - method: "transferTN", - fee: feeOneTRUF, - feeRecipient: transferLeaderAddr, - feeDistribution: buildDistribution(transferLeaderAddr, feeOneTRUF), - assertMetadata: assertNoMetadata, + method: "transferTN", + fee: feeOneTRUF, + feeRecipient: transferLeaderAddr, + feeDistributions: []string{ + buildDistribution(transferLeaderAddr, feeOneTRUF), + }, + assertMetadata: assertNoMetadata, }, withdrawTx: { - method: "withdrawTN", - fee: feeFortyTRUF, - feeRecipient: withdrawLeaderAddr, - feeDistribution: buildDistribution(withdrawLeaderAddr, feeFortyTRUF), - assertMetadata: assertNoMetadata, + method: "withdrawTN", + fee: feeFortyTRUF, + feeRecipient: withdrawLeaderAddr, + feeDistributions: []string{ + buildDistribution(withdrawLeaderAddr, feeFortyTRUF), + }, + assertMetadata: assertNoMetadata, }, attTx: { - method: "requestAttestation", - fee: feeFortyTRUF, - feeRecipient: attLeaderAddr, - feeDistribution: buildDistribution(attLeaderAddr, feeFortyTRUF), - assertMetadata: assertNoMetadata, + method: "requestAttestation", + fee: feeFortyTRUF, + feeRecipient: attLeaderAddr, + feeDistributions: []string{ + buildDistribution(attLeaderAddr, feeFortyTRUF), + }, + assertMetadata: assertNoMetadata, }, } + buildExpectedCounts := func() (map[string]map[string]int, int) { + counts := make(map[string]map[string]int, len(expected)) + total := 0 + for txID, exp := range expected { + distList := exp.feeDistributions + if len(distList) == 0 { + distList = []string{""} + } + counts[txID] = make(map[string]int, len(distList)) + for _, dist := range distList { + counts[txID][dist]++ + total++ + } + } + return counts, total + } + + assertLedgerRows := func(rows []ledgerRow) { + counts, total := buildExpectedCounts() + require.Len(t, rows, total) + seen := make(map[string]map[string]int, len(expected)) + for txID := range expected { + seen[txID] = make(map[string]int) + } + for _, row := range rows { + exp, ok := expected[row.TxID] + require.True(t, ok, "unexpected tx in results: %s", row.TxID) + require.Equal(t, exp.method, row.Method) + require.Equal(t, userLower, row.Caller) + require.Equal(t, exp.fee, row.FeeAmount) + require.Equal(t, exp.feeRecipient, row.FeeRecipient) + pair := buildDistribution(row.DistributionRecipient, row.DistributionAmount) + require.Contains(t, counts[row.TxID], pair, "unexpected distribution %s for tx %s", pair, row.TxID) + seen[row.TxID][pair]++ + t.Logf("metadata for %s (%s): raw=%s parsed=%v", row.TxID, row.Method, row.RawMetadata, row.Metadata) + exp.assertMetadata(row.Metadata) + } + for txID, expectedCounts := range counts { + for dist, want := range expectedCounts { + require.Equal(t, want, seen[txID][dist], "distribution %s for tx %s missing or duplicated", dist, txID) + } + } + } + paidRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "paid") require.NoError(t, err) - require.Len(t, paidRows, len(expected)) - for _, row := range paidRows { - exp, ok := expected[row.TxID] - require.True(t, ok, "unexpected tx in paid results: %s", row.TxID) - require.Equal(t, exp.method, row.Method) - require.Equal(t, userLower, row.Caller) - require.Equal(t, exp.fee, row.FeeAmount) - require.Equal(t, exp.feeRecipient, row.FeeRecipient) - require.Equal(t, exp.feeDistribution, buildDistribution(row.DistributionRecipient, row.DistributionAmount)) - t.Logf("metadata for %s (%s): raw=%s parsed=%v", row.TxID, row.Method, row.RawMetadata, row.Metadata) - exp.assertMetadata(row.Metadata) - } + assertLedgerRows(paidRows) bothRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "both") require.NoError(t, err) - require.Len(t, bothRows, len(expected)) + assertLedgerRows(bothRows) withdrawReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), withdrawLeaderAddr, "received") require.NoError(t, err) require.Len(t, withdrawReceivedRows, 1) require.Equal(t, withdrawTx, withdrawReceivedRows[0].TxID) require.Equal(t, withdrawLeaderAddr, withdrawReceivedRows[0].FeeRecipient) + require.Equal(t, withdrawLeaderAddr, withdrawReceivedRows[0].DistributionRecipient) + require.Equal(t, feeFortyTRUF, withdrawReceivedRows[0].DistributionAmount) require.Equal(t, userLower, withdrawReceivedRows[0].Caller) + insertLeaderReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), insertLeaderAddr, "received") + require.NoError(t, err) + require.Len(t, insertLeaderReceivedRows, 1) + require.Equal(t, insertTx, insertLeaderReceivedRows[0].TxID) + require.Equal(t, insertLeaderAddr, insertLeaderReceivedRows[0].FeeRecipient) + require.Equal(t, insertLeaderAddr, insertLeaderReceivedRows[0].DistributionRecipient) + require.Equal(t, feeOneTRUF, insertLeaderReceivedRows[0].DistributionAmount) + + bonusReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), bonusRecipientLower, "received") + require.NoError(t, err) + require.Len(t, bonusReceivedRows, 1) + require.Equal(t, insertTx, bonusReceivedRows[0].TxID) + require.Equal(t, insertLeaderAddr, bonusReceivedRows[0].FeeRecipient) + require.Equal(t, bonusRecipientLower, bonusReceivedRows[0].DistributionRecipient) + require.Equal(t, feeOneTRUF, bonusReceivedRows[0].DistributionAmount) + lastTxRows, err := fetchLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) require.NoError(t, err) - require.Len(t, lastTxRows, len(expected)) - for _, row := range lastTxRows { - exp, ok := expected[row.TxID] - require.True(t, ok, "unexpected tx in last transactions: %s", row.TxID) - require.Equal(t, exp.method, row.Method) - require.Equal(t, exp.fee, row.FeeAmount) - require.Equal(t, exp.feeRecipient, row.FeeRecipient) - require.Equal(t, exp.feeDistribution, buildDistribution(row.DistributionRecipient, row.DistributionAmount)) - exp.assertMetadata(row.Metadata) - } + assertLedgerRows(lastTxRows) legacyRows, err := fetchLegacyLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) require.NoError(t, err) @@ -274,7 +366,8 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, require.Equal(t, exp.method, eventRow.Method) require.Equal(t, exp.fee, eventRow.FeeAmount) require.Equal(t, exp.feeRecipient, eventRow.FeeRecipient) - require.Equal(t, exp.feeDistribution, eventRow.FeeDistributions) + expectedAggregate := strings.Join(exp.feeDistributions, ",") + require.Equal(t, expectedAggregate, eventRow.FeeDistributions) exp.assertMetadata(eventRow.Metadata) } @@ -283,11 +376,11 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, } type ledgerExpectation struct { - method string - fee string - feeRecipient string - feeDistribution string - assertMetadata func(meta metadataMap) + method string + fee string + feeRecipient string + feeDistributions []string + assertMetadata func(meta metadataMap) } type legacyRow struct { From 4ce51068a32234450cd3c115602acac6e524490f Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 16:20:43 -0300 Subject: [PATCH 06/11] refactor: update get_last_transactions_v2 action and enhance transaction retrieval logic This commit modifies the `get_last_transactions_v2` action to replace the `$data_provider` parameter with `$wallet`, improving clarity and functionality. The implementation now includes additional checks for valid Ethereum addresses and limits on the number of transactions retrieved. Furthermore, the SQL logic has been refined to aggregate transaction event distributions, ensuring accurate reporting. Corresponding updates have been made to the test suite to validate these changes and enhance coverage for transaction retrieval scenarios. --- .../010-get-latest-write-activity.sql | 74 ++++++++++++++++--- .../migrations/027-transaction-actions.sql | 66 ----------------- .../streams/transaction_events_ledger_test.go | 24 +++++- 3 files changed, 84 insertions(+), 80 deletions(-) diff --git a/internal/migrations/010-get-latest-write-activity.sql b/internal/migrations/010-get-latest-write-activity.sql index 33ebd4b74..9197321c9 100644 --- a/internal/migrations/010-get-latest-write-activity.sql +++ b/internal/migrations/010-get-latest-write-activity.sql @@ -8,6 +8,7 @@ * with v2 once callers migrate. */ +-- This is a legacy implementation that is no longer used. It will be removed in a future migration. CREATE OR REPLACE ACTION get_last_transactions_v1( $data_provider TEXT, $limit_size INT8 @@ -83,7 +84,7 @@ CREATE OR REPLACE ACTION get_last_transactions_v1( }; CREATE OR REPLACE ACTION get_last_transactions_v2( - $data_provider TEXT, + $wallet TEXT, $limit_size INT8 ) PUBLIC VIEW RETURNS TABLE( tx_id TEXT, @@ -93,20 +94,69 @@ CREATE OR REPLACE ACTION get_last_transactions_v2( fee_amount NUMERIC(78, 0), fee_recipient TEXT, metadata TEXT, - fee_distributions TEXT + distribution_sequence INT, + distribution_recipient TEXT, + distribution_amount NUMERIC(78, 0) ) { - -- Placeholder implementation: will be replaced by ledger-backed view in migration 027. + $normalized_wallet TEXT := NULL; + IF COALESCE($wallet, '') != '' { + $normalized_wallet := LOWER($wallet); + IF NOT check_ethereum_address($normalized_wallet) { + ERROR('Invalid wallet. Must be a valid Ethereum address: ' || $wallet); + } + } + + $limit_val INT := COALESCE($limit_size, 6); + IF $limit_val <= 0 { + $limit_val := 6; + } + + IF $limit_val > 100 { + ERROR('Limit size cannot exceed 100'); + } + RETURN + WITH limited_events AS ( + SELECT + te.tx_id, + te.block_height, + tm.name AS method, + LOWER(te.caller) AS caller, + te.fee_amount, + te.fee_recipient, + te.metadata + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + WHERE + $normalized_wallet IS NULL + OR te.caller = $normalized_wallet + OR (te.fee_recipient IS NOT NULL AND te.fee_recipient = $normalized_wallet) + OR EXISTS ( + SELECT 1 + FROM transaction_event_distributions ted_inner + WHERE ted_inner.tx_id = te.tx_id + AND ted_inner.recipient = $normalized_wallet + ) + ORDER BY te.block_height DESC, te.tx_id DESC + LIMIT $limit_val + ) SELECT - NULL::TEXT AS tx_id, - lt.created_at, - lt.method, - NULL::TEXT AS caller, - NULL::NUMERIC(78, 0) AS fee_amount, - NULL::TEXT AS fee_recipient, - NULL::TEXT AS metadata, - ''::TEXT AS fee_distributions - FROM get_last_transactions_v1($data_provider, $limit_size) lt; + le.tx_id, + le.block_height, + le.method, + le.caller, + le.fee_amount, + le.fee_recipient, + le.metadata, + COALESCE(ted.sequence, 0) AS distribution_sequence, + ted.recipient AS distribution_recipient, + ted.amount AS distribution_amount + FROM limited_events le + LEFT JOIN transaction_event_distributions ted + ON ted.tx_id = le.tx_id + ORDER BY le.block_height DESC, + le.tx_id DESC, + COALESCE(ted.sequence, 0) ASC; }; CREATE OR REPLACE ACTION get_last_transactions( diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql index cd7d1c949..781b2061a 100644 --- a/internal/migrations/027-transaction-actions.sql +++ b/internal/migrations/027-transaction-actions.sql @@ -330,69 +330,3 @@ CREATE OR REPLACE ACTION list_transaction_fees( -CREATE OR REPLACE ACTION get_last_transactions_v2( - $data_provider TEXT, - $limit_size INT8 -) PUBLIC VIEW RETURNS TABLE( - tx_id TEXT, - created_at INT8, - method TEXT, - caller TEXT, - fee_amount NUMERIC(78, 0), - fee_recipient TEXT, - metadata TEXT, - distribution_sequence INT, - distribution_recipient TEXT, - distribution_amount NUMERIC(78, 0) -) { - $normalized_provider TEXT := NULL; - IF COALESCE($data_provider, '') != '' { - $normalized_provider := LOWER($data_provider); - IF NOT check_ethereum_address($normalized_provider) { - ERROR('Invalid data provider address. Must be a valid Ethereum address: ' || $data_provider); - } - } - - $limit_val INT := COALESCE($limit_size, 6); - IF $limit_val <= 0 { - $limit_val := 6; - } - - IF $limit_val > 100 { - ERROR('Limit size cannot exceed 100'); - } - - RETURN - WITH limited_events AS ( - SELECT - te.tx_id, - te.block_height, - tm.name AS method, - LOWER(te.caller) AS caller, - te.fee_amount, - te.fee_recipient, - te.metadata - FROM transaction_events te - JOIN transaction_methods tm ON tm.method_id = te.method_id - WHERE $normalized_provider IS NULL OR LOWER(te.caller) = $normalized_provider - ORDER BY te.block_height DESC, te.tx_id DESC - LIMIT $limit_val - ) - SELECT - le.tx_id, - le.block_height, - le.method, - le.caller, - le.fee_amount, - le.fee_recipient, - le.metadata, - COALESCE(ted.sequence, 0) AS distribution_sequence, - ted.recipient AS distribution_recipient, - ted.amount AS distribution_amount - FROM limited_events le - LEFT JOIN transaction_event_distributions ted - ON ted.tx_id = le.tx_id - ORDER BY le.block_height DESC, - le.tx_id DESC, - COALESCE(ted.sequence, 0) ASC; -}; diff --git a/tests/streams/transaction_events_ledger_test.go b/tests/streams/transaction_events_ledger_test.go index 619e5bb7c..0ab50400c 100644 --- a/tests/streams/transaction_events_ledger_test.go +++ b/tests/streams/transaction_events_ledger_test.go @@ -345,6 +345,25 @@ func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, require.NoError(t, err) assertLedgerRows(lastTxRows) + withdrawHistoryRows, err := fetchLastTransactions(ctx, platform, actor.Address(), withdrawLeaderAddr, 10) + require.NoError(t, err) + require.NotEmpty(t, withdrawHistoryRows) + require.Equal(t, withdrawTx, withdrawHistoryRows[0].TxID) + + bonusHistoryRows, err := fetchLastTransactions(ctx, platform, actor.Address(), bonusRecipientLower, 10) + require.NoError(t, err) + require.NotEmpty(t, bonusHistoryRows) + foundBonus := false + for _, row := range bonusHistoryRows { + if row.DistributionRecipient == bonusRecipientLower { + require.Equal(t, insertTx, row.TxID) + require.Equal(t, feeOneTRUF, row.DistributionAmount) + foundBonus = true + break + } + } + require.True(t, foundBonus, "expected to find distribution row for bonus recipient") + legacyRows, err := fetchLegacyLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) require.NoError(t, err) require.NotEmpty(t, legacyRows) @@ -475,9 +494,10 @@ func fetchTransactionFees(ctx context.Context, platform *kwilTesting.Platform, c return rows, err } -func fetchLastTransactions(ctx context.Context, platform *kwilTesting.Platform, caller string, dataProvider string, limit int64) ([]ledgerRow, error) { +func fetchLastTransactions(ctx context.Context, platform *kwilTesting.Platform, caller string, address string, limit int64) ([]ledgerRow, error) { rows := make([]ledgerRow, 0, 8) - args := []any{dataProvider, limit} + addr := strings.ToLower(address) + args := []any{addr, limit} err := callView(ctx, platform, caller, "get_last_transactions_v2", args, func(row *common.Row) error { feeRecipient := "" if row.Values[5] != nil { From ae07728d3ee81f092139a1477c34813d8d4ffc84 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 16:35:50 -0300 Subject: [PATCH 07/11] refactor: update bridge actions to use standardized recipient variable This commit modifies the `sepolia_bridge_tokens` and `ethereum_bridge_tokens` actions to utilize a standardized `$bridge_recipient` variable instead of directly referencing the `$recipient` parameter. This change enhances code clarity and consistency in the bridge withdrawal process. No functional changes were introduced, ensuring the actions continue to operate as intended. --- internal/migrations/erc20-bridge/001-actions.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/migrations/erc20-bridge/001-actions.sql b/internal/migrations/erc20-bridge/001-actions.sql index 8801b43d4..81d494000 100644 --- a/internal/migrations/erc20-bridge/001-actions.sql +++ b/internal/migrations/erc20-bridge/001-actions.sql @@ -46,7 +46,7 @@ CREATE OR REPLACE ACTION sepolia_bridge_tokens($recipient TEXT DEFAULT NULL, $am $bridge_recipient TEXT := LOWER(COALESCE($recipient, @caller)); -- Execute withdrawal using the bridge extension - sepolia_bridge.bridge(COALESCE($recipient, @caller), $withdrawal_amount); + sepolia_bridge.bridge($bridge_recipient, $withdrawal_amount); record_transaction_event( 5, @@ -104,7 +104,7 @@ CREATE OR REPLACE ACTION ethereum_bridge_tokens($recipient TEXT DEFAULT NULL, $a $bridge_recipient TEXT := LOWER(COALESCE($recipient, @caller)); -- Execute withdrawal using the bridge extension - ethereum_bridge.bridge(COALESCE($recipient, @caller), $withdrawal_amount); + ethereum_bridge.bridge($bridge_recipient, $withdrawal_amount); record_transaction_event( 5, From 151e7f3b5214c43db07963cde8bf386f96649f4f Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 16:49:58 -0300 Subject: [PATCH 08/11] refactor: remove append_fee_distribution action and add placeholder for multi-recipient fee recording This commit removes the `append_fee_distribution` action, streamlining the transaction event handling. A placeholder for a new action, `record_tx_event_split`, is introduced to facilitate future implementation of multi-recipient fee recording. This change aims to enhance clarity and prepare for more efficient fee distribution management in upcoming updates. --- .../migrations/027-transaction-actions.sql | 60 ++++--------------- 1 file changed, 11 insertions(+), 49 deletions(-) diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql index 781b2061a..d9bc90a48 100644 --- a/internal/migrations/027-transaction-actions.sql +++ b/internal/migrations/027-transaction-actions.sql @@ -84,54 +84,17 @@ CREATE OR REPLACE ACTION record_transaction_event( } }; -CREATE OR REPLACE ACTION append_fee_distribution( - $recipient TEXT, - $amount NUMERIC(78, 0) -) PRIVATE { - IF @txid IS NULL { - ERROR('append_fee_distribution: missing transaction id'); - } - - IF $recipient IS NULL { - ERROR('append_fee_distribution: recipient is required'); - } - - $recipient_lower TEXT := LOWER($recipient); - IF NOT check_ethereum_address($recipient_lower) { - ERROR('append_fee_distribution: invalid recipient address: ' || $recipient); - } - - $tx_hex TEXT := LOWER(@txid); - IF LENGTH($tx_hex) != 64 { - ERROR('append_fee_distribution: txid must be 32-byte hex string'); - } - $tx_id TEXT := '0x' || $tx_hex; - - $amount_sanitized NUMERIC(78, 0) := COALESCE($amount, 0::NUMERIC(78, 0)); - - $next_sequence INT := 1; - FOR $row IN - SELECT COALESCE(MAX(sequence), 0) + 1 AS seq - FROM transaction_event_distributions - WHERE tx_id = $tx_id - { - $next_sequence := $row.seq; - } - - INSERT INTO transaction_event_distributions ( - tx_id, - sequence, - recipient, - amount, - note - ) VALUES ( - $tx_id, - $next_sequence, - $recipient_lower, - $amount_sanitized, - NULL - ); -}; +-- CREATE OR REPLACE ACTION record_tx_event_split( +-- $method_id INT, +-- $fee_amount NUMERIC(78, 0), +-- $recipients TEXT[], +-- $amounts NUMERIC(78, 0)[], +-- $metadata TEXT +-- ) PRIVATE { +-- -- TODO: Implement multi-recipient fee recording in a single call. +-- -- This action should validate array lengths, ensure the amounts sum to +-- -- $fee_amount, and write the distribution rows atomically. +-- }; CREATE OR REPLACE ACTION get_transaction_event( $tx_id TEXT @@ -329,4 +292,3 @@ CREATE OR REPLACE ACTION list_transaction_fees( }; - From 8852e2c722dbc3300e384df7b9be2247efd326b7 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 17:20:37 -0300 Subject: [PATCH 09/11] refactor: enhance transaction context management and streamline engine context creation This commit introduces a new function, `newTxContextWithLeader`, to standardize the creation of transaction contexts with a designated leader public key. The `EngineContext` initialization has been refactored across multiple files to utilize this new function, improving code clarity and reducing redundancy. Additionally, minor adjustments were made to test cases to ensure alignment with the updated context management, enhancing overall test coverage and maintainability. --- .../primitive_batch_insert_alignment_test.go | 13 +- tests/streams/utils/procedure/execute.go | 63 ++++++-- tests/streams/utils/setup/common.go | 140 ++++++++---------- tests/streams/utils/setup/primitive.go | 60 ++------ 4 files changed, 124 insertions(+), 152 deletions(-) diff --git a/tests/streams/primitive_batch_insert_alignment_test.go b/tests/streams/primitive_batch_insert_alignment_test.go index 5883f3da7..acde64bfb 100644 --- a/tests/streams/primitive_batch_insert_alignment_test.go +++ b/tests/streams/primitive_batch_insert_alignment_test.go @@ -21,7 +21,7 @@ import ( // TestPrimitiveBatchInsertAlignment ensures that batch insertion via insert_records maps values // to the correct streams when multiple streams are processed together. -// +// // This is a regression test for a bug where get_stream_ids() and helper_lowercase_array() // were using non-deterministic array aggregation, causing stream_ref arrays to become // misaligned with input data_provider/stream_id arrays. This resulted in records being @@ -31,7 +31,7 @@ import ( // contains only its own data by checking specific value/stream_ref combinations. func TestPrimitiveBatchInsertAlignment(t *testing.T) { testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ - Name: "primitive_batch_insert_alignment_test", + Name: "primitive_batch_insert_alignment_test", SeedStatements: migrations.GetSeedScriptStatements(), FunctionTests: []kwilTesting.TestFunc{ testBatchAlignment(t), @@ -76,14 +76,7 @@ func testBatchAlignment(t *testing.T) func(ctx context.Context, platform *kwilTe } // Execute insert_records directly in one call - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - TxID: platform.Txid(), - Signer: deployer.Bytes(), - Caller: deployer.Address(), - } - engineContext := &common.EngineContext{TxContext: txContext} + engineContext := setup.NewEngineContext(ctx, platform, deployer, 1) r, err := platform.Engine.Call(engineContext, platform.DB, "", "insert_records", []any{ dataProviders, diff --git a/tests/streams/utils/procedure/execute.go b/tests/streams/utils/procedure/execute.go index d8b4466d8..3f464bef3 100644 --- a/tests/streams/utils/procedure/execute.go +++ b/tests/streams/utils/procedure/execute.go @@ -5,15 +5,16 @@ import ( "encoding/json" "fmt" "strings" - - trufTypes "github.com/trufnetwork/sdk-go/core/types" - "github.com/trufnetwork/sdk-go/core/util" + "sync" "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" kwilTypes "github.com/trufnetwork/kwil-db/core/types" kwilTesting "github.com/trufnetwork/kwil-db/testing" - "github.com/trufnetwork/sdk-go/core/types" + types "github.com/trufnetwork/sdk-go/core/types" + "github.com/trufnetwork/sdk-go/core/util" ) // GetDataResult contains the full result of a GetRecord call @@ -47,6 +48,39 @@ func parseCacheInfoFromLogs(logs []string) (cacheHit bool, cachedAt *int64) { return } +var ( + defaultLeaderOnce sync.Once + defaultLeaderPub crypto.PublicKey +) + +func defaultLeaderPublicKey() crypto.PublicKey { + defaultLeaderOnce.Do(func() { + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + panic(fmt.Sprintf("failed to generate default leader key: %v", err)) + } + defaultLeaderPub = pubGeneric + }) + return defaultLeaderPub +} + +func newTxContextWithLeader(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { + if height <= 0 { + height = 1 + } + return &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Proposer: defaultLeaderPublicKey(), + }, + Signer: signer, + Caller: caller, + TxID: platform.Txid(), + Authenticator: authenticator, + } +} + // GetRecordWithLogs executes get_record and returns full result including logs func GetRecordWithLogs(ctx context.Context, input GetRecordInput) (*GetDataResult, error) { deployer, err := util.NewEthereumAddressFromBytes(input.Platform.Deployer) @@ -504,16 +538,15 @@ func SetTaxonomy(ctx context.Context, input SetTaxonomyInput) error { weightDecimals = append(weightDecimals, valueDecimal) } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - engineContext := &common.EngineContext{ - TxContext: txContext, + TxContext: newTxContextWithLeader( + ctx, + input.Platform, + input.Platform.Deployer, + deployer.Address(), + coreauth.EthPersonalSignAuth, + input.Height, + ), } r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_taxonomy", []any{ @@ -726,7 +759,7 @@ func ListStreams(ctx context.Context, input ListStreamsInput) ([]ResultRow, erro type GetDatabaseSizeInput struct { Platform *kwilTesting.Platform - Locator trufTypes.StreamLocator + Locator types.StreamLocator Height int64 } @@ -834,7 +867,6 @@ func GetDatabaseSizeV2Pretty(ctx context.Context, input GetDatabaseSizeInput) ([ return processResultRows(resultRows) } - // ListTaxonomiesByHeight executes list_taxonomies_by_height action func ListTaxonomiesByHeight(ctx context.Context, input ListTaxonomiesByHeightInput) ([]ResultRow, error) { deployer, err := util.NewEthereumAddressFromBytes(input.Platform.Deployer) @@ -959,4 +991,3 @@ func ListMetadataByHeight(ctx context.Context, input ListMetadataByHeightInput) return processResultRows(resultRows) } - diff --git a/tests/streams/utils/setup/common.go b/tests/streams/utils/setup/common.go index f1e023eeb..b7f215a42 100644 --- a/tests/streams/utils/setup/common.go +++ b/tests/streams/utils/setup/common.go @@ -2,10 +2,14 @@ package setup import ( "context" + "fmt" "strings" + "sync" "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" kwilTesting "github.com/trufnetwork/kwil-db/testing" "github.com/trufnetwork/sdk-go/core/types" "github.com/trufnetwork/sdk-go/core/util" @@ -27,6 +31,53 @@ func (contractType ContractType) String() string { return string(contractType) } +var ( + defaultLeaderOnce sync.Once + defaultLeaderPub *crypto.Secp256k1PublicKey +) + +func defaultLeaderPublicKey() *crypto.Secp256k1PublicKey { + defaultLeaderOnce.Do(func() { + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + panic(fmt.Sprintf("failed to generate default leader key: %v", err)) + } + var ok bool + defaultLeaderPub, ok = pubGeneric.(*crypto.Secp256k1PublicKey) + if !ok { + panic("default leader public key is not secp256k1") + } + }) + return defaultLeaderPub +} + +func newTxContextWithLeader(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { + if height <= 0 { + height = 1 + } + return &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Proposer: defaultLeaderPublicKey(), + }, + Signer: signer, + Caller: caller, + TxID: platform.Txid(), + Authenticator: authenticator, + } +} + +func newEthEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { + txContext := newTxContextWithLeader(ctx, platform, addr.Bytes(), addr.Address(), coreauth.EthPersonalSignAuth, height) + return &common.EngineContext{TxContext: txContext} +} + +// NewEngineContext returns an engine context configured with the provided signer and a deterministic leader. +func NewEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { + return newEthEngineContext(ctx, platform, addr, height) +} + // CreateStream parses and creates the dataset for a contract func CreateStream(ctx context.Context, platform *kwilTesting.Platform, contractInfo StreamInfo) error { return CreateStreamWithOptions(ctx, platform, contractInfo, CreateStreamOptions{}) @@ -63,17 +114,7 @@ func UntypedCreateStream(ctx context.Context, platform *kwilTesting.Platform, st return errors.Wrap(err, "invalid data provider address") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: addr.Bytes(), - Caller: addr.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, addr, 1) r, err := platform.Engine.Call(engineContext, platform.DB, @@ -124,17 +165,7 @@ func CreateStreamsWithOptions(ctx context.Context, platform *kwilTesting.Platfor return errors.Wrap(err, "error creating composed dataset") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, deployer, 1) streamIds := make([]string, len(streamInfos)) streamTypes := make([]string, len(streamInfos)) @@ -164,17 +195,7 @@ func CreateStreamsWithOptions(ctx context.Context, platform *kwilTesting.Platfor } func DeleteStream(ctx context.Context, platform *kwilTesting.Platform, streamLocator types.StreamLocator) (*common.CallResult, error) { - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: streamLocator.DataProvider.Bytes(), - Caller: streamLocator.DataProvider.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, streamLocator.DataProvider, 1) return platform.Engine.Call(engineContext, platform.DB, @@ -202,17 +223,7 @@ func CreateDataProvider(ctx context.Context, platform *kwilTesting.Platform, add return errors.Wrap(err, "failed to enable stream deployer") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: addr.Bytes(), - Caller: addr.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, addr, 1) r, err := platform.Engine.Call(engineContext, platform.DB, @@ -253,17 +264,7 @@ func CreateDataProviderWithoutRole(ctx context.Context, platform *kwilTesting.Pl } // Register the data provider - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: addr.Bytes(), - Caller: addr.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, addr, 1) r, err := platform.Engine.Call(engineContext, platform.DB, @@ -302,16 +303,15 @@ func CreateDataProviderWithoutRole(ctx context.Context, platform *kwilTesting.Pl // 3. This is a test utility following the same pattern as AddMemberToRoleBypass // 4. Using OverrideAuthz is the standard pattern for test role management func removeMemberFromRoleBypass(ctx context.Context, platform *kwilTesting.Platform, owner, roleName, wallet string) error { - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 0}, - TxID: platform.Txid(), - Signer: []byte("system"), - Caller: "0x0000000000000000000000000000000000000000", - } - engineContext := &common.EngineContext{ - TxContext: txContext, + TxContext: newTxContextWithLeader( + ctx, + platform, + []byte("system"), + "0x0000000000000000000000000000000000000000", + "", + 1, + ), OverrideAuthz: true, // Skip authorization checks - this is a test utility } @@ -343,17 +343,7 @@ func GetStreamId(ctx context.Context, platform *kwilTesting.Platform, dataProvid return 0, errors.Wrap(err, "error creating ethereum address") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, deployer, 1) var streamRef int r, err := platform.Engine.Call(engineContext, platform.DB, "", "get_stream_id", []any{ diff --git a/tests/streams/utils/setup/primitive.go b/tests/streams/utils/setup/primitive.go index 063c0aeb9..c38da5088 100644 --- a/tests/streams/utils/setup/primitive.go +++ b/tests/streams/utils/setup/primitive.go @@ -4,13 +4,12 @@ import ( "context" "strconv" - "github.com/trufnetwork/sdk-go/core/types" - "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" kwilTypes "github.com/trufnetwork/kwil-db/core/types" kwilTesting "github.com/trufnetwork/kwil-db/testing" testtable "github.com/trufnetwork/node/tests/streams/utils/table" + "github.com/trufnetwork/sdk-go/core/types" "github.com/trufnetwork/sdk-go/core/util" ) @@ -156,19 +155,8 @@ func InsertMarkdownPrimitiveData(ctx context.Context, input InsertMarkdownDataIn continue } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: signer.Bytes(), - Caller: signer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, input.Platform, signer, input.Height) + engineContext.TxContext.TxID = txid r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_record", []any{ input.StreamLocator.DataProvider.Address(), @@ -217,19 +205,8 @@ func insertPrimitiveData(ctx context.Context, input InsertPrimitiveDataInput) er } for _, arg := range args { - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, input.Platform, deployer, input.Height) + engineContext.TxContext.TxID = txid r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_record", arg, func(row *common.Row) error { return nil @@ -327,16 +304,8 @@ func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimiti } signerAddr := util.Unsafe_NewEthereumAddressFromString(provider) - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: signerAddr.Bytes(), - Caller: signerAddr.Address(), - } - engineContext := &common.EngineContext{TxContext: txContext} + engineContext := newEthEngineContext(ctx, input.Platform, signerAddr, input.Height) + engineContext.TxContext.TxID = txid args := []any{g.dataProviders, g.streamIds, g.eventTimes, g.values} r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_records", args, func(row *common.Row) error { return nil }) @@ -404,19 +373,8 @@ func InsertTruflationDataBatch(ctx context.Context, input InsertTruflationDataIn return errors.Wrap(err, "error in InsertTruflationDataBatch") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, input.Platform, deployer, input.Height) + engineContext.TxContext.TxID = txid r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "truflation_insert_records", args, func(row *common.Row) error { return nil From 818c6379f2cf8089e96c623658cffc75ee58dcdb Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 17:42:50 -0300 Subject: [PATCH 10/11] refactor: centralize transaction context creation and enhance engine context management This commit introduces a new `testctx` package to centralize the creation of transaction and engine contexts, improving code organization and reducing redundancy. The `newTxContextWithLeader` function has been replaced with `NewTxContextWithAuth` in the `testctx` package, streamlining the context setup across multiple files. Additionally, the refactoring enhances clarity and maintainability of the codebase, ensuring consistent context management throughout the testing utilities. --- tests/streams/utils/procedure/execute.go | 258 ++--------------------- tests/streams/utils/setup/common.go | 49 +---- tests/streams/utils/testctx/context.go | 57 +++++ 3 files changed, 79 insertions(+), 285 deletions(-) create mode 100644 tests/streams/utils/testctx/context.go diff --git a/tests/streams/utils/procedure/execute.go b/tests/streams/utils/procedure/execute.go index 3f464bef3..59063f4ed 100644 --- a/tests/streams/utils/procedure/execute.go +++ b/tests/streams/utils/procedure/execute.go @@ -5,14 +5,12 @@ import ( "encoding/json" "fmt" "strings" - "sync" "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" - "github.com/trufnetwork/kwil-db/core/crypto" - coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" kwilTypes "github.com/trufnetwork/kwil-db/core/types" kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/tests/streams/utils/testctx" types "github.com/trufnetwork/sdk-go/core/types" "github.com/trufnetwork/sdk-go/core/util" ) @@ -48,39 +46,6 @@ func parseCacheInfoFromLogs(logs []string) (cacheHit bool, cachedAt *int64) { return } -var ( - defaultLeaderOnce sync.Once - defaultLeaderPub crypto.PublicKey -) - -func defaultLeaderPublicKey() crypto.PublicKey { - defaultLeaderOnce.Do(func() { - _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) - if err != nil { - panic(fmt.Sprintf("failed to generate default leader key: %v", err)) - } - defaultLeaderPub = pubGeneric - }) - return defaultLeaderPub -} - -func newTxContextWithLeader(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { - if height <= 0 { - height = 1 - } - return &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: height, - Proposer: defaultLeaderPublicKey(), - }, - Signer: signer, - Caller: caller, - TxID: platform.Txid(), - Authenticator: authenticator, - } -} - // GetRecordWithLogs executes get_record and returns full result including logs func GetRecordWithLogs(ctx context.Context, input GetRecordInput) (*GetDataResult, error) { deployer, err := util.NewEthereumAddressFromBytes(input.Platform.Deployer) @@ -88,19 +53,7 @@ func GetRecordWithLogs(ctx context.Context, input GetRecordInput) (*GetDataResul return nil, errors.Wrap(err, "error in getRecord") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) prefix := "" if input.Prefix != nil { @@ -189,19 +142,7 @@ func GetIndexWithLogs(ctx context.Context, input GetIndexInput) (*GetDataResult, return nil, errors.Wrap(err, "error in getIndex") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) prefix := "" if input.Prefix != nil { @@ -270,19 +211,7 @@ func GetIndexChangeWithLogs(ctx context.Context, input GetIndexChangeInput) (*Ge return nil, errors.Wrap(err, "error in getIndexChange") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) // Set default use_cache to false if not specified useCache := false @@ -345,19 +274,7 @@ func GetFirstRecordWithLogs(ctx context.Context, input GetFirstRecordInput) (*Ge return nil, errors.Wrap(err, "error in getFirstRecord") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) // Set default use_cache to false if not specified useCache := false @@ -416,19 +333,7 @@ func SetMetadata(ctx context.Context, input SetMetadataInput) error { return errors.Wrap(err, "error in setMetadata") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "set_metadata", []any{ input.StreamLocator.DataProvider.Address(), @@ -483,17 +388,7 @@ func DescribeTaxonomies(ctx context.Context, input DescribeTaxonomiesInput) ([]R return nil, errors.Wrap(err, "error in DescribeTaxonomies.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - BlockContext: &common.BlockContext{Height: 0}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - Ctx: ctx, - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, 0) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "describe_taxonomies", []any{ @@ -538,16 +433,7 @@ func SetTaxonomy(ctx context.Context, input SetTaxonomyInput) error { weightDecimals = append(weightDecimals, valueDecimal) } - engineContext := &common.EngineContext{ - TxContext: newTxContextWithLeader( - ctx, - input.Platform, - input.Platform.Deployer, - deployer.Address(), - coreauth.EthPersonalSignAuth, - input.Height, - ), - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_taxonomy", []any{ input.StreamLocator.DataProvider.Address(), // parent data provider @@ -571,19 +457,7 @@ func GetCategoryStreams(ctx context.Context, input GetCategoryStreamsInput) ([]R return nil, errors.Wrap(err, "error in getCategoryStreams") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: 0, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, 0) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_category_streams", []any{ @@ -616,19 +490,7 @@ func FilterStreamsByExistence(ctx context.Context, input FilterStreamsByExistenc return nil, errors.Wrap(err, "error in FilterStreamsByExistence") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) dataProviders := []string{} streamIds := []string{} @@ -673,19 +535,7 @@ func DisableTaxonomy(ctx context.Context, input DisableTaxonomyInput) error { return errors.Wrap(err, "error in DisableTaxonomy") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "disable_taxonomy", []any{ input.StreamLocator.DataProvider.Address(), @@ -720,19 +570,7 @@ func ListStreams(ctx context.Context, input ListStreamsInput) ([]ResultRow, erro return nil, errors.Wrap(err, "error in ListStreams") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "list_streams", []any{ @@ -769,17 +607,7 @@ func GetDatabaseSize(ctx context.Context, input GetDatabaseSizeInput) (int64, er return 0, errors.Wrap(err, "failed to create Ethereum address from deployer bytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var databaseSize *int64 r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_database_size", []any{}, func(row *common.Row) error { databaseSize = safe(row.Values[0], nil, int64PtrConverter) @@ -802,17 +630,7 @@ func GetDatabaseSizeV2(ctx context.Context, input GetDatabaseSizeInput) ([]Resul return nil, errors.Wrap(err, "error in GetDatabaseSizeV2.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_database_size_v2", []any{}, func(row *common.Row) error { @@ -838,17 +656,7 @@ func GetDatabaseSizeV2Pretty(ctx context.Context, input GetDatabaseSizeInput) ([ return nil, errors.Wrap(err, "error in GetDatabaseSizeV2Pretty.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_database_size_v2_pretty", []any{}, func(row *common.Row) error { @@ -874,17 +682,7 @@ func ListTaxonomiesByHeight(ctx context.Context, input ListTaxonomiesByHeightInp return nil, errors.Wrap(err, "error in ListTaxonomiesByHeight.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "list_taxonomies_by_height", []any{ @@ -916,17 +714,7 @@ func GetTaxonomiesForStreams(ctx context.Context, input GetTaxonomiesForStreamsI return nil, errors.Wrap(err, "error in GetTaxonomiesForStreams.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_taxonomies_for_streams", []any{ @@ -956,17 +744,7 @@ func ListMetadataByHeight(ctx context.Context, input ListMetadataByHeightInput) return nil, errors.Wrap(err, "error in ListMetadataByHeight.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "list_metadata_by_height", []any{ diff --git a/tests/streams/utils/setup/common.go b/tests/streams/utils/setup/common.go index b7f215a42..7a16f2b82 100644 --- a/tests/streams/utils/setup/common.go +++ b/tests/streams/utils/setup/common.go @@ -2,15 +2,12 @@ package setup import ( "context" - "fmt" "strings" - "sync" "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" - "github.com/trufnetwork/kwil-db/core/crypto" - coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/tests/streams/utils/testctx" "github.com/trufnetwork/sdk-go/core/types" "github.com/trufnetwork/sdk-go/core/util" ) @@ -31,51 +28,13 @@ func (contractType ContractType) String() string { return string(contractType) } -var ( - defaultLeaderOnce sync.Once - defaultLeaderPub *crypto.Secp256k1PublicKey -) - -func defaultLeaderPublicKey() *crypto.Secp256k1PublicKey { - defaultLeaderOnce.Do(func() { - _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) - if err != nil { - panic(fmt.Sprintf("failed to generate default leader key: %v", err)) - } - var ok bool - defaultLeaderPub, ok = pubGeneric.(*crypto.Secp256k1PublicKey) - if !ok { - panic("default leader public key is not secp256k1") - } - }) - return defaultLeaderPub -} - -func newTxContextWithLeader(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { - if height <= 0 { - height = 1 - } - return &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: height, - Proposer: defaultLeaderPublicKey(), - }, - Signer: signer, - Caller: caller, - TxID: platform.Txid(), - Authenticator: authenticator, - } -} - func newEthEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { - txContext := newTxContextWithLeader(ctx, platform, addr.Bytes(), addr.Address(), coreauth.EthPersonalSignAuth, height) - return &common.EngineContext{TxContext: txContext} + return testctx.NewEngineContext(ctx, platform, addr, height) } // NewEngineContext returns an engine context configured with the provided signer and a deterministic leader. func NewEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { - return newEthEngineContext(ctx, platform, addr, height) + return testctx.NewEngineContext(ctx, platform, addr, height) } // CreateStream parses and creates the dataset for a contract @@ -304,7 +263,7 @@ func CreateDataProviderWithoutRole(ctx context.Context, platform *kwilTesting.Pl // 4. Using OverrideAuthz is the standard pattern for test role management func removeMemberFromRoleBypass(ctx context.Context, platform *kwilTesting.Platform, owner, roleName, wallet string) error { engineContext := &common.EngineContext{ - TxContext: newTxContextWithLeader( + TxContext: testctx.NewTxContextWithAuth( ctx, platform, []byte("system"), diff --git a/tests/streams/utils/testctx/context.go b/tests/streams/utils/testctx/context.go new file mode 100644 index 000000000..2ba746f6c --- /dev/null +++ b/tests/streams/utils/testctx/context.go @@ -0,0 +1,57 @@ +package testctx + +import ( + "context" + "fmt" + "sync" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/sdk-go/core/util" +) + +var ( + defaultLeaderOnce sync.Once + defaultLeaderPub *crypto.Secp256k1PublicKey +) + +func defaultLeaderPublicKey() *crypto.Secp256k1PublicKey { + defaultLeaderOnce.Do(func() { + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + panic(fmt.Sprintf("failed to generate default leader key: %v", err)) + } + var ok bool + defaultLeaderPub, ok = pubGeneric.(*crypto.Secp256k1PublicKey) + if !ok { + panic("default leader public key is not secp256k1") + } + }) + return defaultLeaderPub +} + +// NewTxContextWithAuth constructs a TxContext that includes a deterministic leader proposer. +func NewTxContextWithAuth(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { + if height <= 0 { + height = 1 + } + return &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Proposer: defaultLeaderPublicKey(), + }, + Signer: signer, + Caller: caller, + TxID: platform.Txid(), + Authenticator: authenticator, + } +} + +// NewEngineContext returns an EngineContext configured for an Ethereum address signer. +func NewEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { + txContext := NewTxContextWithAuth(ctx, platform, addr.Bytes(), addr.Address(), coreauth.EthPersonalSignAuth, height) + return &common.EngineContext{TxContext: txContext} +} From 804e3d695a2dbdcfd6aef57169e3e271b2a13ba0 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Thu, 6 Nov 2025 19:55:29 -0300 Subject: [PATCH 11/11] fix: correct height validation in NewTxContextWithAuth function This commit updates the height validation in the `NewTxContextWithAuth` function to ensure that negative heights are handled correctly. The condition has been changed from `height <= 0` to `height < 0`, setting a default height of 1 when the provided height is negative. This adjustment improves the robustness of transaction context creation. --- tests/streams/utils/testctx/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/streams/utils/testctx/context.go b/tests/streams/utils/testctx/context.go index 2ba746f6c..da150307a 100644 --- a/tests/streams/utils/testctx/context.go +++ b/tests/streams/utils/testctx/context.go @@ -34,7 +34,7 @@ func defaultLeaderPublicKey() *crypto.Secp256k1PublicKey { // NewTxContextWithAuth constructs a TxContext that includes a deterministic leader proposer. func NewTxContextWithAuth(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { - if height <= 0 { + if height < 0 { height = 1 } return &common.TxContext{