From afe1c6a37afdb39bf25f8f77efc65db223ec9e8d Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 24 Feb 2026 01:38:07 +0700 Subject: [PATCH 1/4] feat: view prediction market data by stream --- .../migrations/032-order-book-actions.sql | 25 +++ .../migrations/044-get-markets-by-stream.sql | 56 +++++++ tests/streams/order_book/discovery_test.go | 154 ++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 internal/migrations/044-get-markets-by-stream.sql create mode 100644 tests/streams/order_book/discovery_test.go diff --git a/internal/migrations/032-order-book-actions.sql b/internal/migrations/032-order-book-actions.sql index ee17ecf02..d068f785c 100644 --- a/internal/migrations/032-order-book-actions.sql +++ b/internal/migrations/032-order-book-actions.sql @@ -101,6 +101,23 @@ CREATE OR REPLACE ACTION create_market( ERROR('query_components is required (ABI-encoded (address,bytes32,string,bytes))'); } + -- Decode components for structured storage + $data_provider BYTEA; + $stream_id BYTEA; + $action_id_str TEXT; + $query_args BYTEA; + for $row in tn_utils.decode_query_components($query_components) { + $data_provider := $row.data_provider; + $stream_id := $row.stream_id; + $action_id_str := $row.action_id; + $query_args := $row.args; + } + + -- Validation: Ensure decoding succeeded + if $data_provider IS NULL OR $stream_id IS NULL { + ERROR('Failed to decode query_components: invalid ABI data or missing components'); + } + -- Compute hash from query components using attestation format -- This ensures market hash matches attestation hash for automatic settlement $query_hash BYTEA; @@ -187,6 +204,10 @@ CREATE OR REPLACE ACTION create_market( id, hash, query_components, + data_provider, + stream_id, + action_id, + query_args, settle_time, max_spread, min_order_size, @@ -198,6 +219,10 @@ CREATE OR REPLACE ACTION create_market( COALESCE(MAX(id), 0) + 1, $query_hash, $query_components, + $data_provider, + $stream_id, + $action_id_str, + $query_args, $settle_time, $max_spread, $min_order_size, diff --git a/internal/migrations/044-get-markets-by-stream.sql b/internal/migrations/044-get-markets-by-stream.sql new file mode 100644 index 000000000..4ea0406ac --- /dev/null +++ b/internal/migrations/044-get-markets-by-stream.sql @@ -0,0 +1,56 @@ +-- Migration: Order Book Discovery Action +-- Adds get_markets_by_stream discovery view for high-performance indexed lookups. + +-- ============================================================================= +-- get_markets_by_stream: Discovery view for asset pages +-- ============================================================================= +/** + * Returns all markets associated with a specific stream ID. + * This lookup is high-performance (indexed). + * + * Parameters: + * - $stream_id: The 32-byte stream identifier + * - $limit_val: Maximum number of results (default 100, max 100) + * - $offset_val: Number of results to skip (default 0) + * + * Returns table of market summaries. + */ +CREATE OR REPLACE ACTION get_markets_by_stream( + $stream_id BYTEA, + $limit_val INT, + $offset_val INT +) +PUBLIC VIEW RETURNS TABLE ( + id INT, + hash BYTEA, + data_provider BYTEA, + action_id TEXT, + settle_time INT8, + settled BOOLEAN, + winning_outcome BOOLEAN, + max_spread INT, + min_order_size INT8, + created_at INT8 +) { + if $stream_id IS NULL { + ERROR('stream_id is required'); + } + + -- Apply default and max limits + $effective_limit INT := 100; + $effective_offset INT := 0; + + if $limit_val IS NOT NULL AND $limit_val > 0 AND $limit_val <= 100 { + $effective_limit := $limit_val; + } + if $offset_val IS NOT NULL AND $offset_val >= 0 { + $effective_offset := $offset_val; + } + + RETURN SELECT id, hash, data_provider, action_id, settle_time, settled, + winning_outcome, max_spread, min_order_size, created_at + FROM ob_queries + WHERE stream_id = $stream_id + ORDER BY created_at DESC + LIMIT $effective_limit OFFSET $effective_offset; +}; diff --git a/tests/streams/order_book/discovery_test.go b/tests/streams/order_book/discovery_test.go new file mode 100644 index 000000000..c2298c03f --- /dev/null +++ b/tests/streams/order_book/discovery_test.go @@ -0,0 +1,154 @@ +//go:build kwiltest + +package order_book + +import ( + "context" + "fmt" + "testing" + "time" + + gethCommon "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" + erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/sdk-go/core/util" +) + +// TestOrderBookDiscovery verifies that new markets populate denormalized columns +// and can be discovered via the indexed get_markets_by_stream view. +func TestOrderBookDiscovery(t *testing.T) { + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "ORDER_BOOK_Discovery", + SeedStatements: migrations.GetSeedScriptStatements(), + FunctionTests: []kwilTesting.TestFunc{ + testDiscoveryWorkflow(t), + }, + }, testutils.GetTestOptionsWithCache()) +} + +func testDiscoveryWorkflow(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset balance point tracker + lastBalancePointComponents = nil + userAddr := util.Unsafe_NewEthereumAddressFromString("0x1111111111111111111111111111111111111111") + + // Initialize ERC20 extension + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + // Give user balance for fees + err = giveBalanceChainedComponents(ctx, platform, userAddr.Address(), "100000000000000000000") + require.NoError(t, err) + + // 1. Create a Market + dataProvider := "0x2222222222222222222222222222222222222222" + streamID := "stdiscovery000000000000000000000" // Exactly 32 chars + actionID := "price_above_threshold" + argsBytes := []byte{0xDE, 0xAD, 0xBE, 0xEF} + + queryComponents, err := encodeQueryComponentsABI(dataProvider, streamID, actionID, argsBytes) + require.NoError(t, err) + + settleTime := time.Now().Add(1 * time.Hour).Unix() + var queryID int + err = callCreateMarketWithComponents(ctx, platform, &userAddr, queryComponents, settleTime, int64(5), int64(100), func(row *common.Row) error { + queryID = int(row.Values[0].(int64)) + return nil + }) + require.NoError(t, err) + + // 2. Verify Structured Columns in DB + // We'll query ob_queries directly using standard SQL to prove denormalization worked + engineCtx := engCtx(ctx, platform, userAddr.Address(), 1) + + var dbProvider []byte + var dbStreamID []byte + var dbActionID string + var dbQueryArgs []byte + + err = platform.Engine.Execute(engineCtx, platform.DB, + "SELECT data_provider, stream_id, action_id, query_args FROM ob_queries WHERE id = " + fmt.Sprintf("%d", queryID), nil, func(row *common.Row) error { + require.NotNil(t, row.Values[0]) + v0, ok := row.Values[0].([]byte) + require.True(t, ok, "expected []byte for data_provider") + dbProvider = v0 + + require.NotNil(t, row.Values[1]) + v1, ok := row.Values[1].([]byte) + require.True(t, ok, "expected []byte for stream_id") + dbStreamID = v1 + + require.NotNil(t, row.Values[2]) + v2, ok := row.Values[2].(string) + require.True(t, ok, "expected string for action_id") + dbActionID = v2 + + require.NotNil(t, row.Values[3]) + v3, ok := row.Values[3].([]byte) + require.True(t, ok, "expected []byte for query_args") + dbQueryArgs = v3 + return nil + }) + require.NoError(t, err) + + require.Equal(t, gethCommon.HexToAddress(dataProvider).Bytes(), dbProvider, "data_provider should be denormalized") + + var expectedStreamID [32]byte + copy(expectedStreamID[:], []byte(streamID)) + require.Equal(t, expectedStreamID[:], dbStreamID, "stream_id should be denormalized") + + require.Equal(t, actionID, dbActionID, "action_id should be denormalized") + require.Equal(t, argsBytes, dbQueryArgs, "query_args should be denormalized") + + // 3. Test Discovery View + var discoveryCount int + // Parameters: $stream_id, $limit, $offset + res, err := platform.Engine.Call(engineCtx, platform.DB, "", "get_markets_by_stream", []any{expectedStreamID[:], int64(10), int64(0)}, func(row *common.Row) error { + discoveryCount++ + require.Equal(t, int64(queryID), row.Values[0].(int64), "discovered ID should match") + require.Equal(t, actionID, row.Values[3].(string), "discovered action_id should match") + return nil + }) + require.NoError(t, err) + require.Nil(t, res.Error) + require.Equal(t, 1, discoveryCount, "should find exactly one market for this stream") + + // 4. Create another market for the same stream to test indexing/multiple results + queryComponents2, err := encodeQueryComponentsABI(dataProvider, streamID, "price_below_threshold", []byte{0x00}) + require.NoError(t, err) + + err = callCreateMarketWithComponents(ctx, platform, &userAddr, queryComponents2, settleTime + 100, int64(5), int64(100), nil) + require.NoError(t, err) + + discoveryCount = 0 + res, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_markets_by_stream", []any{expectedStreamID[:], int64(10), int64(0)}, func(row *common.Row) error { + discoveryCount++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, discoveryCount, "should now find two markets for this stream") + + return nil + } +} + +// engCtx helper from other tests +func engCtx(ctx context.Context, platform *kwilTesting.Platform, caller string, height int64) *common.EngineContext { + return &common.EngineContext{ + TxContext: &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: height, + Timestamp: time.Now().Unix(), + }, + Caller: caller, + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + }, + } +} From 59cf74c1622842228d535d4eada13e2a61568304 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 24 Feb 2026 01:47:50 +0700 Subject: [PATCH 2/4] chore: apply suggestion --- internal/migrations/032-order-book-actions.sql | 2 +- internal/migrations/044-get-markets-by-stream.sql | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/migrations/032-order-book-actions.sql b/internal/migrations/032-order-book-actions.sql index d068f785c..abec609ec 100644 --- a/internal/migrations/032-order-book-actions.sql +++ b/internal/migrations/032-order-book-actions.sql @@ -114,7 +114,7 @@ CREATE OR REPLACE ACTION create_market( } -- Validation: Ensure decoding succeeded - if $data_provider IS NULL OR $stream_id IS NULL { + if $data_provider IS NULL OR $stream_id IS NULL OR $action_id_str IS NULL OR $query_args IS NULL { ERROR('Failed to decode query_components: invalid ABI data or missing components'); } diff --git a/internal/migrations/044-get-markets-by-stream.sql b/internal/migrations/044-get-markets-by-stream.sql index 4ea0406ac..6bc185000 100644 --- a/internal/migrations/044-get-markets-by-stream.sql +++ b/internal/migrations/044-get-markets-by-stream.sql @@ -36,7 +36,9 @@ PUBLIC VIEW RETURNS TABLE ( ERROR('stream_id is required'); } - -- Apply default and max limits + -- Apply default and max limits + -- Note: This logic is intentionally duplicated from list_markets (032-order-book-actions.sql) + -- to maintain consistency in pagination behavior. $effective_limit INT := 100; $effective_offset INT := 0; From 76d4b9178cd18846f6bce595e556aaf1ef163ed1 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 24 Feb 2026 02:41:24 +0700 Subject: [PATCH 3/4] chore: apply suggestion --- internal/migrations/030-order-book-schema.sql | 9 +++++ .../migrations/032-order-book-actions.sql | 12 ++++-- .../migrations/043-denormalize-ob-queries.sql | 19 ---------- ...ream.sql => 043-get-markets-by-stream.sql} | 0 tests/streams/order_book/discovery_test.go | 37 ++++++------------- 5 files changed, 30 insertions(+), 47 deletions(-) delete mode 100644 internal/migrations/043-denormalize-ob-queries.sql rename internal/migrations/{044-get-markets-by-stream.sql => 043-get-markets-by-stream.sql} (100%) diff --git a/internal/migrations/030-order-book-schema.sql b/internal/migrations/030-order-book-schema.sql index 8238c19d2..abdb643ea 100644 --- a/internal/migrations/030-order-book-schema.sql +++ b/internal/migrations/030-order-book-schema.sql @@ -21,6 +21,10 @@ CREATE TABLE IF NOT EXISTS ob_queries ( id INT PRIMARY KEY, hash BYTEA NOT NULL UNIQUE, query_components BYTEA NOT NULL, + data_provider BYTEA, + stream_id BYTEA, + action_id TEXT, + query_args BYTEA, settle_time INT8 NOT NULL, settled BOOLEAN DEFAULT false NOT NULL, winning_outcome BOOLEAN, @@ -96,6 +100,11 @@ CREATE INDEX IF NOT EXISTS idx_ob_queries_settled CREATE INDEX IF NOT EXISTS idx_ob_queries_components ON ob_queries(query_components); +-- Discovery indexes for high-performance lookups +CREATE INDEX IF NOT EXISTS idx_ob_queries_stream_id ON ob_queries(stream_id); +CREATE INDEX IF NOT EXISTS idx_ob_queries_provider_stream ON ob_queries(data_provider, stream_id); +CREATE INDEX IF NOT EXISTS idx_ob_queries_action_id ON ob_queries(action_id); + -- ============================================================================= -- Transaction method registration -- ============================================================================= diff --git a/internal/migrations/032-order-book-actions.sql b/internal/migrations/032-order-book-actions.sql index abec609ec..5c29df0c8 100644 --- a/internal/migrations/032-order-book-actions.sql +++ b/internal/migrations/032-order-book-actions.sql @@ -273,7 +273,11 @@ PUBLIC VIEW RETURNS ( max_spread INT, min_order_size INT8, created_at INT8, - creator BYTEA + creator BYTEA, + data_provider BYTEA, + stream_id BYTEA, + action_id TEXT, + query_args BYTEA ) { if $query_id IS NULL { ERROR('query_id is required'); @@ -281,13 +285,15 @@ PUBLIC VIEW RETURNS ( for $market in SELECT hash, query_components, bridge, settle_time, settled, winning_outcome, settled_at, - max_spread, min_order_size, created_at, creator + max_spread, min_order_size, created_at, creator, + data_provider, stream_id, action_id, query_args FROM ob_queries WHERE id = $query_id { RETURN $market.hash, $market.query_components, $market.bridge, $market.settle_time, $market.settled, $market.winning_outcome, $market.settled_at, $market.max_spread, - $market.min_order_size, $market.created_at, $market.creator; + $market.min_order_size, $market.created_at, $market.creator, + $market.data_provider, $market.stream_id, $market.action_id, $market.query_args; } ERROR('Market not found: ' || $query_id::TEXT); diff --git a/internal/migrations/043-denormalize-ob-queries.sql b/internal/migrations/043-denormalize-ob-queries.sql deleted file mode 100644 index 4f72d2cbe..000000000 --- a/internal/migrations/043-denormalize-ob-queries.sql +++ /dev/null @@ -1,19 +0,0 @@ --- Migration: Denormalize ob_queries table for discovery --- Adds structured columns for searchable market data components. --- Requires Phase 1 (tn_utils.decode_query_components precompile) to be deployed. - --- 1. Add new columns (nullable to accommodate legacy testnet data) -ALTER TABLE ob_queries ADD COLUMN data_provider BYTEA; -ALTER TABLE ob_queries ADD COLUMN stream_id BYTEA; -ALTER TABLE ob_queries ADD COLUMN action_id TEXT; -ALTER TABLE ob_queries ADD COLUMN query_args BYTEA; - --- 2. Add indexes for high-performance discovery --- Enable direct stream page lookups (O(log n)) -CREATE INDEX IF NOT EXISTS idx_ob_queries_stream_id ON ob_queries(stream_id); - --- Enable per-provider stream filtering -CREATE INDEX IF NOT EXISTS idx_ob_queries_provider_stream ON ob_queries(data_provider, stream_id); - --- Enable filtering by action type (e.g. price_above_threshold) -CREATE INDEX IF NOT EXISTS idx_ob_queries_action_id ON ob_queries(action_id); diff --git a/internal/migrations/044-get-markets-by-stream.sql b/internal/migrations/043-get-markets-by-stream.sql similarity index 100% rename from internal/migrations/044-get-markets-by-stream.sql rename to internal/migrations/043-get-markets-by-stream.sql diff --git a/tests/streams/order_book/discovery_test.go b/tests/streams/order_book/discovery_test.go index c2298c03f..7c0cfb3ea 100644 --- a/tests/streams/order_book/discovery_test.go +++ b/tests/streams/order_book/discovery_test.go @@ -4,7 +4,6 @@ package order_book import ( "context" - "fmt" "testing" "time" @@ -62,8 +61,8 @@ func testDiscoveryWorkflow(t *testing.T) func(ctx context.Context, platform *kwi }) require.NoError(t, err) - // 2. Verify Structured Columns in DB - // We'll query ob_queries directly using standard SQL to prove denormalization worked + // 2. Verify Structured Columns via get_market_info + // This also tests that get_market_info now returns the denormalized columns engineCtx := engCtx(ctx, platform, userAddr.Address(), 1) var dbProvider []byte @@ -71,30 +70,18 @@ func testDiscoveryWorkflow(t *testing.T) func(ctx context.Context, platform *kwi var dbActionID string var dbQueryArgs []byte - err = platform.Engine.Execute(engineCtx, platform.DB, - "SELECT data_provider, stream_id, action_id, query_args FROM ob_queries WHERE id = " + fmt.Sprintf("%d", queryID), nil, func(row *common.Row) error { - require.NotNil(t, row.Values[0]) - v0, ok := row.Values[0].([]byte) - require.True(t, ok, "expected []byte for data_provider") - dbProvider = v0 - - require.NotNil(t, row.Values[1]) - v1, ok := row.Values[1].([]byte) - require.True(t, ok, "expected []byte for stream_id") - dbStreamID = v1 - - require.NotNil(t, row.Values[2]) - v2, ok := row.Values[2].(string) - require.True(t, ok, "expected string for action_id") - dbActionID = v2 - - require.NotNil(t, row.Values[3]) - v3, ok := row.Values[3].([]byte) - require.True(t, ok, "expected []byte for query_args") - dbQueryArgs = v3 + res, err := platform.Engine.Call(engineCtx, platform.DB, "", "get_market_info", []any{int64(queryID)}, func(row *common.Row) error { + // get_market_info now returns 15 columns. Denormalized ones are at 11, 12, 13, 14 + require.Equal(t, 15, len(row.Values), "get_market_info should return 15 columns") + + dbProvider = row.Values[11].([]byte) + dbStreamID = row.Values[12].([]byte) + dbActionID = row.Values[13].(string) + dbQueryArgs = row.Values[14].([]byte) return nil }) require.NoError(t, err) + require.Nil(t, res.Error) require.Equal(t, gethCommon.HexToAddress(dataProvider).Bytes(), dbProvider, "data_provider should be denormalized") @@ -108,7 +95,7 @@ func testDiscoveryWorkflow(t *testing.T) func(ctx context.Context, platform *kwi // 3. Test Discovery View var discoveryCount int // Parameters: $stream_id, $limit, $offset - res, err := platform.Engine.Call(engineCtx, platform.DB, "", "get_markets_by_stream", []any{expectedStreamID[:], int64(10), int64(0)}, func(row *common.Row) error { + res, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_markets_by_stream", []any{expectedStreamID[:], int64(10), int64(0)}, func(row *common.Row) error { discoveryCount++ require.Equal(t, int64(queryID), row.Values[0].(int64), "discovered ID should match") require.Equal(t, actionID, row.Values[3].(string), "discovered action_id should match") From 0b1cc130b536310006b7a6a3d06ee542805b94e3 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Tue, 24 Feb 2026 02:53:01 +0700 Subject: [PATCH 4/4] chore: apply suggestion --- internal/migrations/030-order-book-schema.sql | 2 +- internal/migrations/043-get-markets-by-stream.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/migrations/030-order-book-schema.sql b/internal/migrations/030-order-book-schema.sql index abdb643ea..47a06d236 100644 --- a/internal/migrations/030-order-book-schema.sql +++ b/internal/migrations/030-order-book-schema.sql @@ -101,7 +101,7 @@ CREATE INDEX IF NOT EXISTS idx_ob_queries_components ON ob_queries(query_components); -- Discovery indexes for high-performance lookups -CREATE INDEX IF NOT EXISTS idx_ob_queries_stream_id ON ob_queries(stream_id); +CREATE INDEX IF NOT EXISTS idx_ob_queries_stream_id_created ON ob_queries(stream_id, created_at); CREATE INDEX IF NOT EXISTS idx_ob_queries_provider_stream ON ob_queries(data_provider, stream_id); CREATE INDEX IF NOT EXISTS idx_ob_queries_action_id ON ob_queries(action_id); diff --git a/internal/migrations/043-get-markets-by-stream.sql b/internal/migrations/043-get-markets-by-stream.sql index 6bc185000..0ae971b27 100644 --- a/internal/migrations/043-get-markets-by-stream.sql +++ b/internal/migrations/043-get-markets-by-stream.sql @@ -53,6 +53,6 @@ PUBLIC VIEW RETURNS TABLE ( winning_outcome, max_spread, min_order_size, created_at FROM ob_queries WHERE stream_id = $stream_id - ORDER BY created_at DESC + ORDER BY created_at DESC, id DESC LIMIT $effective_limit OFFSET $effective_offset; };