Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/migrations/030-order-book-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ CREATE TABLE IF NOT EXISTS ob_queries (
id INT PRIMARY KEY,
hash BYTEA NOT NULL UNIQUE,
query_components BYTEA NOT NULL,
data_provider BYTEA,
stream_id BYTEA,
action_id TEXT,
query_args BYTEA,
Comment thread
MicBun marked this conversation as resolved.
settle_time INT8 NOT NULL,
settled BOOLEAN DEFAULT false NOT NULL,
winning_outcome BOOLEAN,
Expand Down Expand Up @@ -96,6 +100,11 @@ CREATE INDEX IF NOT EXISTS idx_ob_queries_settled
CREATE INDEX IF NOT EXISTS idx_ob_queries_components
ON ob_queries(query_components);

-- Discovery indexes for high-performance lookups
CREATE INDEX IF NOT EXISTS idx_ob_queries_stream_id_created ON ob_queries(stream_id, created_at);
CREATE INDEX IF NOT EXISTS idx_ob_queries_provider_stream ON ob_queries(data_provider, stream_id);
CREATE INDEX IF NOT EXISTS idx_ob_queries_action_id ON ob_queries(action_id);

Comment thread
MicBun marked this conversation as resolved.
-- =============================================================================
-- Transaction method registration
-- =============================================================================
Expand Down
37 changes: 34 additions & 3 deletions internal/migrations/032-order-book-actions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ CREATE OR REPLACE ACTION create_market(
ERROR('query_components is required (ABI-encoded (address,bytes32,string,bytes))');
}

-- Decode components for structured storage
$data_provider BYTEA;
$stream_id BYTEA;
$action_id_str TEXT;
$query_args BYTEA;
for $row in tn_utils.decode_query_components($query_components) {
$data_provider := $row.data_provider;
$stream_id := $row.stream_id;
$action_id_str := $row.action_id;
$query_args := $row.args;
}

-- Validation: Ensure decoding succeeded
if $data_provider IS NULL OR $stream_id IS NULL OR $action_id_str IS NULL OR $query_args IS NULL {
ERROR('Failed to decode query_components: invalid ABI data or missing components');
}

-- Compute hash from query components using attestation format
-- This ensures market hash matches attestation hash for automatic settlement
$query_hash BYTEA;
Expand Down Expand Up @@ -187,6 +204,10 @@ CREATE OR REPLACE ACTION create_market(
id,
hash,
query_components,
data_provider,
stream_id,
action_id,
query_args,
Comment thread
MicBun marked this conversation as resolved.
settle_time,
max_spread,
min_order_size,
Expand All @@ -198,6 +219,10 @@ CREATE OR REPLACE ACTION create_market(
COALESCE(MAX(id), 0) + 1,
$query_hash,
$query_components,
$data_provider,
$stream_id,
$action_id_str,
$query_args,
$settle_time,
$max_spread,
$min_order_size,
Expand Down Expand Up @@ -248,21 +273,27 @@ PUBLIC VIEW RETURNS (
max_spread INT,
min_order_size INT8,
created_at INT8,
creator BYTEA
creator BYTEA,
data_provider BYTEA,
stream_id BYTEA,
action_id TEXT,
query_args BYTEA
) {
if $query_id IS NULL {
ERROR('query_id is required');
}

for $market in
SELECT hash, query_components, bridge, settle_time, settled, winning_outcome, settled_at,
max_spread, min_order_size, created_at, creator
max_spread, min_order_size, created_at, creator,
data_provider, stream_id, action_id, query_args
FROM ob_queries
WHERE id = $query_id
{
RETURN $market.hash, $market.query_components, $market.bridge, $market.settle_time, $market.settled,
$market.winning_outcome, $market.settled_at, $market.max_spread,
$market.min_order_size, $market.created_at, $market.creator;
$market.min_order_size, $market.created_at, $market.creator,
$market.data_provider, $market.stream_id, $market.action_id, $market.query_args;
}

ERROR('Market not found: ' || $query_id::TEXT);
Expand Down
19 changes: 0 additions & 19 deletions internal/migrations/043-denormalize-ob-queries.sql

This file was deleted.

58 changes: 58 additions & 0 deletions internal/migrations/043-get-markets-by-stream.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- Migration: Order Book Discovery Action
-- Adds get_markets_by_stream discovery view for high-performance indexed lookups.

-- =============================================================================
-- get_markets_by_stream: Discovery view for asset pages
-- =============================================================================
/**
* Returns all markets associated with a specific stream ID.
* This lookup is high-performance (indexed).
*
* Parameters:
* - $stream_id: The 32-byte stream identifier
* - $limit_val: Maximum number of results (default 100, max 100)
* - $offset_val: Number of results to skip (default 0)
*
* Returns table of market summaries.
*/
CREATE OR REPLACE ACTION get_markets_by_stream(
$stream_id BYTEA,
$limit_val INT,
$offset_val INT
)
PUBLIC VIEW RETURNS TABLE (
id INT,
hash BYTEA,
data_provider BYTEA,
action_id TEXT,
settle_time INT8,
settled BOOLEAN,
winning_outcome BOOLEAN,
max_spread INT,
min_order_size INT8,
created_at INT8
) {
if $stream_id IS NULL {
ERROR('stream_id is required');
}

-- Apply default and max limits
-- Note: This logic is intentionally duplicated from list_markets (032-order-book-actions.sql)
-- to maintain consistency in pagination behavior.
$effective_limit INT := 100;
$effective_offset INT := 0;

if $limit_val IS NOT NULL AND $limit_val > 0 AND $limit_val <= 100 {
$effective_limit := $limit_val;
}
if $offset_val IS NOT NULL AND $offset_val >= 0 {
$effective_offset := $offset_val;
}

RETURN SELECT id, hash, data_provider, action_id, settle_time, settled,
winning_outcome, max_spread, min_order_size, created_at
FROM ob_queries
WHERE stream_id = $stream_id
ORDER BY created_at DESC, id DESC
LIMIT $effective_limit OFFSET $effective_offset;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
};
141 changes: 141 additions & 0 deletions tests/streams/order_book/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//go:build kwiltest

package order_book

import (
"context"
"testing"
"time"

gethCommon "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/trufnetwork/kwil-db/common"
coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth"
erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20"
kwilTesting "github.com/trufnetwork/kwil-db/testing"
"github.com/trufnetwork/node/internal/migrations"
testutils "github.com/trufnetwork/node/tests/streams/utils"
"github.com/trufnetwork/sdk-go/core/util"
)

// TestOrderBookDiscovery verifies that new markets populate denormalized columns
// and can be discovered via the indexed get_markets_by_stream view.
func TestOrderBookDiscovery(t *testing.T) {
testutils.RunSchemaTest(t, kwilTesting.SchemaTest{
Name: "ORDER_BOOK_Discovery",
SeedStatements: migrations.GetSeedScriptStatements(),
FunctionTests: []kwilTesting.TestFunc{
testDiscoveryWorkflow(t),
},
}, testutils.GetTestOptionsWithCache())
}

func testDiscoveryWorkflow(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
// Reset balance point tracker
lastBalancePointComponents = nil
userAddr := util.Unsafe_NewEthereumAddressFromString("0x1111111111111111111111111111111111111111")

// Initialize ERC20 extension
err := erc20bridge.ForTestingInitializeExtension(ctx, platform)
require.NoError(t, err)

// Give user balance for fees
err = giveBalanceChainedComponents(ctx, platform, userAddr.Address(), "100000000000000000000")
require.NoError(t, err)

// 1. Create a Market
dataProvider := "0x2222222222222222222222222222222222222222"
streamID := "stdiscovery000000000000000000000" // Exactly 32 chars
actionID := "price_above_threshold"
argsBytes := []byte{0xDE, 0xAD, 0xBE, 0xEF}

queryComponents, err := encodeQueryComponentsABI(dataProvider, streamID, actionID, argsBytes)
require.NoError(t, err)

settleTime := time.Now().Add(1 * time.Hour).Unix()
var queryID int
err = callCreateMarketWithComponents(ctx, platform, &userAddr, queryComponents, settleTime, int64(5), int64(100), func(row *common.Row) error {
queryID = int(row.Values[0].(int64))
return nil
})
require.NoError(t, err)

// 2. Verify Structured Columns via get_market_info
// This also tests that get_market_info now returns the denormalized columns
engineCtx := engCtx(ctx, platform, userAddr.Address(), 1)

var dbProvider []byte
var dbStreamID []byte
var dbActionID string
var dbQueryArgs []byte

res, err := platform.Engine.Call(engineCtx, platform.DB, "", "get_market_info", []any{int64(queryID)}, func(row *common.Row) error {
// get_market_info now returns 15 columns. Denormalized ones are at 11, 12, 13, 14
require.Equal(t, 15, len(row.Values), "get_market_info should return 15 columns")

dbProvider = row.Values[11].([]byte)
dbStreamID = row.Values[12].([]byte)
dbActionID = row.Values[13].(string)
dbQueryArgs = row.Values[14].([]byte)
return nil
})
require.NoError(t, err)
require.Nil(t, res.Error)

require.Equal(t, gethCommon.HexToAddress(dataProvider).Bytes(), dbProvider, "data_provider should be denormalized")

var expectedStreamID [32]byte
copy(expectedStreamID[:], []byte(streamID))
require.Equal(t, expectedStreamID[:], dbStreamID, "stream_id should be denormalized")

require.Equal(t, actionID, dbActionID, "action_id should be denormalized")
require.Equal(t, argsBytes, dbQueryArgs, "query_args should be denormalized")

// 3. Test Discovery View
var discoveryCount int
// Parameters: $stream_id, $limit, $offset
res, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_markets_by_stream", []any{expectedStreamID[:], int64(10), int64(0)}, func(row *common.Row) error {
discoveryCount++
require.Equal(t, int64(queryID), row.Values[0].(int64), "discovered ID should match")
require.Equal(t, actionID, row.Values[3].(string), "discovered action_id should match")
return nil
})
require.NoError(t, err)
require.Nil(t, res.Error)
require.Equal(t, 1, discoveryCount, "should find exactly one market for this stream")

// 4. Create another market for the same stream to test indexing/multiple results
queryComponents2, err := encodeQueryComponentsABI(dataProvider, streamID, "price_below_threshold", []byte{0x00})
require.NoError(t, err)

err = callCreateMarketWithComponents(ctx, platform, &userAddr, queryComponents2, settleTime + 100, int64(5), int64(100), nil)
require.NoError(t, err)

discoveryCount = 0
res, err = platform.Engine.Call(engineCtx, platform.DB, "", "get_markets_by_stream", []any{expectedStreamID[:], int64(10), int64(0)}, func(row *common.Row) error {
discoveryCount++
return nil
})
require.NoError(t, err)
require.Equal(t, 2, discoveryCount, "should now find two markets for this stream")

return nil
}
}

// engCtx helper from other tests
func engCtx(ctx context.Context, platform *kwilTesting.Platform, caller string, height int64) *common.EngineContext {
return &common.EngineContext{
TxContext: &common.TxContext{
Ctx: ctx,
BlockContext: &common.BlockContext{
Height: height,
Timestamp: time.Now().Unix(),
},
Caller: caller,
TxID: platform.Txid(),
Authenticator: coreauth.EthPersonalSignAuth,
},
}
}
Loading