From 3f2449809f7edfcd72dfd5c24165ecd8e4b3244d Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Thu, 20 Nov 2025 21:55:10 +0700 Subject: [PATCH 1/2] feat: show transaction input data on-demand --- go.sum | 8 - .../028-transaction-input-actions.sql | 248 ++++++++++++++++++ tests/streams/transaction_input_data_test.go | 203 ++++++++++++++ 3 files changed, 451 insertions(+), 8 deletions(-) create mode 100644 internal/migrations/028-transaction-input-actions.sql create mode 100644 tests/streams/transaction_input_data_test.go diff --git a/go.sum b/go.sum index 000836175..6e6b1c4f3 100644 --- a/go.sum +++ b/go.sum @@ -1212,16 +1212,8 @@ github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZ github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= -github.com/trufnetwork/kwil-db v0.10.3-0.20251007160009-740626e8d3dc h1:svkXj1gOW1BC1fRpkcokx5hTJHS2z6wH5PBmhKT1g6M= -github.com/trufnetwork/kwil-db v0.10.3-0.20251007160009-740626e8d3dc/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db v0.10.3-0.20251007160956-022e6739f56d h1:0QSNquiTdWwFkYqh1W8IRedIC9B7oKY4gOtWmbMINpg= -github.com/trufnetwork/kwil-db v0.10.3-0.20251007160956-022e6739f56d/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db v0.10.3-0.20251022154021-25ca3428bace h1:Cikf1fQSgTDPHU4yXhJgAKQ4qjn4PDF0znXPmVLK1b8= github.com/trufnetwork/kwil-db v0.10.3-0.20251022154021-25ca3428bace/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20251007160009-740626e8d3dc h1:XMiKB6yIjIgoOtte1doYJKJDi76Yd+NtRckVBaZWX5U= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20251007160009-740626e8d3dc/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20251007160956-022e6739f56d h1:+y33t1CekaXy4iaENBp4svpx/6eeVvpcoTDzTzwkM0M= -github.com/trufnetwork/kwil-db/core v0.4.3-0.20251007160956-022e6739f56d/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/kwil-db/core v0.4.3-0.20251022154021-25ca3428bace h1:CyU76505WNmFFZSB9w3007qAA8et3WxKqxEJcSh+Rfo= github.com/trufnetwork/kwil-db/core v0.4.3-0.20251022154021-25ca3428bace/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= diff --git a/internal/migrations/028-transaction-input-actions.sql b/internal/migrations/028-transaction-input-actions.sql new file mode 100644 index 000000000..1163eabf3 --- /dev/null +++ b/internal/migrations/028-transaction-input-actions.sql @@ -0,0 +1,248 @@ +/** + * ============================================================================= + * Transaction Input Data Actions + * ============================================================================= + * + * Provides on-demand retrieval of transaction input data by querying + * data tables using tx_id foreign keys. These actions are called from + * the transaction explorer frontend when users click "Show Input Data". + * + * Design Philosophy: + * - No data duplication: Query existing tables with tx_id columns + * - On-demand loading: Only fetch when explicitly requested + * - Direct node calls: No indexer involvement + * - Simple queries: Single WHERE clause using indexed tx_id + * + * Related Tables: + * - streams (tx_id column added in 000-initial-data.sql:165) + * - primitive_events (tx_id column added in 000-initial-data.sql:169) + * - taxonomies (tx_id column added in 000-initial-data.sql:173) + * - metadata (tx_id column added in 000-initial-data.sql:177) + * - attestations (uses request_tx_id as PRIMARY KEY) + * + * Related Actions: + * - get_transaction_event (027-transaction-actions.sql:99) - Get basic tx info + * - record_transaction_event (027-transaction-actions.sql:1) - Record tx events + */ + +/** + * get_transaction_streams: Retrieve streams deployed in a transaction + * + * Used for: deployStream transactions (method_id = 1) + * + * @param $tx_id - Transaction ID (hex string with or without 0x prefix) + * @return Array of streams created in the transaction + */ +CREATE OR REPLACE ACTION get_transaction_streams( + $tx_id TEXT +) PUBLIC VIEW RETURNS TABLE ( + stream_id TEXT, + data_provider TEXT, + stream_type TEXT, + created_at INT8 +) { + IF $tx_id IS NULL OR trim($tx_id) = '' { + ERROR('tx_id is required'); + } + + -- Normalize tx_id: remove 0x prefix if present (data tables store without 0x) + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) = '0x' { + $tx_clean := substring($tx_clean, 3); + } + + RETURN SELECT + stream_id, + data_provider, + stream_type, + created_at + FROM streams + WHERE tx_id = $tx_clean + ORDER BY stream_id; +}; + +/** + * get_transaction_records: Retrieve records inserted in a transaction + * + * Used for: insertRecords transactions (method_id = 2) + * + * @param $tx_id - Transaction ID (hex string with or without 0x prefix) + * @return Array of primitive events (records) inserted in the transaction + * + * Note: May return 100+ records for batch insertions + */ +CREATE OR REPLACE ACTION get_transaction_records( + $tx_id TEXT +) PUBLIC VIEW RETURNS TABLE ( + data_provider TEXT, + stream_id TEXT, + event_time INT8, + value NUMERIC(36, 18), + created_at INT8 +) { + IF $tx_id IS NULL OR trim($tx_id) = '' { + ERROR('tx_id is required'); + } + + -- Normalize tx_id: remove 0x prefix if present (data tables store without 0x) + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) = '0x' { + $tx_clean := substring($tx_clean, 3); + } + + RETURN SELECT + s.data_provider, + s.stream_id, + pe.event_time, + pe.value, + pe.created_at + FROM primitive_events pe + JOIN streams s ON pe.stream_ref = s.id + WHERE pe.tx_id = $tx_clean + ORDER BY s.data_provider, s.stream_id, pe.event_time; +}; + +/** + * get_transaction_taxonomies: Retrieve taxonomies created in a transaction + * + * Used for: setTaxonomies transactions (method_id = 3) + * + * @param $tx_id - Transaction ID (hex string with or without 0x prefix) + * @return Array of taxonomy relationships created in the transaction + * + * Note: Multiple rows with same group_sequence represent one taxonomy version + */ +CREATE OR REPLACE ACTION get_transaction_taxonomies( + $tx_id TEXT +) PUBLIC VIEW RETURNS TABLE ( + data_provider TEXT, + stream_id TEXT, + child_data_provider TEXT, + child_stream_id TEXT, + weight NUMERIC(36, 18), + start_time INT8, + group_sequence INT8, + created_at INT8 +) { + IF $tx_id IS NULL OR trim($tx_id) = '' { + ERROR('tx_id is required'); + } + + -- Normalize tx_id: remove 0x prefix if present (data tables store without 0x) + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) = '0x' { + $tx_clean := substring($tx_clean, 3); + } + + RETURN SELECT + s.data_provider, + s.stream_id, + cs.data_provider AS child_data_provider, + cs.stream_id AS child_stream_id, + t.weight, + t.start_time, + t.group_sequence, + t.created_at + FROM taxonomies t + JOIN streams s ON t.stream_ref = s.id + JOIN streams cs ON t.child_stream_ref = cs.id + WHERE t.tx_id = $tx_clean + ORDER BY s.data_provider, s.stream_id, t.group_sequence, cs.stream_id; +}; + +/** + * get_transaction_metadata: Retrieve metadata set in a transaction + * + * Used for: setMetadata transactions (method_id = 7) + * + * @param $tx_id - Transaction ID (hex string with or without 0x prefix) + * @return Array of metadata key-value pairs set in the transaction + * + * Note: Only one value_* field will be non-null per row + */ +CREATE OR REPLACE ACTION get_transaction_metadata( + $tx_id TEXT +) PUBLIC VIEW RETURNS TABLE ( + data_provider TEXT, + stream_id TEXT, + metadata_key TEXT, + value_i INT8, + value_f NUMERIC(36, 18), + value_b BOOLEAN, + value_s TEXT, + value_ref TEXT, + created_at INT8 +) { + IF $tx_id IS NULL OR trim($tx_id) = '' { + ERROR('tx_id is required'); + } + + -- Normalize tx_id: remove 0x prefix if present (data tables store without 0x) + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) = '0x' { + $tx_clean := substring($tx_clean, 3); + } + + RETURN SELECT + s.data_provider, + s.stream_id, + m.metadata_key, + m.value_i, + m.value_f, + m.value_b, + m.value_s, + m.value_ref, + m.created_at + FROM metadata m + JOIN streams s ON m.stream_ref = s.id + WHERE m.tx_id = $tx_clean + ORDER BY s.data_provider, s.stream_id, m.metadata_key; +}; + +/** + * get_transaction_attestation: Retrieve attestation request details + * + * Used for: requestAttestation transactions (method_id = 6) + * + * @param $tx_id - Transaction ID (hex string with or without 0x prefix) + * @return Single attestation record or empty if not found + * + * Note: BYTEA fields (attestation_hash, requester, result_canonical, etc.) + * are returned as hex-encoded strings + */ +CREATE OR REPLACE ACTION get_transaction_attestation( + $tx_id TEXT +) PUBLIC VIEW RETURNS TABLE ( + request_tx_id TEXT, + attestation_hash BYTEA, + requester BYTEA, + result_canonical BYTEA, + encrypt_sig BOOLEAN, + created_height INT8, + signature BYTEA, + validator_pubkey BYTEA, + signed_height INT8 +) { + IF $tx_id IS NULL OR trim($tx_id) = '' { + ERROR('tx_id is required'); + } + + -- Normalize tx_id: remove 0x prefix if present (attestations store without 0x) + $tx_clean TEXT := LOWER($tx_id); + IF substring($tx_clean, 1, 2) = '0x' { + $tx_clean := substring($tx_clean, 3); + } + + RETURN SELECT + request_tx_id, + attestation_hash, + requester, + result_canonical, + encrypt_sig, + created_height, + signature, + validator_pubkey, + signed_height + FROM attestations + WHERE request_tx_id = $tx_clean; +}; diff --git a/tests/streams/transaction_input_data_test.go b/tests/streams/transaction_input_data_test.go new file mode 100644 index 000000000..682363abb --- /dev/null +++ b/tests/streams/transaction_input_data_test.go @@ -0,0 +1,203 @@ +//go:build kwiltest + +package tests + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwilTypes "github.com/trufnetwork/kwil-db/core/types" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/node/tests/streams/utils/setup" + sdkTypes "github.com/trufnetwork/sdk-go/core/types" + "github.com/trufnetwork/sdk-go/core/util" +) + +// TestTransactionInputActions tests all transaction input data retrieval actions +func TestTransactionInputActions(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "TX_INPUT_01_AllActions", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + runTransactionInputActionsTest(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +func runTransactionInputActionsTest(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + systemAdminVal := util.Unsafe_NewEthereumAddressFromString("0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf") + systemAdmin := &systemAdminVal + platform.Deployer = systemAdmin.Bytes() + + require.NoError(t, setup.AddMemberToRoleBypass(ctx, platform, "system", "network_writers_manager", systemAdmin.Address())) + require.NoError(t, setup.CreateDataProvider(ctx, platform, systemAdmin.Address())) + + // Test 1: get_transaction_streams + t.Log("Test 1: Create streams and verify get_transaction_streams") + + stream1 := util.GenerateStreamId("test_stream_1") + stream2 := util.GenerateStreamId("test_stream_2") + + txID1 := strings.ToLower(platform.Txid()) + tx1 := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 100, + }, + Signer: systemAdmin.Bytes(), + Caller: systemAdmin.Address(), + TxID: txID1, + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx1 := &common.EngineContext{TxContext: tx1} + _, err := platform.Engine.Call(engineCtx1, platform.DB, "", "create_streams", []any{ + []string{stream1.String(), stream2.String()}, + []string{"primitive", "composed"}, + }, func(row *common.Row) error { return nil }) + require.NoError(t, err) + + formattedTxID1 := "0x" + txID1 + t.Logf("Created streams with tx_id: %s", formattedTxID1) + + // Call get_transaction_streams + rows1 := make([]*common.Row, 0) + err = callViewAction(ctx, platform, systemAdmin.Address(), "get_transaction_streams", []any{formattedTxID1}, func(row *common.Row) error { + rows1 = append(rows1, row) + return nil + }) + require.NoError(t, err) + require.Len(t, rows1, 2, "should return 2 streams") + + // Verify both streams are returned with correct types + stream1Found := false + stream2Found := false + for _, row := range rows1 { + streamID := row.Values[0].(string) + streamType := row.Values[2].(string) + dataProvider := row.Values[1].(string) + + require.Equal(t, strings.ToLower(systemAdmin.Address()), dataProvider, "data_provider should match") + require.NotNil(t, row.Values[3], "created_at should not be nil") + + if streamID == stream1.String() { + require.Equal(t, "primitive", streamType, "stream1 should be primitive") + stream1Found = true + } else if streamID == stream2.String() { + require.Equal(t, "composed", streamType, "stream2 should be composed") + stream2Found = true + } + } + require.True(t, stream1Found, "stream1 should be found") + require.True(t, stream2Found, "stream2 should be found") + + t.Log("✅ get_transaction_streams works correctly") + + // Test 2: get_transaction_records + t.Log("Test 2: Insert records and verify get_transaction_records") + + primitiveStream := util.GenerateStreamId("test_primitive") + require.NoError(t, setup.CreateStream(ctx, platform, setup.StreamInfo{ + Type: setup.ContractTypePrimitive, + Locator: sdkTypes.StreamLocator{ + StreamId: primitiveStream, + DataProvider: *systemAdmin, + }, + })) + + value1, err := kwilTypes.ParseDecimalExplicit("100.5", 36, 18) + require.NoError(t, err) + + txID2 := strings.ToLower(platform.Txid()) + tx2 := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 101, + }, + Signer: systemAdmin.Bytes(), + Caller: systemAdmin.Address(), + TxID: txID2, + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx2 := &common.EngineContext{TxContext: tx2} + _, err = platform.Engine.Call(engineCtx2, platform.DB, "", "insert_records", []any{ + []string{strings.ToLower(systemAdmin.Address())}, + []string{primitiveStream.String()}, + []int64{1700000000}, + []*kwilTypes.Decimal{value1}, + }, func(row *common.Row) error { return nil }) + require.NoError(t, err) + + formattedTxID2 := "0x" + txID2 + t.Logf("Inserted records with tx_id: %s", formattedTxID2) + + // Call get_transaction_records + rows2 := make([]*common.Row, 0) + err = callViewAction(ctx, platform, systemAdmin.Address(), "get_transaction_records", []any{formattedTxID2}, func(row *common.Row) error { + rows2 = append(rows2, row) + return nil + }) + require.NoError(t, err) + require.Len(t, rows2, 1, "should return 1 record") + require.Equal(t, primitiveStream.String(), rows2[0].Values[1].(string), "stream_id should match") + require.Equal(t, int64(1700000000), rows2[0].Values[2].(int64), "event_time should match") + + t.Log("✅ get_transaction_records works correctly") + + // Test 3: Edge cases + t.Log("Test 3: Edge cases") + + // Non-existent tx_id + rows3 := make([]*common.Row, 0) + err = callViewAction(ctx, platform, systemAdmin.Address(), "get_transaction_streams", []any{"0xnonexistent"}, func(row *common.Row) error { + rows3 = append(rows3, row) + return nil + }) + require.NoError(t, err) + require.Len(t, rows3, 0, "should return empty for non-existent tx_id") + + // Empty tx_id should error + err = callViewAction(ctx, platform, systemAdmin.Address(), "get_transaction_records", []any{""}, func(row *common.Row) error { + return nil + }) + require.Error(t, err) + require.Contains(t, err.Error(), "tx_id is required") + + t.Log("✅ Edge cases handled correctly") + t.Log("✅✅✅ All transaction input data actions work correctly!") + + return nil + } +} + +func callViewAction(ctx context.Context, platform *kwilTesting.Platform, caller string, action string, args []any, fn func(*common.Row) error) error { + address, err := util.NewEthereumAddressFromBytes(platform.Deployer) + if err != nil { + return err + } + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 100, + }, + Signer: address.Bytes(), + Caller: strings.ToLower(caller), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx} + res, err := platform.Engine.Call(engineCtx, platform.DB, "", action, args, fn) + if err != nil { + return err + } + if res != nil && res.Error != nil { + return res.Error + } + return nil +} From 68234718a5509fb2124856c72ad478483b2cbe5d Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Thu, 20 Nov 2025 22:07:46 +0700 Subject: [PATCH 2/2] chore: apply suggestion --- internal/migrations/028-transaction-input-actions.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/migrations/028-transaction-input-actions.sql b/internal/migrations/028-transaction-input-actions.sql index 1163eabf3..d3a637551 100644 --- a/internal/migrations/028-transaction-input-actions.sql +++ b/internal/migrations/028-transaction-input-actions.sql @@ -208,7 +208,10 @@ CREATE OR REPLACE ACTION get_transaction_metadata( * @return Single attestation record or empty if not found * * Note: BYTEA fields (attestation_hash, requester, result_canonical, etc.) - * are returned as hex-encoded strings + * are returned as BYTEA types. PostgreSQL returns them as hex-encoded + * strings (prefixed with \x), which are decoded to raw bytes by the + * Kwil engine and then re-encoded by the client/serialization layer + * (typically as hex or base64) when transmitted over JSON-RPC */ CREATE OR REPLACE ACTION get_transaction_attestation( $tx_id TEXT