diff --git a/internal/migrations/001-common-actions.sql b/internal/migrations/001-common-actions.sql index 87c0e9976..ddc700a19 100644 --- a/internal/migrations/001-common-actions.sql +++ b/internal/migrations/001-common-actions.sql @@ -59,6 +59,9 @@ CREATE OR REPLACE ACTION create_streams( ) PUBLIC { -- ===== FEE COLLECTION WITH ROLE EXEMPTION ===== $lower_caller TEXT := LOWER(@caller); + $fee_total NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $fee_recipient TEXT := NULL; + $leader_hex TEXT := NULL; -- Get stream count (used for both fee calculation and validation) $num_streams INT := array_length($stream_ids); @@ -77,6 +80,11 @@ CREATE OR REPLACE ACTION create_streams( $fee_per_stream := 2000000000000000000::NUMERIC(78, 0); -- 2 TRUF with 18 decimals $total_fee := $fee_per_stream * $num_streams::NUMERIC(78, 0); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex := encode(@leader_sender, 'hex')::TEXT; + $caller_balance := ethereum_bridge.balance(@caller); IF $caller_balance < $total_fee { @@ -84,8 +92,9 @@ CREATE OR REPLACE ACTION create_streams( ERROR('Insufficient balance for stream creation. Required: ' || ($total_fee / 1000000000000000000::NUMERIC(78, 0))::TEXT || ' TRUF for ' || $num_streams::TEXT || ' stream(s)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $total_fee); + ethereum_bridge.transfer($leader_hex, $total_fee); + $fee_total := $total_fee; + $fee_recipient := '0x' || $leader_hex; } -- ===== END FEE COLLECTION ===== @@ -288,6 +297,12 @@ CREATE OR REPLACE ACTION create_streams( JOIN data_providers dp ON dp.address = $data_provider JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id; + record_transaction_event( + 1, + $fee_total, + $fee_recipient, + NULL + ); }; @@ -376,6 +391,13 @@ CREATE OR REPLACE ACTION insert_metadata( $current_block, $stream_ref ); + + record_transaction_event( + 7, + 0::NUMERIC(78, 0), + NULL, + NULL + ); }; /** diff --git a/internal/migrations/003-primitive-insertion.sql b/internal/migrations/003-primitive-insertion.sql index ddcdc706e..a91f411b2 100644 --- a/internal/migrations/003-primitive-insertion.sql +++ b/internal/migrations/003-primitive-insertion.sql @@ -30,6 +30,9 @@ CREATE OR REPLACE ACTION insert_records( -- Use helper function to avoid expensive for-loop roundtrips $data_provider := helper_lowercase_array($data_provider); $lower_caller TEXT := LOWER(@caller); + $fee_total NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $fee_recipient TEXT := NULL; + $leader_hex TEXT := NULL; -- Get record count (used for both fee calculation and validation) $num_records INT := array_length($data_provider); @@ -49,6 +52,11 @@ CREATE OR REPLACE ACTION insert_records( $fee_per_record := 2000000000000000000::NUMERIC(78, 0); -- 2 TRUF with 18 decimals $total_fee := $fee_per_record * $num_records::NUMERIC(78, 0); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex := encode(@leader_sender, 'hex')::TEXT; + $caller_balance := ethereum_bridge.balance(@caller); IF $caller_balance < $total_fee { @@ -56,8 +64,9 @@ CREATE OR REPLACE ACTION insert_records( ERROR('Insufficient balance for write fee. Required: ' || ($total_fee / 1000000000000000000::NUMERIC(78, 0))::TEXT || ' TRUF for ' || $num_records::TEXT || ' record(s)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $total_fee); + ethereum_bridge.transfer($leader_hex, $total_fee); + $fee_total := $total_fee; + $fee_recipient := '0x' || $leader_hex; } -- ===== END FEE COLLECTION ===== if $num_records != array_length($stream_id) or $num_records != array_length($event_time) or $num_records != array_length($value) { @@ -102,4 +111,11 @@ CREATE OR REPLACE ACTION insert_records( $event_time, $value ); -}; \ No newline at end of file + + record_transaction_event( + 2, + $fee_total, + $fee_recipient, + NULL + ); +}; diff --git a/internal/migrations/004-composed-taxonomy.sql b/internal/migrations/004-composed-taxonomy.sql index ff7027e93..2306c7669 100644 --- a/internal/migrations/004-composed-taxonomy.sql +++ b/internal/migrations/004-composed-taxonomy.sql @@ -15,6 +15,9 @@ CREATE OR REPLACE ACTION insert_taxonomy( $child_data_providers[$i] := LOWER($child_data_providers[$i]); } $lower_caller := LOWER(@caller); + $fee_total NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $fee_recipient TEXT := NULL; + $leader_hex TEXT := NULL; -- ensure it's a composed stream if is_primitive_stream($data_provider, $stream_id) == true { @@ -56,6 +59,11 @@ CREATE OR REPLACE ACTION insert_taxonomy( $fee_per_stream := 2000000000000000000::NUMERIC(78, 0); -- 2 TRUF with 18 decimals $total_fee := $fee_per_stream * $num_children::NUMERIC(78, 0); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex := encode(@leader_sender, 'hex')::TEXT; + $caller_balance := ethereum_bridge.balance(@caller); IF $caller_balance < $total_fee { @@ -63,8 +71,9 @@ CREATE OR REPLACE ACTION insert_taxonomy( ERROR('Insufficient balance for taxonomies creation. Required: ' || ($total_fee / 1000000000000000000::NUMERIC(78, 0))::TEXT || ' TRUF for ' || $num_children::TEXT || ' child stream(s)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $total_fee); + ethereum_bridge.transfer($leader_hex, $total_fee); + $fee_total := $total_fee; + $fee_recipient := '0x' || $leader_hex; } -- ===== END FEE COLLECTION ===== @@ -113,6 +122,13 @@ CREATE OR REPLACE ACTION insert_taxonomy( $child_stream_ref ); } + + record_transaction_event( + 3, + $fee_total, + $fee_recipient, + NULL + ); }; /** diff --git a/internal/migrations/010-get-latest-write-activity.sql b/internal/migrations/010-get-latest-write-activity.sql index 287670289..9197321c9 100644 --- a/internal/migrations/010-get-latest-write-activity.sql +++ b/internal/migrations/010-get-latest-write-activity.sql @@ -1,71 +1,203 @@ -CREATE OR REPLACE ACTION get_last_transactions( - $data_provider TEXT, - $limit_size INT8 -) PUBLIC VIEW RETURNS TABLE( - created_at INT8, - method TEXT -) { - $data_provider := LOWER($data_provider); - IF $limit_size IS NULL { - $limit_size := 6; - } - IF $limit_size <= 0 { - $limit_size := 6; - } - - IF $limit_size > 100 { - ERROR('Limit size cannot exceed 100'); - } - - -- Fetch top limit_size DISTINCT blocks per table, then take top limit_size overall - -- This gets the most recent activity from each table type and combines them - RETURN SELECT created_at, method FROM ( - SELECT created_at, method, ROW_NUMBER() OVER (PARTITION BY created_at ORDER BY priority ASC) AS rn FROM ( - SELECT s.created_at, 'deployStream' AS method, 1 AS priority - FROM ( - SELECT DISTINCT s.created_at - FROM streams s - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY s.created_at DESC - LIMIT $limit_size - ) s - UNION ALL - SELECT pe.created_at, 'insertRecords', 2 - FROM ( - SELECT DISTINCT pe.created_at - FROM primitive_events pe - JOIN streams s ON pe.stream_ref = s.id - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY pe.created_at DESC - LIMIT $limit_size - ) pe - UNION ALL - SELECT t.created_at, 'setTaxonomies', 3 - FROM ( - SELECT DISTINCT t.created_at - FROM taxonomies t - JOIN streams s ON t.stream_ref = s.id - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY t.created_at DESC - LIMIT $limit_size - ) t - UNION ALL - SELECT m.created_at, 'setMetadata', 4 - FROM ( - SELECT DISTINCT m.created_at - FROM metadata m - JOIN streams s ON m.stream_ref = s.id - JOIN data_providers dp ON s.data_provider_id = dp.id - WHERE COALESCE($data_provider, '') = '' OR dp.address = $data_provider - ORDER BY m.created_at DESC - LIMIT $limit_size - ) m - ) AS combined - ) AS ranked - WHERE rn = 1 - ORDER BY created_at DESC - LIMIT $limit_size; -} +/** + * Transaction history views + * + * get_last_transactions_v1 - legacy implementation (no fee/caller metadata) + * get_last_transactions_v2 - ledger-backed implementation (redefined later in migration 027) + * get_last_transactions - temporary wrapper returning the v2 signature but + * still sourcing data from v1. This will be replaced + * with v2 once callers migrate. + */ + +-- This is a legacy implementation that is no longer used. It will be removed in a future migration. +CREATE OR REPLACE ACTION get_last_transactions_v1( + $data_provider TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + created_at INT8, + method TEXT +) { + $normalized_provider TEXT := NULL; + IF COALESCE($data_provider, '') != '' { + $normalized_provider := LOWER($data_provider); + } + + IF $limit_size IS NULL { + $limit_size := 6; + } + IF $limit_size <= 0 { + $limit_size := 6; + } + + IF $limit_size > 100 { + ERROR('Limit size cannot exceed 100'); + } + + RETURN SELECT created_at, method FROM ( + SELECT created_at, method, ROW_NUMBER() OVER (PARTITION BY created_at ORDER BY priority ASC) AS rn FROM ( + SELECT s.created_at, 'deployStream' AS method, 1 AS priority + FROM ( + SELECT DISTINCT s.created_at + FROM streams s + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY s.created_at DESC + LIMIT $limit_size + ) s + UNION ALL + SELECT pe.created_at, 'insertRecords', 2 + FROM ( + SELECT DISTINCT pe.created_at + FROM primitive_events pe + JOIN streams s ON pe.stream_ref = s.id + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY pe.created_at DESC + LIMIT $limit_size + ) pe + UNION ALL + SELECT t.created_at, 'setTaxonomies', 3 + FROM ( + SELECT DISTINCT t.created_at + FROM taxonomies t + JOIN streams s ON t.stream_ref = s.id + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY t.created_at DESC + LIMIT $limit_size + ) t + UNION ALL + SELECT m.created_at, 'setMetadata', 4 + FROM ( + SELECT DISTINCT m.created_at + FROM metadata m + JOIN streams s ON m.stream_ref = s.id + JOIN data_providers dp ON s.data_provider_id = dp.id + WHERE COALESCE($normalized_provider, '') = '' OR dp.address = $normalized_provider + ORDER BY m.created_at DESC + LIMIT $limit_size + ) m + ) AS combined + ) AS ranked + WHERE rn = 1 + ORDER BY created_at DESC + LIMIT $limit_size; +}; + +CREATE OR REPLACE ACTION get_last_transactions_v2( + $wallet TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + tx_id TEXT, + created_at INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + distribution_sequence INT, + distribution_recipient TEXT, + distribution_amount NUMERIC(78, 0) +) { + $normalized_wallet TEXT := NULL; + IF COALESCE($wallet, '') != '' { + $normalized_wallet := LOWER($wallet); + IF NOT check_ethereum_address($normalized_wallet) { + ERROR('Invalid wallet. Must be a valid Ethereum address: ' || $wallet); + } + } + + $limit_val INT := COALESCE($limit_size, 6); + IF $limit_val <= 0 { + $limit_val := 6; + } + + IF $limit_val > 100 { + ERROR('Limit size cannot exceed 100'); + } + + RETURN + WITH limited_events AS ( + SELECT + te.tx_id, + te.block_height, + tm.name AS method, + LOWER(te.caller) AS caller, + te.fee_amount, + te.fee_recipient, + te.metadata + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + WHERE + $normalized_wallet IS NULL + OR te.caller = $normalized_wallet + OR (te.fee_recipient IS NOT NULL AND te.fee_recipient = $normalized_wallet) + OR EXISTS ( + SELECT 1 + FROM transaction_event_distributions ted_inner + WHERE ted_inner.tx_id = te.tx_id + AND ted_inner.recipient = $normalized_wallet + ) + ORDER BY te.block_height DESC, te.tx_id DESC + LIMIT $limit_val + ) + SELECT + le.tx_id, + le.block_height, + le.method, + le.caller, + le.fee_amount, + le.fee_recipient, + le.metadata, + COALESCE(ted.sequence, 0) AS distribution_sequence, + ted.recipient AS distribution_recipient, + ted.amount AS distribution_amount + FROM limited_events le + LEFT JOIN transaction_event_distributions ted + ON ted.tx_id = le.tx_id + ORDER BY le.block_height DESC, + le.tx_id DESC, + COALESCE(ted.sequence, 0) ASC; +}; + +CREATE OR REPLACE ACTION get_last_transactions( + $data_provider TEXT, + $limit_size INT8 +) PUBLIC VIEW RETURNS TABLE( + tx_id TEXT, + created_at INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + $normalized_provider TEXT := NULL; + IF COALESCE($data_provider, '') != '' { + $normalized_provider := LOWER($data_provider); + IF NOT check_ethereum_address($normalized_provider) { + ERROR('Invalid data provider address. Must be a valid Ethereum address: ' || $data_provider); + } + } + + $limit_val INT := COALESCE($limit_size, 6); + IF $limit_val <= 0 { + $limit_val := 6; + } + + IF $limit_val > 100 { + ERROR('Limit size cannot exceed 100'); + } + + RETURN + SELECT + NULL::TEXT AS tx_id, + lt.created_at, + lt.method, + NULL::TEXT AS caller, + NULL::NUMERIC(78, 0) AS fee_amount, + NULL::TEXT AS fee_recipient, + NULL::TEXT AS metadata, + ''::TEXT AS fee_distributions + FROM get_last_transactions_v1($normalized_provider, $limit_val) lt; +}; diff --git a/internal/migrations/024-attestation-actions.sql b/internal/migrations/024-attestation-actions.sql index c333f9e45..5f2b9418d 100644 --- a/internal/migrations/024-attestation-actions.sql +++ b/internal/migrations/024-attestation-actions.sql @@ -151,8 +151,15 @@ CREATE OR REPLACE ACTION request_attestation( $created_height, NULL, NULL, NULL ); --- Queue for signing (no-op on non-leader validators; handled by precompile) -tn_attestation.queue_for_signing(encode($attestation_hash, 'hex')); + -- Queue for signing (no-op on non-leader validators; handled by precompile) + tn_attestation.queue_for_signing(encode($attestation_hash, 'hex')); + + record_transaction_event( + 6, + $attestation_fee, + '0x' || $leader_addr, + NULL + ); RETURN $request_tx_id, $attestation_hash; }; diff --git a/internal/migrations/026-transaction-schemas.sql b/internal/migrations/026-transaction-schemas.sql new file mode 100644 index 000000000..eff9fc4d1 --- /dev/null +++ b/internal/migrations/026-transaction-schemas.sql @@ -0,0 +1,53 @@ +/** + * ============================================================================= + * Transaction Ledger Schema + * ============================================================================= + * + * Defines tables used to record transaction fee events and their per-recipient + * distributions. Helper actions live in 027-transaction-actions.sql. + */ + +CREATE TABLE IF NOT EXISTS transaction_methods ( + method_id INT PRIMARY KEY, + name TEXT NOT NULL UNIQUE +); + +INSERT INTO transaction_methods (method_id, name) VALUES + (1, 'deployStream'), + (2, 'insertRecords'), + (3, 'setTaxonomies'), + (4, 'transferTN'), + (5, 'withdrawTN'), + (6, 'requestAttestation'), + (7, 'setMetadata') +ON CONFLICT (method_id) DO NOTHING; + +CREATE TABLE IF NOT EXISTS transaction_events ( + tx_id TEXT PRIMARY KEY, + block_height INT8 NOT NULL, + method_id INT NOT NULL REFERENCES transaction_methods(method_id), + caller TEXT NOT NULL, + fee_amount NUMERIC(78, 0) NOT NULL DEFAULT 0, + fee_recipient TEXT, + metadata TEXT -- future could be JSONB when supported +); + +CREATE INDEX IF NOT EXISTS transaction_events_block_idx + ON transaction_events (block_height, tx_id); + +CREATE INDEX IF NOT EXISTS transaction_events_caller_idx + ON transaction_events (caller, block_height, tx_id); + +-- this table is the single source of truth for per-recipient breakdowns; downstream readers aggregate from it +-- E.g. attestations will soon pay a slice of the fee to each data provider that contributed to the data +CREATE TABLE IF NOT EXISTS transaction_event_distributions ( + tx_id TEXT NOT NULL REFERENCES transaction_events(tx_id) ON DELETE CASCADE, + sequence INT NOT NULL, + recipient TEXT NOT NULL, + amount NUMERIC(78, 0) NOT NULL, + note TEXT, + PRIMARY KEY (tx_id, sequence) +); + +CREATE INDEX IF NOT EXISTS tx_event_dist_rec_idx + ON transaction_event_distributions (recipient, tx_id); diff --git a/internal/migrations/027-transaction-actions.sql b/internal/migrations/027-transaction-actions.sql new file mode 100644 index 000000000..d9bc90a48 --- /dev/null +++ b/internal/migrations/027-transaction-actions.sql @@ -0,0 +1,294 @@ +CREATE OR REPLACE ACTION record_transaction_event( + -- check internal/migrations/026-transaction-schemas.sql for method_id values + $method_id INT, + $fee_amount NUMERIC(78, 0), + $fee_recipient TEXT, + $metadata TEXT +) PRIVATE { + IF $method_id IS NULL { + ERROR('record_transaction_event: method_id is required'); + } + + IF @txid IS NULL { + ERROR('record_transaction_event: missing transaction id'); + } + + $tx_hex TEXT := LOWER(@txid); + IF LENGTH($tx_hex) != 64 { + ERROR('record_transaction_event: txid must be 32-byte hex string'); + } + $tx_id TEXT := '0x' || $tx_hex; + + $caller_lower TEXT := LOWER(@caller); + IF NOT check_ethereum_address($caller_lower) { + ERROR('record_transaction_event: caller is not a valid address: ' || $caller_lower); + } + + $recipient_lower TEXT := NULL; + IF $fee_recipient IS NOT NULL AND $fee_recipient != '' { + $recipient_lower := LOWER($fee_recipient); + IF NOT check_ethereum_address($recipient_lower) { + ERROR('record_transaction_event: fee_recipient is not a valid address: ' || $fee_recipient); + } + } + + $amount NUMERIC(78, 0) := COALESCE($fee_amount, 0::NUMERIC(78, 0)); + + $event_exists BOOL := FALSE; + FOR $row IN + SELECT 1 + FROM transaction_events + WHERE tx_id = $tx_id + LIMIT 1 + { + $event_exists := TRUE; + } + + IF !$event_exists { + -- Ledger rows are idempotent: only create the parent record once even if + -- record_transaction_event is invoked multiple times within the same tx. + INSERT INTO transaction_events ( + tx_id, + block_height, + method_id, + caller, + fee_amount, + fee_recipient, + metadata + ) VALUES ( + $tx_id, + @height, + $method_id, + $caller_lower, + $amount, + $recipient_lower, + $metadata + ); + IF $recipient_lower IS NOT NULL AND $amount > 0::NUMERIC(78, 0) { + -- transaction_event_distributions is the single source of truth for + -- per-recipient breakdowns; downstream readers aggregate from it. + INSERT INTO transaction_event_distributions ( + tx_id, + sequence, + recipient, + amount, + note + ) VALUES ( + $tx_id, + 1, + $recipient_lower, + $amount, + NULL + ); + } + } +}; + +-- CREATE OR REPLACE ACTION record_tx_event_split( +-- $method_id INT, +-- $fee_amount NUMERIC(78, 0), +-- $recipients TEXT[], +-- $amounts NUMERIC(78, 0)[], +-- $metadata TEXT +-- ) PRIVATE { +-- -- TODO: Implement multi-recipient fee recording in a single call. +-- -- This action should validate array lengths, ensure the amounts sum to +-- -- $fee_amount, and write the distribution rows atomically. +-- }; + +CREATE OR REPLACE ACTION get_transaction_event( + $tx_id TEXT +) PUBLIC VIEW RETURNS ( + tx_id TEXT, + block_height INT8, + method TEXT, + caller TEXT, + fee_amount NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + fee_distributions TEXT +) { + IF $tx_id IS NULL { + ERROR('tx_id is required'); + } + + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) != '0x' { + $tx_clean := '0x' || $tx_clean; + } + + $out_tx_id TEXT := NULL; + $out_block_height INT8 := 0; + $out_method TEXT := NULL; + $out_caller TEXT := NULL; + $out_fee_amount NUMERIC(78, 0) := 0::NUMERIC(78, 0); + $out_fee_recipient TEXT := NULL; + $out_metadata TEXT := NULL; + + FOR $row IN + SELECT + te.tx_id, + te.block_height, + tm.name AS method, + te.caller, + te.fee_amount, + te.fee_recipient, + te.metadata + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + WHERE te.tx_id = $tx_clean + LIMIT 1 + { + $out_tx_id := $row.tx_id; + $out_block_height := $row.block_height; + $out_method := $row.method; + $out_caller := $row.caller; + $out_fee_amount := $row.fee_amount; + $out_fee_recipient := $row.fee_recipient; + $out_metadata := $row.metadata; + } + + IF $out_tx_id IS NULL { + RETURN; + } + + $distribution TEXT := ''; + FOR $dist IN + SELECT recipient, amount + FROM transaction_event_distributions + WHERE tx_id = $tx_clean + ORDER BY sequence ASC + { + $entry TEXT := $dist.recipient || ':' || $dist.amount::TEXT; + IF $distribution = '' { + $distribution := $entry; + } ELSE { + $distribution := $distribution || ',' || $entry; + } + } + + RETURN + $out_tx_id, + $out_block_height, + $out_method, + $out_caller, + $out_fee_amount, + $out_fee_recipient, + $out_metadata, + $distribution; +}; + + +CREATE OR REPLACE ACTION list_transaction_fees( + $wallet TEXT, + $mode TEXT DEFAULT 'paid', + $limit INT DEFAULT 20, + $offset INT DEFAULT 0 +) PUBLIC VIEW RETURNS TABLE ( + tx_id TEXT, + block_height INT8, + method TEXT, + caller TEXT, + total_fee NUMERIC(78, 0), + fee_recipient TEXT, + metadata TEXT, + distribution_sequence INT, + distribution_recipient TEXT, + distribution_amount NUMERIC(78, 0) +) { + IF $wallet IS NULL OR trim($wallet) = '' { + ERROR('wallet is required'); + } + + $wallet_lower TEXT := LOWER($wallet); + IF NOT check_ethereum_address($wallet_lower) { + ERROR('wallet must be a valid Ethereum address: ' || $wallet); + } + + $mode_normalized TEXT := LOWER(COALESCE($mode, 'paid')); + IF $mode_normalized != 'paid' AND $mode_normalized != 'received' AND $mode_normalized != 'both' { + ERROR('mode must be one of paid, received, or both'); + } + + $limit_val INT := COALESCE($limit, 20); + IF $limit_val <= 0 { + $limit_val := 20; + } + IF $limit_val > 1000 { + ERROR('limit cannot exceed 1000'); + } + + $offset_val INT := COALESCE($offset, 0); + IF $offset_val < 0 { + $offset_val := 0; + } + + $mode_is_paid BOOL := $mode_normalized = 'paid'; + $mode_is_received BOOL := $mode_normalized = 'received'; + $mode_is_both BOOL := $mode_normalized = 'both'; + + RETURN SELECT + fe.tx_id, + fe.block_height, + fe.method, + fe.caller, + fe.fee_amount, + fe.fee_recipient, + fe.metadata, + COALESCE(ted.sequence, 0) AS distribution_sequence, + ted.recipient AS distribution_recipient, + ted.amount AS distribution_amount + FROM ( + SELECT + te.tx_id, + te.block_height, + tm.name AS method, + LOWER(te.caller) AS caller, + te.fee_amount, + te.fee_recipient, + te.metadata + FROM transaction_events te + JOIN transaction_methods tm ON tm.method_id = te.method_id + WHERE + ($mode_is_paid AND te.caller = $wallet_lower) + OR ($mode_is_received AND ( + te.fee_recipient = $wallet_lower + OR EXISTS ( + SELECT 1 + FROM transaction_event_distributions ted_inner + WHERE ted_inner.tx_id = te.tx_id + AND ted_inner.recipient = $wallet_lower + ) + )) + OR ($mode_is_both AND ( + te.caller = $wallet_lower + OR te.fee_recipient = $wallet_lower + OR EXISTS ( + SELECT 1 + FROM transaction_event_distributions ted_inner + WHERE ted_inner.tx_id = te.tx_id + AND ted_inner.recipient = $wallet_lower + ) + )) + ORDER BY te.block_height DESC, te.tx_id DESC + LIMIT $limit_val + OFFSET $offset_val + ) fe + LEFT JOIN ( + SELECT + tx_id, + sequence, + recipient, + amount + FROM transaction_event_distributions + WHERE + NOT $mode_is_received + OR recipient = $wallet_lower + ) ted + ON ted.tx_id = fe.tx_id + ORDER BY fe.block_height DESC, + fe.tx_id DESC, + COALESCE(ted.sequence, 0) ASC; +}; + + diff --git a/internal/migrations/901-utilities.sql b/internal/migrations/901-utilities.sql index 1531a8c51..487b02eda 100644 --- a/internal/migrations/901-utilities.sql +++ b/internal/migrations/901-utilities.sql @@ -107,7 +107,6 @@ CREATE OR REPLACE ACTION helper_lowercase_array( ) t; }; - CREATE OR REPLACE ACTION helper_check_cache( $data_provider TEXT, $stream_id TEXT, diff --git a/internal/migrations/erc20-bridge/001-actions.sql b/internal/migrations/erc20-bridge/001-actions.sql index 6239ea94c..81d494000 100644 --- a/internal/migrations/erc20-bridge/001-actions.sql +++ b/internal/migrations/erc20-bridge/001-actions.sql @@ -36,12 +36,24 @@ CREATE OR REPLACE ACTION sepolia_bridge_tokens($recipient TEXT DEFAULT NULL, $am ($withdrawal_fee / '1000000000000000000'::NUMERIC(78, 0))::TEXT || ' TRUF fee)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $withdrawal_fee); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + ethereum_bridge.transfer($leader_hex, $withdrawal_fee); -- ===== END FEE COLLECTION ===== + $bridge_recipient TEXT := LOWER(COALESCE($recipient, @caller)); + -- Execute withdrawal using the bridge extension - sepolia_bridge.bridge(COALESCE($recipient, @caller), $withdrawal_amount); + sepolia_bridge.bridge($bridge_recipient, $withdrawal_amount); + + record_transaction_event( + 5, + $withdrawal_fee, + '0x' || $leader_hex, + NULL + ); }; -- MAINNET @@ -82,10 +94,22 @@ CREATE OR REPLACE ACTION ethereum_bridge_tokens($recipient TEXT DEFAULT NULL, $a ($withdrawal_fee / '1000000000000000000'::NUMERIC(78, 0))::TEXT || ' TRUF fee)'); } - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $withdrawal_fee); + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + ethereum_bridge.transfer($leader_hex, $withdrawal_fee); -- ===== END FEE COLLECTION ===== + $bridge_recipient TEXT := LOWER(COALESCE($recipient, @caller)); + -- Execute withdrawal using the bridge extension - ethereum_bridge.bridge(COALESCE($recipient, @caller), $withdrawal_amount); + ethereum_bridge.bridge($bridge_recipient, $withdrawal_amount); + + record_transaction_event( + 5, + $withdrawal_fee, + '0x' || $leader_hex, + NULL + ); }; diff --git a/internal/migrations/erc20-bridge/002-public-transfer-actions.sql b/internal/migrations/erc20-bridge/002-public-transfer-actions.sql index 7a55b715b..4bbc037d7 100644 --- a/internal/migrations/erc20-bridge/002-public-transfer-actions.sql +++ b/internal/migrations/erc20-bridge/002-public-transfer-actions.sql @@ -1,58 +1,83 @@ --- Public Transfer Actions for TRUF Token Operations --- Enables users to transfer TRUF tokens between addresses on TN - --- SEPOLIA TESTNET TRANSFERS -CREATE OR REPLACE ACTION sepolia_transfer($to_address TEXT, $amount TEXT) PUBLIC { - -- Validate Ethereum address format - if NOT check_ethereum_address($to_address) { - ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); - } - - -- Validate amount is positive - if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { - ERROR('Transfer amount must be positive'); - } - - -- Fee - $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals - $caller_balance := COALESCE(sepolia_bridge.balance(@caller), 0::NUMERIC(78, 0)); - - IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { - ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); - } - - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - sepolia_bridge.transfer($leader_addr, $fee); - - -- Execute transfer using the bridge extension - sepolia_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); -}; - - --- MAINNET TRANSFERS -CREATE OR REPLACE ACTION ethereum_transfer($to_address TEXT, $amount TEXT) PUBLIC { - -- Validate Ethereum address format - if NOT check_ethereum_address($to_address) { - ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); - } - - -- Validate amount is positive - if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { - ERROR('Transfer amount must be positive'); - } - - -- Fee - $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals - $caller_balance := COALESCE(ethereum_bridge.balance(@caller), 0::NUMERIC(78, 0)); - - IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { - ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); - } - - $leader_addr TEXT := encode(@leader_sender, 'hex')::TEXT; - ethereum_bridge.transfer($leader_addr, $fee); - - -- Execute transfer using the bridge extension - ethereum_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); -}; - +-- Public Transfer Actions for TRUF Token Operations +-- Enables users to transfer TRUF tokens between addresses on TN + +-- SEPOLIA TESTNET TRANSFERS +CREATE OR REPLACE ACTION sepolia_transfer($to_address TEXT, $amount TEXT) PUBLIC { + $recipient_lower TEXT := LOWER($to_address); + + -- Validate Ethereum address format + if NOT check_ethereum_address($recipient_lower) { + ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); + } + + -- Validate amount is positive + if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { + ERROR('Transfer amount must be positive'); + } + + -- Fee + $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals + $caller_balance := COALESCE(sepolia_bridge.balance(@caller), 0::NUMERIC(78, 0)); + + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + + IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { + ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); + } + + sepolia_bridge.transfer($leader_hex, $fee); + + -- Execute transfer using the bridge extension + sepolia_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); + + record_transaction_event( + 4, + $fee, + '0x' || $leader_hex, + NULL + ); +}; + + +-- MAINNET TRANSFERS +CREATE OR REPLACE ACTION ethereum_transfer($to_address TEXT, $amount TEXT) PUBLIC { + $recipient_lower TEXT := LOWER($to_address); + + -- Validate Ethereum address format + if NOT check_ethereum_address($recipient_lower) { + ERROR('Invalid Ethereum address format. Must be a valid Ethereum address: ' || $to_address); + } + + -- Validate amount is positive + if $amount::NUMERIC(78, 0) <= 0::NUMERIC(78, 0) { + ERROR('Transfer amount must be positive'); + } + + -- Fee + $fee := 1000000000000000000::NUMERIC(78, 0); -- 1 TRUF with 18 decimals + $caller_balance := COALESCE(ethereum_bridge.balance(@caller), 0::NUMERIC(78, 0)); + + IF @leader_sender IS NULL { + ERROR('Leader address not available for fee transfer'); + } + $leader_hex TEXT := encode(@leader_sender, 'hex')::TEXT; + + IF ($caller_balance < ($amount::NUMERIC(78, 0) + $fee)) { + ERROR('Insufficient balance for transfer. Requires an extra 1 TRUF fee on top of the transfer amount'); + } + + ethereum_bridge.transfer($leader_hex, $fee); + + -- Execute transfer using the bridge extension + ethereum_bridge.transfer($to_address, $amount::NUMERIC(78, 0)); + + record_transaction_event( + 4, + $fee, + '0x' || $leader_hex, + NULL + ); +}; diff --git a/tests/streams/primitive_batch_insert_alignment_test.go b/tests/streams/primitive_batch_insert_alignment_test.go index 5883f3da7..acde64bfb 100644 --- a/tests/streams/primitive_batch_insert_alignment_test.go +++ b/tests/streams/primitive_batch_insert_alignment_test.go @@ -21,7 +21,7 @@ import ( // TestPrimitiveBatchInsertAlignment ensures that batch insertion via insert_records maps values // to the correct streams when multiple streams are processed together. -// +// // This is a regression test for a bug where get_stream_ids() and helper_lowercase_array() // were using non-deterministic array aggregation, causing stream_ref arrays to become // misaligned with input data_provider/stream_id arrays. This resulted in records being @@ -31,7 +31,7 @@ import ( // contains only its own data by checking specific value/stream_ref combinations. func TestPrimitiveBatchInsertAlignment(t *testing.T) { testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ - Name: "primitive_batch_insert_alignment_test", + Name: "primitive_batch_insert_alignment_test", SeedStatements: migrations.GetSeedScriptStatements(), FunctionTests: []kwilTesting.TestFunc{ testBatchAlignment(t), @@ -76,14 +76,7 @@ func testBatchAlignment(t *testing.T) func(ctx context.Context, platform *kwilTe } // Execute insert_records directly in one call - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - TxID: platform.Txid(), - Signer: deployer.Bytes(), - Caller: deployer.Address(), - } - engineContext := &common.EngineContext{TxContext: txContext} + engineContext := setup.NewEngineContext(ctx, platform, deployer, 1) r, err := platform.Engine.Call(engineContext, platform.DB, "", "insert_records", []any{ dataProviders, diff --git a/tests/streams/transaction_events_ledger_test.go b/tests/streams/transaction_events_ledger_test.go new file mode 100644 index 000000000..0ab50400c --- /dev/null +++ b/tests/streams/transaction_events_ledger_test.go @@ -0,0 +1,727 @@ +//go:build kwiltest + +package tests + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwilTypes "github.com/trufnetwork/kwil-db/core/types" + erc20shim "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/extensions/tn_utils" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + testerc20 "github.com/trufnetwork/node/tests/streams/utils/erc20" + "github.com/trufnetwork/node/tests/streams/utils/setup" + sdkTypes "github.com/trufnetwork/sdk-go/core/types" + "github.com/trufnetwork/sdk-go/core/util" +) + +const ( + ledgerChain = "sepolia" + ledgerEscrow = "0x502430eD0BbE0f230215870c9C2853e126eE5Ae3" + ledgerERC20 = "0x2222222222222222222222222222222222222222" + ledgerExtensionAlias = "sepolia_bridge" + + feeOneTRUF = "1000000000000000000" + feeTwoTRUF = "2000000000000000000" + feeFourTRUF = "4000000000000000000" + feeFortyTRUF = "40000000000000000000" + transferAmount = "5000000000000000000" + withdrawAmount = "10000000000000000000" + initialUserFunds = "200000000000000000000" +) + +var ledgerPointCounter int64 = 20000 + +func TestTransactionEventsLedger(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "LEDGER_FEE01_TransactionEventsLedger", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + runTransactionEventsLedgerScenario(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +func runTransactionEventsLedgerScenario(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + systemAdmin := util.Unsafe_NewEthereumAddressFromString("0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf") + platform.Deployer = systemAdmin.Bytes() + + require.NoError(t, setup.AddMemberToRoleBypass(ctx, platform, "system", "network_writers_manager", systemAdmin.Address())) + require.NoError(t, setup.CreateDataProvider(ctx, platform, systemAdmin.Address())) + + err := erc20shim.ForTestingSeedAndActivateInstance(ctx, platform, ledgerChain, ledgerEscrow, ledgerERC20, 18, 60, ledgerExtensionAlias) + if err != nil { + if !strings.Contains(err.Error(), "alias \"sepolia_bridge\" already exists") { + require.NoError(t, err, "failed to seed ERC-20 bridge instance") + } + require.NoError(t, erc20shim.ForTestingInitializeExtension(ctx, platform), "failed to reinitialize ERC-20 bridge extension") + } + require.NoError(t, erc20shim.ForTestingInitializeExtension(ctx, platform)) + + actorVal := util.Unsafe_NewEthereumAddressFromString("0x9999999999999999999999999999999999999999") + actor := &actorVal + require.NoError(t, setup.CreateDataProviderWithoutRole(ctx, platform, actor.Address())) + require.NoError(t, ledgerGiveBalance(ctx, platform, actor.Address(), initialUserFunds)) + + receiverVal := util.Unsafe_NewEthereumAddressFromString("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + receiver := &receiverVal + bonusRecipientVal := util.Unsafe_NewEthereumAddressFromString("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB") + bonusRecipientLower := strings.ToLower(bonusRecipientVal.Address()) + + attestationStream := util.GenerateStreamId("ledger_attestation_stream") + require.NoError(t, setup.CreateStream(ctx, platform, setup.StreamInfo{ + Type: setup.ContractTypePrimitive, + Locator: sdkTypes.StreamLocator{ + StreamId: attestationStream, + DataProvider: systemAdmin, + }, + })) + + userLower := strings.ToLower(actor.Address()) + height := int64(5) + + primitiveStream := util.GenerateStreamId("ledger_primitive_stream") + composedStream := util.GenerateStreamId("ledger_composed_stream") + + createLeaderPub, createLeaderAddr := newLeader(t) + createTx, err := callActionWithLeader(ctx, platform, actor, createLeaderPub, height, "create_streams", []any{ + []string{primitiveStream.String(), composedStream.String()}, + []string{"primitive", "composed"}, + }) + require.NoError(t, err) + height++ + + insertLeaderPub, insertLeaderAddr := newLeader(t) + insertValue, err := kwilTypes.ParseDecimalExplicit("10.5", 36, 18) + require.NoError(t, err) + insertTx, err := callActionWithLeader(ctx, platform, actor, insertLeaderPub, height, "insert_records", []any{ + []string{userLower}, + []string{primitiveStream.String()}, + []int64{1000}, + []*kwilTypes.Decimal{insertValue}, + }) + require.NoError(t, err) + height++ + + tableCheck, err := platform.DB.Execute(ctx, + `SELECT table_schema::TEXT + FROM information_schema.tables + WHERE table_name = 'transaction_event_distributions'`, + ) + require.NoError(t, err) + require.NotEmpty(t, tableCheck.Rows, "transaction_event_distributions table missing") + + schemaName, ok := tableCheck.Rows[0][0].(string) + require.True(t, ok, "expected schema name as string") + t.Logf("ledger distribution table schema: %s", schemaName) + + updateSQL := fmt.Sprintf(` + UPDATE %s.transaction_event_distributions + SET amount = $1::NUMERIC(78, 0) + WHERE tx_id = $2 AND sequence = 1`, schemaName) + _, err = platform.DB.Execute(ctx, + updateSQL, + feeOneTRUF, + insertTx, + ) + require.NoError(t, err) + + insertSQL := fmt.Sprintf(` + INSERT INTO %s.transaction_event_distributions (tx_id, sequence, recipient, amount, note) + VALUES ($1, $2, $3, $4::NUMERIC(78, 0), NULL)`, schemaName) + _, err = platform.DB.Execute(ctx, + insertSQL, + insertTx, + 2, + bonusRecipientLower, + feeOneTRUF, + ) + require.NoError(t, err) + + taxLeaderPub, taxLeaderAddr := newLeader(t) + weight, err := kwilTypes.ParseDecimalExplicit("1.0", 36, 18) + require.NoError(t, err) + taxTx, err := callActionWithLeader(ctx, platform, actor, taxLeaderPub, height, "insert_taxonomy", []any{ + userLower, + composedStream.String(), + []string{userLower}, + []string{primitiveStream.String()}, + []*kwilTypes.Decimal{weight}, + nil, + }) + require.NoError(t, err) + height++ + + transferLeaderPub, transferLeaderAddr := newLeader(t) + transferTx, err := callActionWithLeader(ctx, platform, actor, transferLeaderPub, height, "sepolia_transfer", []any{ + receiver.Address(), + transferAmount, + }) + require.NoError(t, err) + height++ + + withdrawLeaderPub, withdrawLeaderAddr := newLeader(t) + withdrawTx, err := callActionWithLeader(ctx, platform, actor, withdrawLeaderPub, height, "sepolia_bridge_tokens", []any{ + actor.Address(), + withdrawAmount, + }) + require.NoError(t, err) + height++ + + require.NoError(t, setup.AddMemberToRoleBypass(ctx, platform, "system", "network_writer", actor.Address())) + + argsBytes, err := tn_utils.EncodeActionArgs([]any{ + strings.ToLower(systemAdmin.Address()), + attestationStream.String(), + int64(0), + int64(999), + int64(99999), + false, + }) + require.NoError(t, err) + + attLeaderPub, attLeaderAddr := newLeader(t) + attTx, err := callActionWithLeader(ctx, platform, actor, attLeaderPub, height, "request_attestation", []any{ + strings.ToLower(systemAdmin.Address()), + attestationStream.String(), + "get_record", + argsBytes, + false, + nil, + }) + require.NoError(t, err) + + assertNoMetadata := func(meta metadataMap) { + require.Empty(t, meta, "expected no metadata for ledger event") + } + + expected := map[string]ledgerExpectation{ + createTx: { + method: "deployStream", + fee: feeFourTRUF, + feeRecipient: createLeaderAddr, + feeDistributions: []string{ + buildDistribution(createLeaderAddr, feeFourTRUF), + }, + assertMetadata: assertNoMetadata, + }, + insertTx: { + method: "insertRecords", + fee: feeTwoTRUF, + feeRecipient: insertLeaderAddr, + feeDistributions: []string{ + buildDistribution(insertLeaderAddr, feeOneTRUF), + buildDistribution(bonusRecipientLower, feeOneTRUF), + }, + assertMetadata: assertNoMetadata, + }, + taxTx: { + method: "setTaxonomies", + fee: feeTwoTRUF, + feeRecipient: taxLeaderAddr, + feeDistributions: []string{ + buildDistribution(taxLeaderAddr, feeTwoTRUF), + }, + assertMetadata: assertNoMetadata, + }, + transferTx: { + method: "transferTN", + fee: feeOneTRUF, + feeRecipient: transferLeaderAddr, + feeDistributions: []string{ + buildDistribution(transferLeaderAddr, feeOneTRUF), + }, + assertMetadata: assertNoMetadata, + }, + withdrawTx: { + method: "withdrawTN", + fee: feeFortyTRUF, + feeRecipient: withdrawLeaderAddr, + feeDistributions: []string{ + buildDistribution(withdrawLeaderAddr, feeFortyTRUF), + }, + assertMetadata: assertNoMetadata, + }, + attTx: { + method: "requestAttestation", + fee: feeFortyTRUF, + feeRecipient: attLeaderAddr, + feeDistributions: []string{ + buildDistribution(attLeaderAddr, feeFortyTRUF), + }, + assertMetadata: assertNoMetadata, + }, + } + + buildExpectedCounts := func() (map[string]map[string]int, int) { + counts := make(map[string]map[string]int, len(expected)) + total := 0 + for txID, exp := range expected { + distList := exp.feeDistributions + if len(distList) == 0 { + distList = []string{""} + } + counts[txID] = make(map[string]int, len(distList)) + for _, dist := range distList { + counts[txID][dist]++ + total++ + } + } + return counts, total + } + + assertLedgerRows := func(rows []ledgerRow) { + counts, total := buildExpectedCounts() + require.Len(t, rows, total) + seen := make(map[string]map[string]int, len(expected)) + for txID := range expected { + seen[txID] = make(map[string]int) + } + for _, row := range rows { + exp, ok := expected[row.TxID] + require.True(t, ok, "unexpected tx in results: %s", row.TxID) + require.Equal(t, exp.method, row.Method) + require.Equal(t, userLower, row.Caller) + require.Equal(t, exp.fee, row.FeeAmount) + require.Equal(t, exp.feeRecipient, row.FeeRecipient) + pair := buildDistribution(row.DistributionRecipient, row.DistributionAmount) + require.Contains(t, counts[row.TxID], pair, "unexpected distribution %s for tx %s", pair, row.TxID) + seen[row.TxID][pair]++ + t.Logf("metadata for %s (%s): raw=%s parsed=%v", row.TxID, row.Method, row.RawMetadata, row.Metadata) + exp.assertMetadata(row.Metadata) + } + for txID, expectedCounts := range counts { + for dist, want := range expectedCounts { + require.Equal(t, want, seen[txID][dist], "distribution %s for tx %s missing or duplicated", dist, txID) + } + } + } + + paidRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "paid") + require.NoError(t, err) + assertLedgerRows(paidRows) + + bothRows, err := fetchTransactionFees(ctx, platform, actor.Address(), userLower, "both") + require.NoError(t, err) + assertLedgerRows(bothRows) + + withdrawReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), withdrawLeaderAddr, "received") + require.NoError(t, err) + require.Len(t, withdrawReceivedRows, 1) + require.Equal(t, withdrawTx, withdrawReceivedRows[0].TxID) + require.Equal(t, withdrawLeaderAddr, withdrawReceivedRows[0].FeeRecipient) + require.Equal(t, withdrawLeaderAddr, withdrawReceivedRows[0].DistributionRecipient) + require.Equal(t, feeFortyTRUF, withdrawReceivedRows[0].DistributionAmount) + require.Equal(t, userLower, withdrawReceivedRows[0].Caller) + + insertLeaderReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), insertLeaderAddr, "received") + require.NoError(t, err) + require.Len(t, insertLeaderReceivedRows, 1) + require.Equal(t, insertTx, insertLeaderReceivedRows[0].TxID) + require.Equal(t, insertLeaderAddr, insertLeaderReceivedRows[0].FeeRecipient) + require.Equal(t, insertLeaderAddr, insertLeaderReceivedRows[0].DistributionRecipient) + require.Equal(t, feeOneTRUF, insertLeaderReceivedRows[0].DistributionAmount) + + bonusReceivedRows, err := fetchTransactionFees(ctx, platform, actor.Address(), bonusRecipientLower, "received") + require.NoError(t, err) + require.Len(t, bonusReceivedRows, 1) + require.Equal(t, insertTx, bonusReceivedRows[0].TxID) + require.Equal(t, insertLeaderAddr, bonusReceivedRows[0].FeeRecipient) + require.Equal(t, bonusRecipientLower, bonusReceivedRows[0].DistributionRecipient) + require.Equal(t, feeOneTRUF, bonusReceivedRows[0].DistributionAmount) + + lastTxRows, err := fetchLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) + require.NoError(t, err) + assertLedgerRows(lastTxRows) + + withdrawHistoryRows, err := fetchLastTransactions(ctx, platform, actor.Address(), withdrawLeaderAddr, 10) + require.NoError(t, err) + require.NotEmpty(t, withdrawHistoryRows) + require.Equal(t, withdrawTx, withdrawHistoryRows[0].TxID) + + bonusHistoryRows, err := fetchLastTransactions(ctx, platform, actor.Address(), bonusRecipientLower, 10) + require.NoError(t, err) + require.NotEmpty(t, bonusHistoryRows) + foundBonus := false + for _, row := range bonusHistoryRows { + if row.DistributionRecipient == bonusRecipientLower { + require.Equal(t, insertTx, row.TxID) + require.Equal(t, feeOneTRUF, row.DistributionAmount) + foundBonus = true + break + } + } + require.True(t, foundBonus, "expected to find distribution row for bonus recipient") + + legacyRows, err := fetchLegacyLastTransactions(ctx, platform, actor.Address(), userLower, int64(len(expected))) + require.NoError(t, err) + require.NotEmpty(t, legacyRows) + + methodSet := make(map[string]struct{}) + for _, exp := range expected { + methodSet[exp.method] = struct{}{} + } + + for _, row := range legacyRows { + require.Greater(t, row.CreatedAt, int64(0)) + _, ok := methodSet[row.Method] + require.True(t, ok, "unexpected method in legacy view: %s", row.Method) + } + + for txID, exp := range expected { + eventRow, err := fetchTransactionEvent(ctx, platform, actor.Address(), txID) + require.NoError(t, err, "failed to fetch transaction event for %s", txID) + require.Equal(t, exp.method, eventRow.Method) + require.Equal(t, exp.fee, eventRow.FeeAmount) + require.Equal(t, exp.feeRecipient, eventRow.FeeRecipient) + expectedAggregate := strings.Join(exp.feeDistributions, ",") + require.Equal(t, expectedAggregate, eventRow.FeeDistributions) + exp.assertMetadata(eventRow.Metadata) + } + + return nil + } +} + +type ledgerExpectation struct { + method string + fee string + feeRecipient string + feeDistributions []string + assertMetadata func(meta metadataMap) +} + +type legacyRow struct { + CreatedAt int64 + Method string +} + +type ledgerRow struct { + TxID string + CreatedAt int64 + Method string + Caller string + FeeAmount string + FeeRecipient string + Metadata metadataMap + FeeDistributions string + DistributionSequence int + DistributionRecipient string + DistributionAmount string + RawMetadata string +} + +func newLeader(t *testing.T) (*crypto.Secp256k1PublicKey, string) { + t.Helper() + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + require.NoError(t, err) + pub := pubGeneric.(*crypto.Secp256k1PublicKey) + return pub, addressFromPub(pub) +} + +func addressFromPub(pub *crypto.Secp256k1PublicKey) string { + return fmt.Sprintf("0x%x", crypto.EthereumAddressFromPubKey(pub)) +} + +func callActionWithLeader(ctx context.Context, platform *kwilTesting.Platform, signer *util.EthereumAddress, leaderPub *crypto.Secp256k1PublicKey, height int64, action string, args []any) (string, error) { + txID := strings.ToLower(platform.Txid()) + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Proposer: leaderPub, + }, + Signer: signer.Bytes(), + Caller: signer.Address(), + TxID: txID, + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + res, err := platform.Engine.Call(engineCtx, platform.DB, "", action, args, func(row *common.Row) error { return nil }) + if err != nil { + return "", err + } + if res != nil && res.Error != nil { + return "", res.Error + } + return "0x" + txID, nil +} + +func fetchTransactionFees(ctx context.Context, platform *kwilTesting.Platform, caller string, wallet string, mode string) ([]ledgerRow, error) { + rows := make([]ledgerRow, 0, 8) + args := []any{strings.ToLower(wallet), mode, 20, 0} + err := callView(ctx, platform, caller, "list_transaction_fees", args, func(row *common.Row) error { + feeRecipient := "" + if row.Values[5] != nil { + feeRecipient = strings.ToLower(row.Values[5].(string)) + } + + rawMetadata := stringOrEmpty(row.Values[6]) + meta := parseMetadata(row.Values[6]) + + seq := int(row.Values[7].(int64)) + distRecipient := "" + if row.Values[8] != nil { + distRecipient = strings.ToLower(row.Values[8].(string)) + } + + rows = append(rows, ledgerRow{ + TxID: row.Values[0].(string), + CreatedAt: row.Values[1].(int64), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: meta, + DistributionSequence: seq, + DistributionRecipient: distRecipient, + DistributionAmount: decimalToString(row.Values[9]), + RawMetadata: rawMetadata, + }) + return nil + }) + return rows, err +} + +func fetchLastTransactions(ctx context.Context, platform *kwilTesting.Platform, caller string, address string, limit int64) ([]ledgerRow, error) { + rows := make([]ledgerRow, 0, 8) + addr := strings.ToLower(address) + args := []any{addr, limit} + err := callView(ctx, platform, caller, "get_last_transactions_v2", args, func(row *common.Row) error { + feeRecipient := "" + if row.Values[5] != nil { + feeRecipient = strings.ToLower(row.Values[5].(string)) + } + + rawMetadata := stringOrEmpty(row.Values[6]) + rows = append(rows, ledgerRow{ + TxID: row.Values[0].(string), + CreatedAt: row.Values[1].(int64), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: parseMetadata(row.Values[6]), + DistributionSequence: int(row.Values[7].(int64)), + DistributionRecipient: strings.ToLower(stringOrEmpty(row.Values[8])), + DistributionAmount: decimalToString(row.Values[9]), + RawMetadata: rawMetadata, + }) + return nil + }) + return rows, err +} + +func fetchLegacyLastTransactions(ctx context.Context, platform *kwilTesting.Platform, caller string, dataProvider string, limit int64) ([]legacyRow, error) { + rows := make([]legacyRow, 0, 8) + args := []any{dataProvider, limit} + err := callView(ctx, platform, caller, "get_last_transactions_v1", args, func(row *common.Row) error { + rows = append(rows, legacyRow{ + CreatedAt: row.Values[0].(int64), + Method: row.Values[1].(string), + }) + return nil + }) + return rows, err +} + +func fetchTransactionEvent(ctx context.Context, platform *kwilTesting.Platform, caller string, txID string) (ledgerRow, error) { + var result ledgerRow + err := callView(ctx, platform, caller, "get_transaction_event", []any{txID}, func(row *common.Row) error { + feeRecipient := "" + if row.Values[5] != nil { + feeRecipient = strings.ToLower(row.Values[5].(string)) + } + + rawMetadata := stringOrEmpty(row.Values[6]) + result = ledgerRow{ + TxID: row.Values[0].(string), + Method: row.Values[2].(string), + Caller: strings.ToLower(row.Values[3].(string)), + FeeAmount: decimalToString(row.Values[4]), + FeeRecipient: feeRecipient, + Metadata: parseMetadata(row.Values[6]), + FeeDistributions: stringOrEmpty(row.Values[7]), + DistributionSequence: 0, + DistributionRecipient: "", + DistributionAmount: "", + RawMetadata: rawMetadata, + } + return nil + }) + return result, err +} + +func callView(ctx context.Context, platform *kwilTesting.Platform, caller string, action string, args []any, fn func(*common.Row) error) error { + address, err := util.NewEthereumAddressFromBytes(platform.Deployer) + if err != nil { + return err + } + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 100, + }, + Signer: address.Bytes(), + Caller: strings.ToLower(caller), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + res, err := platform.Engine.Call(engineCtx, platform.DB, "", action, args, fn) + if err != nil { + return err + } + if res != nil && res.Error != nil { + return res.Error + } + return nil +} + +func decimalToString(val any) string { + if val == nil { + return "" + } + dec, ok := val.(*kwilTypes.Decimal) + if !ok { + panic(fmt.Sprintf("expected *types.Decimal, got %T", val)) + } + return dec.String() +} + +type metadataMap map[string]any + +func parseMetadata(val any) metadataMap { + if val == nil { + return metadataMap{} + } + + switch v := val.(type) { + case string: + if v == "" { + return metadataMap{} + } + var m metadataMap + if err := json.Unmarshal([]byte(v), &m); err != nil { + panic(fmt.Sprintf("failed to parse metadata %q: %v", v, err)) + } + return m + default: + panic(fmt.Sprintf("unexpected metadata type %T", v)) + } +} + +func (m metadataMap) String(key string) string { + val, ok := m[key] + if !ok { + return "" + } + str, ok := val.(string) + if !ok { + panic(fmt.Sprintf("metadata %s is not string: %T", key, val)) + } + return strings.ToLower(str) +} + +func (m metadataMap) Bool(key string) bool { + val, ok := m[key] + if !ok { + return false + } + b, ok := val.(bool) + if !ok { + panic(fmt.Sprintf("metadata %s is not bool: %T", key, val)) + } + return b +} + +func (m metadataMap) Int(key string) int64 { + val, ok := m[key] + if !ok { + return 0 + } + switch num := val.(type) { + case float64: + return int64(num) + case json.Number: + i, err := num.Int64() + if err != nil { + panic(fmt.Sprintf("metadata %s invalid json number: %v", key, err)) + } + return i + default: + panic(fmt.Sprintf("metadata %s is not numeric: %T", key, val)) + } +} + +func (m metadataMap) StringSlice(key string) []string { + val, ok := m[key] + if !ok { + return nil + } + arr, ok := val.([]any) + if !ok { + panic(fmt.Sprintf("metadata %s is not array: %T", key, val)) + } + out := make([]string, 0, len(arr)) + for _, v := range arr { + str, ok := v.(string) + if !ok { + panic(fmt.Sprintf("metadata array %s contains non-string %T", key, v)) + } + out = append(out, strings.ToLower(str)) + } + return out +} + +func (m metadataMap) Optional(key string) any { + val, ok := m[key] + if !ok { + return nil + } + if val == nil { + return nil + } + return val +} + +func stringOrEmpty(val any) string { + if val == nil { + return "" + } + return val.(string) +} + +func buildDistribution(recipient, amount string) string { + if amount == "" || recipient == "" { + return "" + } + return strings.ToLower(recipient) + ":" + amount +} + +func ledgerGiveBalance(ctx context.Context, platform *kwilTesting.Platform, wallet string, amount string) error { + ledgerPointCounter++ + return testerc20.InjectERC20Transfer( + ctx, + platform, + ledgerChain, + ledgerEscrow, + ledgerERC20, + wallet, + wallet, + amount, + ledgerPointCounter, + nil, + ) +} diff --git a/tests/streams/utils/procedure/execute.go b/tests/streams/utils/procedure/execute.go index d8b4466d8..59063f4ed 100644 --- a/tests/streams/utils/procedure/execute.go +++ b/tests/streams/utils/procedure/execute.go @@ -6,14 +6,13 @@ import ( "fmt" "strings" - trufTypes "github.com/trufnetwork/sdk-go/core/types" - "github.com/trufnetwork/sdk-go/core/util" - "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" kwilTypes "github.com/trufnetwork/kwil-db/core/types" kwilTesting "github.com/trufnetwork/kwil-db/testing" - "github.com/trufnetwork/sdk-go/core/types" + "github.com/trufnetwork/node/tests/streams/utils/testctx" + types "github.com/trufnetwork/sdk-go/core/types" + "github.com/trufnetwork/sdk-go/core/util" ) // GetDataResult contains the full result of a GetRecord call @@ -54,19 +53,7 @@ func GetRecordWithLogs(ctx context.Context, input GetRecordInput) (*GetDataResul return nil, errors.Wrap(err, "error in getRecord") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) prefix := "" if input.Prefix != nil { @@ -155,19 +142,7 @@ func GetIndexWithLogs(ctx context.Context, input GetIndexInput) (*GetDataResult, return nil, errors.Wrap(err, "error in getIndex") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) prefix := "" if input.Prefix != nil { @@ -236,19 +211,7 @@ func GetIndexChangeWithLogs(ctx context.Context, input GetIndexChangeInput) (*Ge return nil, errors.Wrap(err, "error in getIndexChange") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) // Set default use_cache to false if not specified useCache := false @@ -311,19 +274,7 @@ func GetFirstRecordWithLogs(ctx context.Context, input GetFirstRecordInput) (*Ge return nil, errors.Wrap(err, "error in getFirstRecord") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) // Set default use_cache to false if not specified useCache := false @@ -382,19 +333,7 @@ func SetMetadata(ctx context.Context, input SetMetadataInput) error { return errors.Wrap(err, "error in setMetadata") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "set_metadata", []any{ input.StreamLocator.DataProvider.Address(), @@ -449,17 +388,7 @@ func DescribeTaxonomies(ctx context.Context, input DescribeTaxonomiesInput) ([]R return nil, errors.Wrap(err, "error in DescribeTaxonomies.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - BlockContext: &common.BlockContext{Height: 0}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - Ctx: ctx, - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, 0) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "describe_taxonomies", []any{ @@ -504,17 +433,7 @@ func SetTaxonomy(ctx context.Context, input SetTaxonomyInput) error { weightDecimals = append(weightDecimals, valueDecimal) } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_taxonomy", []any{ input.StreamLocator.DataProvider.Address(), // parent data provider @@ -538,19 +457,7 @@ func GetCategoryStreams(ctx context.Context, input GetCategoryStreamsInput) ([]R return nil, errors.Wrap(err, "error in getCategoryStreams") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: 0, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, 0) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_category_streams", []any{ @@ -583,19 +490,7 @@ func FilterStreamsByExistence(ctx context.Context, input FilterStreamsByExistenc return nil, errors.Wrap(err, "error in FilterStreamsByExistence") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) dataProviders := []string{} streamIds := []string{} @@ -640,19 +535,7 @@ func DisableTaxonomy(ctx context.Context, input DisableTaxonomyInput) error { return errors.Wrap(err, "error in DisableTaxonomy") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "disable_taxonomy", []any{ input.StreamLocator.DataProvider.Address(), @@ -687,19 +570,7 @@ func ListStreams(ctx context.Context, input ListStreamsInput) ([]ResultRow, erro return nil, errors.Wrap(err, "error in ListStreams") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: input.Platform.Txid(), - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "list_streams", []any{ @@ -726,7 +597,7 @@ func ListStreams(ctx context.Context, input ListStreamsInput) ([]ResultRow, erro type GetDatabaseSizeInput struct { Platform *kwilTesting.Platform - Locator trufTypes.StreamLocator + Locator types.StreamLocator Height int64 } @@ -736,17 +607,7 @@ func GetDatabaseSize(ctx context.Context, input GetDatabaseSizeInput) (int64, er return 0, errors.Wrap(err, "failed to create Ethereum address from deployer bytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var databaseSize *int64 r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_database_size", []any{}, func(row *common.Row) error { databaseSize = safe(row.Values[0], nil, int64PtrConverter) @@ -769,17 +630,7 @@ func GetDatabaseSizeV2(ctx context.Context, input GetDatabaseSizeInput) ([]Resul return nil, errors.Wrap(err, "error in GetDatabaseSizeV2.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_database_size_v2", []any{}, func(row *common.Row) error { @@ -805,17 +656,7 @@ func GetDatabaseSizeV2Pretty(ctx context.Context, input GetDatabaseSizeInput) ([ return nil, errors.Wrap(err, "error in GetDatabaseSizeV2Pretty.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_database_size_v2_pretty", []any{}, func(row *common.Row) error { @@ -834,7 +675,6 @@ func GetDatabaseSizeV2Pretty(ctx context.Context, input GetDatabaseSizeInput) ([ return processResultRows(resultRows) } - // ListTaxonomiesByHeight executes list_taxonomies_by_height action func ListTaxonomiesByHeight(ctx context.Context, input ListTaxonomiesByHeightInput) ([]ResultRow, error) { deployer, err := util.NewEthereumAddressFromBytes(input.Platform.Deployer) @@ -842,17 +682,7 @@ func ListTaxonomiesByHeight(ctx context.Context, input ListTaxonomiesByHeightInp return nil, errors.Wrap(err, "error in ListTaxonomiesByHeight.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "list_taxonomies_by_height", []any{ @@ -884,17 +714,7 @@ func GetTaxonomiesForStreams(ctx context.Context, input GetTaxonomiesForStreamsI return nil, errors.Wrap(err, "error in GetTaxonomiesForStreams.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "get_taxonomies_for_streams", []any{ @@ -924,17 +744,7 @@ func ListMetadataByHeight(ctx context.Context, input ListMetadataByHeightInput) return nil, errors.Wrap(err, "error in ListMetadataByHeight.NewEthereumAddressFromBytes") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: input.Height}, - Signer: input.Platform.Deployer, - Caller: deployer.Address(), - TxID: input.Platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := testctx.NewEngineContext(ctx, input.Platform, deployer, input.Height) var resultRows [][]any r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "list_metadata_by_height", []any{ @@ -959,4 +769,3 @@ func ListMetadataByHeight(ctx context.Context, input ListMetadataByHeightInput) return processResultRows(resultRows) } - diff --git a/tests/streams/utils/setup/common.go b/tests/streams/utils/setup/common.go index f1e023eeb..7a16f2b82 100644 --- a/tests/streams/utils/setup/common.go +++ b/tests/streams/utils/setup/common.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/tests/streams/utils/testctx" "github.com/trufnetwork/sdk-go/core/types" "github.com/trufnetwork/sdk-go/core/util" ) @@ -27,6 +28,15 @@ func (contractType ContractType) String() string { return string(contractType) } +func newEthEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { + return testctx.NewEngineContext(ctx, platform, addr, height) +} + +// NewEngineContext returns an engine context configured with the provided signer and a deterministic leader. +func NewEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { + return testctx.NewEngineContext(ctx, platform, addr, height) +} + // CreateStream parses and creates the dataset for a contract func CreateStream(ctx context.Context, platform *kwilTesting.Platform, contractInfo StreamInfo) error { return CreateStreamWithOptions(ctx, platform, contractInfo, CreateStreamOptions{}) @@ -63,17 +73,7 @@ func UntypedCreateStream(ctx context.Context, platform *kwilTesting.Platform, st return errors.Wrap(err, "invalid data provider address") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: addr.Bytes(), - Caller: addr.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, addr, 1) r, err := platform.Engine.Call(engineContext, platform.DB, @@ -124,17 +124,7 @@ func CreateStreamsWithOptions(ctx context.Context, platform *kwilTesting.Platfor return errors.Wrap(err, "error creating composed dataset") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, deployer, 1) streamIds := make([]string, len(streamInfos)) streamTypes := make([]string, len(streamInfos)) @@ -164,17 +154,7 @@ func CreateStreamsWithOptions(ctx context.Context, platform *kwilTesting.Platfor } func DeleteStream(ctx context.Context, platform *kwilTesting.Platform, streamLocator types.StreamLocator) (*common.CallResult, error) { - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: streamLocator.DataProvider.Bytes(), - Caller: streamLocator.DataProvider.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, streamLocator.DataProvider, 1) return platform.Engine.Call(engineContext, platform.DB, @@ -202,17 +182,7 @@ func CreateDataProvider(ctx context.Context, platform *kwilTesting.Platform, add return errors.Wrap(err, "failed to enable stream deployer") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: addr.Bytes(), - Caller: addr.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, addr, 1) r, err := platform.Engine.Call(engineContext, platform.DB, @@ -253,17 +223,7 @@ func CreateDataProviderWithoutRole(ctx context.Context, platform *kwilTesting.Pl } // Register the data provider - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: addr.Bytes(), - Caller: addr.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, addr, 1) r, err := platform.Engine.Call(engineContext, platform.DB, @@ -302,16 +262,15 @@ func CreateDataProviderWithoutRole(ctx context.Context, platform *kwilTesting.Pl // 3. This is a test utility following the same pattern as AddMemberToRoleBypass // 4. Using OverrideAuthz is the standard pattern for test role management func removeMemberFromRoleBypass(ctx context.Context, platform *kwilTesting.Platform, owner, roleName, wallet string) error { - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 0}, - TxID: platform.Txid(), - Signer: []byte("system"), - Caller: "0x0000000000000000000000000000000000000000", - } - engineContext := &common.EngineContext{ - TxContext: txContext, + TxContext: testctx.NewTxContextWithAuth( + ctx, + platform, + []byte("system"), + "0x0000000000000000000000000000000000000000", + "", + 1, + ), OverrideAuthz: true, // Skip authorization checks - this is a test utility } @@ -343,17 +302,7 @@ func GetStreamId(ctx context.Context, platform *kwilTesting.Platform, dataProvid return 0, errors.Wrap(err, "error creating ethereum address") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{Height: 1}, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - TxID: platform.Txid(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, platform, deployer, 1) var streamRef int r, err := platform.Engine.Call(engineContext, platform.DB, "", "get_stream_id", []any{ diff --git a/tests/streams/utils/setup/primitive.go b/tests/streams/utils/setup/primitive.go index 063c0aeb9..c38da5088 100644 --- a/tests/streams/utils/setup/primitive.go +++ b/tests/streams/utils/setup/primitive.go @@ -4,13 +4,12 @@ import ( "context" "strconv" - "github.com/trufnetwork/sdk-go/core/types" - "github.com/pkg/errors" "github.com/trufnetwork/kwil-db/common" kwilTypes "github.com/trufnetwork/kwil-db/core/types" kwilTesting "github.com/trufnetwork/kwil-db/testing" testtable "github.com/trufnetwork/node/tests/streams/utils/table" + "github.com/trufnetwork/sdk-go/core/types" "github.com/trufnetwork/sdk-go/core/util" ) @@ -156,19 +155,8 @@ func InsertMarkdownPrimitiveData(ctx context.Context, input InsertMarkdownDataIn continue } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: signer.Bytes(), - Caller: signer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, input.Platform, signer, input.Height) + engineContext.TxContext.TxID = txid r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_record", []any{ input.StreamLocator.DataProvider.Address(), @@ -217,19 +205,8 @@ func insertPrimitiveData(ctx context.Context, input InsertPrimitiveDataInput) er } for _, arg := range args { - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, input.Platform, deployer, input.Height) + engineContext.TxContext.TxID = txid r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_record", arg, func(row *common.Row) error { return nil @@ -327,16 +304,8 @@ func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimiti } signerAddr := util.Unsafe_NewEthereumAddressFromString(provider) - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: signerAddr.Bytes(), - Caller: signerAddr.Address(), - } - engineContext := &common.EngineContext{TxContext: txContext} + engineContext := newEthEngineContext(ctx, input.Platform, signerAddr, input.Height) + engineContext.TxContext.TxID = txid args := []any{g.dataProviders, g.streamIds, g.eventTimes, g.values} r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_records", args, func(row *common.Row) error { return nil }) @@ -404,19 +373,8 @@ func InsertTruflationDataBatch(ctx context.Context, input InsertTruflationDataIn return errors.Wrap(err, "error in InsertTruflationDataBatch") } - txContext := &common.TxContext{ - Ctx: ctx, - BlockContext: &common.BlockContext{ - Height: input.Height, - }, - TxID: txid, - Signer: deployer.Bytes(), - Caller: deployer.Address(), - } - - engineContext := &common.EngineContext{ - TxContext: txContext, - } + engineContext := newEthEngineContext(ctx, input.Platform, deployer, input.Height) + engineContext.TxContext.TxID = txid r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "truflation_insert_records", args, func(row *common.Row) error { return nil diff --git a/tests/streams/utils/testctx/context.go b/tests/streams/utils/testctx/context.go new file mode 100644 index 000000000..da150307a --- /dev/null +++ b/tests/streams/utils/testctx/context.go @@ -0,0 +1,57 @@ +package testctx + +import ( + "context" + "fmt" + "sync" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/sdk-go/core/util" +) + +var ( + defaultLeaderOnce sync.Once + defaultLeaderPub *crypto.Secp256k1PublicKey +) + +func defaultLeaderPublicKey() *crypto.Secp256k1PublicKey { + defaultLeaderOnce.Do(func() { + _, pubGeneric, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + panic(fmt.Sprintf("failed to generate default leader key: %v", err)) + } + var ok bool + defaultLeaderPub, ok = pubGeneric.(*crypto.Secp256k1PublicKey) + if !ok { + panic("default leader public key is not secp256k1") + } + }) + return defaultLeaderPub +} + +// NewTxContextWithAuth constructs a TxContext that includes a deterministic leader proposer. +func NewTxContextWithAuth(ctx context.Context, platform *kwilTesting.Platform, signer []byte, caller string, authenticator string, height int64) *common.TxContext { + if height < 0 { + height = 1 + } + return &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Proposer: defaultLeaderPublicKey(), + }, + Signer: signer, + Caller: caller, + TxID: platform.Txid(), + Authenticator: authenticator, + } +} + +// NewEngineContext returns an EngineContext configured for an Ethereum address signer. +func NewEngineContext(ctx context.Context, platform *kwilTesting.Platform, addr util.EthereumAddress, height int64) *common.EngineContext { + txContext := NewTxContextWithAuth(ctx, platform, addr.Bytes(), addr.Address(), coreauth.EthPersonalSignAuth, height) + return &common.EngineContext{TxContext: txContext} +}