Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion internal/migrations/000-initial-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,27 @@ CREATE INDEX IF NOT EXISTS meta_key_ref_idx ON metadata (metadata_key, value_ref
-- (data_provider, stream_id, metadata_key)
-- WHERE disabled_at IS NULL;
-- for now, we just index disabled_at
CREATE INDEX IF NOT EXISTS meta_disabled_idx ON metadata (disabled_at);
CREATE INDEX IF NOT EXISTS meta_disabled_idx ON metadata (disabled_at);

/* ============================================================================
* Transaction Tracking Columns
* ============================================================================
* Add tx_id columns to data tables for transaction effect tracking.
* This enables querying what data was created/modified by specific transactions.
*/

-- Add tx_id to streams table (for deployStream tracking)
ALTER TABLE streams ADD COLUMN IF NOT EXISTS tx_id TEXT;
CREATE INDEX IF NOT EXISTS streams_tx_id_idx ON streams (tx_id);

-- Add tx_id to primitive_events table (for insertRecords tracking)
ALTER TABLE primitive_events ADD COLUMN IF NOT EXISTS tx_id TEXT;
CREATE INDEX IF NOT EXISTS pe_tx_id_idx ON primitive_events (tx_id);

-- Add tx_id to taxonomies table (for setTaxonomies tracking)
ALTER TABLE taxonomies ADD COLUMN IF NOT EXISTS tx_id TEXT;
CREATE INDEX IF NOT EXISTS tax_tx_id_idx ON taxonomies (tx_id);

-- Add tx_id to metadata table (for setMetadata tracking)
ALTER TABLE metadata ADD COLUMN IF NOT EXISTS tx_id TEXT;
CREATE INDEX IF NOT EXISTS meta_tx_id_idx ON metadata (tx_id);
69 changes: 41 additions & 28 deletions internal/migrations/001-common-actions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,15 @@ CREATE OR REPLACE ACTION create_streams(
}

-- Create the streams using UNNEST for optimal performance
INSERT INTO streams (id, data_provider_id, data_provider, stream_id, stream_type, created_at)
INSERT INTO streams (id, data_provider_id, data_provider, stream_id, stream_type, created_at, tx_id)
SELECT
ROW_NUMBER() OVER (ORDER BY t.stream_id) + COALESCE((SELECT MAX(id) FROM streams), 0) AS id,
$data_provider_id,
$data_provider,
t.stream_id,
t.stream_type,
@height
@height,
@txid
FROM UNNEST($stream_ids, $stream_types) AS t(stream_id, stream_type);

-- Create metadata for the streams using UNNEST for optimal performance
Expand All @@ -168,7 +169,8 @@ CREATE OR REPLACE ACTION create_streams(
value_ref,
created_at,
disabled_at,
stream_ref
stream_ref,
tx_id
)
SELECT
uuid_generate_v5($base_uuid, 'metadata' || $data_provider || t.stream_id || 'stream_owner' || '1')::UUID,
Expand All @@ -180,7 +182,8 @@ CREATE OR REPLACE ACTION create_streams(
LOWER($data_provider)::TEXT,
@height,
NULL::INT,
s.id
s.id,
@txid
FROM UNNEST($stream_ids, $stream_types) AS t(stream_id, stream_type)
JOIN data_providers dp ON dp.address = $data_provider
JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id;
Expand All @@ -196,7 +199,8 @@ CREATE OR REPLACE ACTION create_streams(
value_ref,
created_at,
disabled_at,
stream_ref
stream_ref,
tx_id
)
SELECT
uuid_generate_v5($base_uuid, 'metadata' || $data_provider || t.stream_id || 'read_visibility' || '2')::UUID,
Expand All @@ -208,7 +212,8 @@ CREATE OR REPLACE ACTION create_streams(
NULL::TEXT,
@height,
NULL::INT,
s.id
s.id,
@txid
FROM UNNEST($stream_ids, $stream_types) AS t(stream_id, stream_type)
JOIN data_providers dp ON dp.address = $data_provider
JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id;
Expand All @@ -224,7 +229,8 @@ CREATE OR REPLACE ACTION create_streams(
value_ref,
created_at,
disabled_at,
stream_ref
stream_ref,
tx_id
)
SELECT
uuid_generate_v5($base_uuid, 'metadata' || $data_provider || t.stream_id || 'readonly_key' || '3')::UUID,
Expand All @@ -236,7 +242,8 @@ CREATE OR REPLACE ACTION create_streams(
NULL::TEXT,
@height,
NULL::INT,
s.id
s.id,
@txid
FROM UNNEST($stream_ids, $stream_types) AS t(stream_id, stream_type)
JOIN data_providers dp ON dp.address = $data_provider
JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id;
Expand All @@ -252,7 +259,8 @@ CREATE OR REPLACE ACTION create_streams(
value_ref,
created_at,
disabled_at,
stream_ref
stream_ref,
tx_id
)
SELECT
uuid_generate_v5($base_uuid, 'metadata' || $data_provider || t.stream_id || 'readonly_key' || '4')::UUID,
Expand All @@ -264,7 +272,8 @@ CREATE OR REPLACE ACTION create_streams(
NULL::TEXT,
@height,
NULL::INT,
s.id
s.id,
@txid
FROM UNNEST($stream_ids, $stream_types) AS t(stream_id, stream_type)
JOIN data_providers dp ON dp.address = $data_provider
JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id;
Expand All @@ -280,7 +289,8 @@ CREATE OR REPLACE ACTION create_streams(
value_ref,
created_at,
disabled_at,
stream_ref
stream_ref,
tx_id
)
SELECT
uuid_generate_v5($base_uuid, 'metadata' || $data_provider || t.stream_id || 'type' || '5')::UUID,
Expand All @@ -292,7 +302,8 @@ CREATE OR REPLACE ACTION create_streams(
NULL::TEXT,
@height,
NULL::INT,
s.id
s.id,
@txid
FROM UNNEST($stream_ids, $stream_types) AS t(stream_id, stream_type)
JOIN data_providers dp ON dp.address = $data_provider
JOIN streams s ON s.data_provider_id = dp.id AND s.stream_id = t.stream_id;
Expand Down Expand Up @@ -371,25 +382,27 @@ CREATE OR REPLACE ACTION insert_metadata(

-- Insert the metadata
INSERT INTO metadata (
row_id,
metadata_key,
value_i,
value_f,
value_s,
value_b,
value_ref,
row_id,
metadata_key,
value_i,
value_f,
value_s,
value_b,
value_ref,
created_at,
stream_ref
stream_ref,
tx_id
) VALUES (
$uuid,
$key,
$value_i,
$value_f,
$value_s,
$value_b,
LOWER($value_ref),
$uuid,
$key,
$value_i,
$value_f,
$value_s,
$value_b,
LOWER($value_ref),
$current_block,
$stream_ref
$stream_ref,
@txid
);

record_transaction_event(
Expand Down
5 changes: 3 additions & 2 deletions internal/migrations/003-primitive-insertion.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ CREATE OR REPLACE ACTION insert_records(
}

-- Insert all records using UNNEST to expand arrays efficiently
INSERT INTO primitive_events (event_time, value, created_at, truflation_created_at, stream_ref)
INSERT INTO primitive_events (event_time, value, created_at, truflation_created_at, stream_ref, tx_id)
SELECT
unnested.event_time,
unnested.value,
$current_block,
NULL,
unnested.stream_ref
unnested.stream_ref,
@txid
FROM UNNEST($event_time, $value, $stream_refs) AS unnested(event_time, value, stream_ref)
WHERE unnested.value != 0::NUMERIC(36,18)
ORDER BY unnested.stream_ref, unnested.event_time, $current_block; -- matches (stream_ref, event_time, created_at)
Expand Down
6 changes: 4 additions & 2 deletions internal/migrations/004-composed-taxonomy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ CREATE OR REPLACE ACTION insert_taxonomy(
group_sequence,
start_time,
stream_ref,
child_stream_ref
child_stream_ref,
tx_id
) VALUES (
$taxonomy_id,
$weight_value,
Expand All @@ -119,7 +120,8 @@ CREATE OR REPLACE ACTION insert_taxonomy(
$new_group_sequence, -- Use the new group_sequence for all child records.
$start_date, -- Start date of the taxonomy.
$stream_ref,
$child_stream_ref
$child_stream_ref,
@txid
);
}

Expand Down
4 changes: 3 additions & 1 deletion tests/streams/attestation/attestation_max_fee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ func testMaxFeeBelow40TRUF(t *testing.T, h *AttestationTestHelper, actionName st
argsBytes, err := tn_utils.EncodeActionArgs([]any{int64(46)})
require.NoError(t, err, "encode action args")

engineCtx := h.NewEngineContext()
// Use non-exempt user to test max_fee validation (exempt users skip fee checks)
addrs := NewTestAddresses()
engineCtx := h.NewNonExemptContext(addrs.Requester1)

maxFee := types.MustParseDecimalExplicit("30000000000000000000", 78, 0) // 30 TRUF in wei

Expand Down
12 changes: 6 additions & 6 deletions tests/streams/attestation/attestation_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ type attestationRow struct {
}

func runAttestationUnauthorizedBlocked(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, helper *AttestationTestHelper, actionName string) {
// Create an unauthorized user that does NOT have network_writer role
// Create a non-exempt user that does NOT have network_writer role (must pay 40 TRUF fee)
unauthorizedAddr := util.Unsafe_NewEthereumAddressFromString("0x0000000000000000000000000000000000009999")

argsBytes, err := tn_utils.EncodeActionArgs([]any{int64(999)})
require.NoError(t, err, "encode action args")

// Create a context for the unauthorized user
// Create a context for the non-exempt user (no balance given, so can't pay fee)
unauthorizedCtx := &common.EngineContext{
TxContext: &common.TxContext{
Ctx: ctx,
Expand All @@ -155,7 +155,7 @@ func runAttestationUnauthorizedBlocked(t *testing.T, ctx context.Context, platfo
},
}

// Try to request attestation as unauthorized user - should fail
// Try to request attestation as non-exempt user without balance - should fail with insufficient balance
res, err := platform.Engine.Call(unauthorizedCtx, platform.DB, "", "request_attestation", []any{
TestDataProviderHex,
TestStreamID,
Expand All @@ -168,9 +168,9 @@ func runAttestationUnauthorizedBlocked(t *testing.T, ctx context.Context, platfo
})

require.NoError(t, err, "call should not error at engine level")
require.NotNil(t, res.Error, "action should return error for unauthorized user")
require.Contains(t, res.Error.Error(), "does not have the required system:network_writer role",
"error should indicate missing network_writer role")
require.NotNil(t, res.Error, "action should return error for user without balance")
require.Contains(t, res.Error.Error(), "Insufficient balance for attestation",
"error should indicate insufficient balance (non-exempt users must pay 40 TRUF fee)")
}

func fetchAttestationRow(helper *AttestationTestHelper, hash []byte) attestationRow {
Expand Down
37 changes: 9 additions & 28 deletions tests/streams/attestation/request_attestation_fee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,22 @@ func setupAttestationTestEnvironment(t *testing.T) func(ctx context.Context, pla
}
}

// Test 1: Network writer member pays 40 TRUF fee per attestation request
// Test 1: Non-exempt user pays 40 TRUF fee per attestation request
func testAttestationNetworkWriterPaysFee(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
requesterAddrVal := util.Unsafe_NewEthereumAddressFromString("0xa111111111111111111111111111111111111111")
requesterAddr := &requesterAddrVal

// Register as data provider with network_writer role
err := setup.CreateDataProvider(ctx, platform, requesterAddr.Address())
require.NoError(t, err, "failed to register data provider")

// Give requester 100 TRUF
err = giveAttestationBalance(ctx, platform, requesterAddr.Address(), "100000000000000000000")
// Note: NOT creating data provider - this user is non-exempt and must pay fees
// Give requester 100 TRUF (users with network_writer role are exempt, others must pay)
err := giveAttestationBalance(ctx, platform, requesterAddr.Address(), "100000000000000000000")
require.NoError(t, err, "failed to give balance")

// Get initial balance
initialBalance, err := getAttestationBalance(ctx, platform, requesterAddr.Address())
require.NoError(t, err, "failed to get initial balance")

// Request attestation (should pay 40 TRUF)
// Request attestation (should pay 40 TRUF as non-exempt user)
systemAdmin := util.Unsafe_NewEthereumAddressFromString("0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf")
streamID := "st000000000000000000000000000000"
err = requestAttestation(ctx, platform, requesterAddr, systemAdmin.Address(), streamID, "get_record")
Expand All @@ -143,12 +140,8 @@ func testAttestationInsufficientBalance(t *testing.T) func(ctx context.Context,
requesterAddrVal := util.Unsafe_NewEthereumAddressFromString("0xa222222222222222222222222222222222222222")
requesterAddr := &requesterAddrVal

// Register as data provider with network_writer role
err := setup.CreateDataProvider(ctx, platform, requesterAddr.Address())
require.NoError(t, err, "failed to register data provider")

// Give requester only 10 TRUF (insufficient for 40 TRUF fee)
err = giveAttestationBalance(ctx, platform, requesterAddr.Address(), "10000000000000000000")
err := giveAttestationBalance(ctx, platform, requesterAddr.Address(), "10000000000000000000")
require.NoError(t, err, "failed to give balance")

// Try to request attestation (should fail)
Expand All @@ -169,12 +162,8 @@ func testAttestationMultipleRequestsChargeFees(t *testing.T) func(ctx context.Co
requesterAddrVal := util.Unsafe_NewEthereumAddressFromString("0xa333333333333333333333333333333333333333")
requesterAddr := &requesterAddrVal

// Register as data provider with network_writer role
err := setup.CreateDataProvider(ctx, platform, requesterAddr.Address())
require.NoError(t, err, "failed to register data provider")

// Give requester 200 TRUF (enough for 5 attestations)
err = giveAttestationBalance(ctx, platform, requesterAddr.Address(), "200000000000000000000")
err := giveAttestationBalance(ctx, platform, requesterAddr.Address(), "200000000000000000000")
require.NoError(t, err, "failed to give balance")

// Get initial balance
Expand Down Expand Up @@ -211,12 +200,8 @@ func testAttestationLeaderReceivesFees(t *testing.T) func(ctx context.Context, p
requesterAddrVal := util.Unsafe_NewEthereumAddressFromString("0xa444444444444444444444444444444444444444")
requesterAddr := &requesterAddrVal

// Register as data provider with network_writer role
err := setup.CreateDataProvider(ctx, platform, requesterAddr.Address())
require.NoError(t, err, "failed to register data provider")

// Give requester 100 TRUF
err = giveAttestationBalance(ctx, platform, requesterAddr.Address(), "100000000000000000000")
err := giveAttestationBalance(ctx, platform, requesterAddr.Address(), "100000000000000000000")
require.NoError(t, err, "failed to give balance")

// Generate leader keys
Expand Down Expand Up @@ -260,12 +245,8 @@ func testAttestationBalanceCorrectlyDeducted(t *testing.T) func(ctx context.Cont
requesterAddrVal := util.Unsafe_NewEthereumAddressFromString("0xa555555555555555555555555555555555555555")
requesterAddr := &requesterAddrVal

// Register as data provider with network_writer role
err := setup.CreateDataProvider(ctx, platform, requesterAddr.Address())
require.NoError(t, err, "failed to register data provider")

// Give requester exactly 80 TRUF (enough for 2 attestations)
err = giveAttestationBalance(ctx, platform, requesterAddr.Address(), "80000000000000000000")
err := giveAttestationBalance(ctx, platform, requesterAddr.Address(), "80000000000000000000")
require.NoError(t, err, "failed to give balance")

systemAdmin := util.Unsafe_NewEthereumAddressFromString("0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf")
Expand Down
21 changes: 21 additions & 0 deletions tests/streams/attestation/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,27 @@ func (h *AttestationTestHelper) NewEngineContext() *common.EngineContext {
}
}

// NewNonExemptContext creates a context for a non-exempt user (without network_writer role)
// This is useful for testing fee validation with non-exempt users
func (h *AttestationTestHelper) NewNonExemptContext(userAddr *util.EthereumAddress) *common.EngineContext {
// Give user balance to pay fees
h.GiveBalance(userAddr.Address(), "1000000000000000000000") // 1000 TRUF

return &common.EngineContext{
TxContext: &common.TxContext{
Ctx: h.ctx,
BlockContext: &common.BlockContext{
Height: 1,
Proposer: h.leaderPub, // Required for @leader_sender in fee transfers
},
Signer: userAddr.Bytes(),
Caller: userAddr.Address(),
TxID: h.platform.Txid(),
Authenticator: auth.EthPersonalSignAuth, // Required for balance operations
},
}
}

// NewLeaderContext creates a context with leader authorization
func (h *AttestationTestHelper) NewLeaderContext(privateKey kcrypto.PrivateKey, publicKey kcrypto.PublicKey) *common.EngineContext {
nodeSigner := auth.GetNodeSigner(privateKey)
Expand Down
Loading
Loading