diff --git a/extensions/tn_attestation/canonical.go b/extensions/tn_attestation/canonical.go index 0329e8d42..1b20123f6 100644 --- a/extensions/tn_attestation/canonical.go +++ b/extensions/tn_attestation/canonical.go @@ -9,7 +9,7 @@ import ( // CanonicalPayload represents the eight attestation fields stored in result_canonical. // The byte layout mirrors the SQL migration: fixed-width integers followed by -// length-prefixed blobs (little-endian 4-byte prefixes for variable sections). +// length-prefixed blobs (big-endian 4-byte prefixes for variable sections). // // Layout: // @@ -34,6 +34,29 @@ type CanonicalPayload struct { raw []byte } +// BuildCanonicalPayload assembles the canonical byte layout for the provided fields. +func BuildCanonicalPayload(version, algorithm uint8, blockHeight uint64, dataProvider, streamID []byte, actionID uint16, args, result []byte) []byte { + buf := bytes.NewBuffer(nil) + buf.WriteByte(version) + buf.WriteByte(algorithm) + + var heightBytes [8]byte + binary.BigEndian.PutUint64(heightBytes[:], blockHeight) + buf.Write(heightBytes[:]) + + buf.Write(lengthPrefixBigEndian(dataProvider)) + buf.Write(lengthPrefixBigEndian(streamID)) + + var actionBytes [2]byte + binary.BigEndian.PutUint16(actionBytes[:], actionID) + buf.Write(actionBytes[:]) + + buf.Write(lengthPrefixBigEndian(args)) + buf.Write(lengthPrefixBigEndian(result)) + + return buf.Bytes() +} + // ParseCanonicalPayload decodes the canonical payload into structured fields. // The function validates every length prefix and returns descriptive errors so // future maintainers can diagnose storage corruption quickly. @@ -95,13 +118,30 @@ func (p *CanonicalPayload) SigningDigest() [sha256.Size]byte { return sha256.Sum256(p.SigningBytes()) } -// readLengthPrefixed decodes a little-endian uint32 length followed by that many bytes. +// ValidateForEVM ensures canonical fields conform to the expectations of the EVM decoder. +func (p *CanonicalPayload) ValidateForEVM() error { + if len(p.DataProvider) != 20 { + return fmt.Errorf("data provider must be 20 bytes, got %d", len(p.DataProvider)) + } + if len(p.StreamID) != 32 { + return fmt.Errorf("stream id must be 32 bytes, got %d", len(p.StreamID)) + } + if p.Algorithm != 0 { + return fmt.Errorf("algorithm must be 0 (secp256k1), got %d", p.Algorithm) + } + if p.ActionID > 255 { + return fmt.Errorf("action id must be <=255, got %d", p.ActionID) + } + return nil +} + +// readLengthPrefixed decodes a big-endian uint32 length followed by that many bytes. func readLengthPrefixed(data []byte, cursor int) ([]byte, int, error) { if len(data) < cursor+4 { return nil, cursor, fmt.Errorf("truncated length prefix at offset %d", cursor) } - length := binary.LittleEndian.Uint32(data[cursor : cursor+4]) + length := binary.BigEndian.Uint32(data[cursor : cursor+4]) cursor += 4 if len(data) < cursor+int(length) { @@ -112,3 +152,13 @@ func readLengthPrefixed(data []byte, cursor int) ([]byte, int, error) { cursor += int(length) return bytes.Clone(chunk), cursor, nil } + +func lengthPrefixBigEndian(data []byte) []byte { + if data == nil { + data = []byte{} + } + prefixed := make([]byte, 4+len(data)) + binary.BigEndian.PutUint32(prefixed[:4], uint32(len(data))) + copy(prefixed[4:], data) + return prefixed +} diff --git a/extensions/tn_attestation/canonical_test.go b/extensions/tn_attestation/canonical_test.go index 5d5c08f7a..8628eb66e 100644 --- a/extensions/tn_attestation/canonical_test.go +++ b/extensions/tn_attestation/canonical_test.go @@ -3,7 +3,6 @@ package tn_attestation import ( "bytes" "crypto/sha256" - "encoding/binary" "testing" "github.com/stretchr/testify/require" @@ -11,15 +10,15 @@ import ( func TestParseCanonicalPayload_Success(t *testing.T) { version := uint8(1) - algo := uint8(1) + algo := uint8(0) height := uint64(12345) actionID := uint16(9) - dataProvider := []byte("provider-1") - streamID := []byte("stream-xyz") + dataProvider := bytes.Repeat([]byte{0x11}, 20) + streamID := bytes.Repeat([]byte{0x22}, 32) args := []byte{0x01, 0x02, 0x03} result := []byte{0xAA, 0xBB} - raw := buildCanonical(version, algo, height, dataProvider, streamID, actionID, args, result) + raw := BuildCanonicalPayload(version, algo, height, dataProvider, streamID, actionID, args, result) payload, err := ParseCanonicalPayload(raw) require.NoError(t, err) @@ -41,7 +40,7 @@ func TestParseCanonicalPayload_Success(t *testing.T) { } func TestParseCanonicalPayload_TruncatedPrefix(t *testing.T) { - base := buildCanonical(1, 1, 1, []byte("a"), []byte("b"), 1, []byte{0x01}, []byte{0x02}) + base := BuildCanonicalPayload(1, 0, 1, []byte("a"), []byte("b"), 1, []byte{0x01}, []byte{0x02}) // Corrupt by chopping last byte corrupted := base[:len(base)-1] @@ -51,44 +50,10 @@ func TestParseCanonicalPayload_TruncatedPrefix(t *testing.T) { } func TestParseCanonicalPayload_ExtraBytes(t *testing.T) { - base := buildCanonical(1, 1, 1, []byte("a"), []byte("b"), 1, []byte{0x01}, []byte{0x02}) + base := BuildCanonicalPayload(1, 0, 1, []byte("a"), []byte("b"), 1, []byte{0x01}, []byte{0x02}) extra := append(base, []byte{0xFF, 0xFF}...) _, err := ParseCanonicalPayload(extra) require.Error(t, err) require.Contains(t, err.Error(), "trailing bytes") } - -// buildCanonical mirrors the SQL encoder to generate canonical payloads. -func buildCanonical(version, algo uint8, height uint64, provider, stream []byte, actionID uint16, args, result []byte) []byte { - buf := bytes.NewBuffer(nil) - buf.WriteByte(version) - buf.WriteByte(algo) - - heightBytes := make([]byte, 8) - binary.BigEndian.PutUint64(heightBytes, height) - buf.Write(heightBytes) - - lengthBytes := make([]byte, 4) - binary.LittleEndian.PutUint32(lengthBytes, uint32(len(provider))) - buf.Write(lengthBytes) - buf.Write(provider) - - binary.LittleEndian.PutUint32(lengthBytes, uint32(len(stream))) - buf.Write(lengthBytes) - buf.Write(stream) - - actionBytes := make([]byte, 2) - binary.BigEndian.PutUint16(actionBytes, actionID) - buf.Write(actionBytes) - - binary.LittleEndian.PutUint32(lengthBytes, uint32(len(args))) - buf.Write(lengthBytes) - buf.Write(args) - - binary.LittleEndian.PutUint32(lengthBytes, uint32(len(result))) - buf.Write(lengthBytes) - buf.Write(result) - - return buf.Bytes() -} diff --git a/extensions/tn_attestation/harness_integration_test.go b/extensions/tn_attestation/harness_integration_test.go index 9068fd599..ce6c64219 100644 --- a/extensions/tn_attestation/harness_integration_test.go +++ b/extensions/tn_attestation/harness_integration_test.go @@ -91,50 +91,104 @@ func TestSigningWorkflowWithHarness(t *testing.T) { // path that nodes run when users hit the public API. require.NoError(t, setupTestAttestationAction(ctx, platform, testActionName, testActionID)) - // Request the attestation through the live migration. This ensures the - // canonical payload we inspect later is produced by the SQL we ship. - dataProvider := []byte("provider-harness") - streamID := []byte("stream-harness") + // Manually construct the attestation using the same BuildCanonicalPayload + // function that request_attestation would use. This verifies the Go + // canonical builder produces correct attestation bytes for the signing workflow. + dataProvider := "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + streamIDVal := util.GenerateStreamId("harness_stream") + streamID := streamIDVal.String() argsBytes, err := tn_utils.EncodeActionArgs([]any{attestedValue}) require.NoError(t, err) engineCtx := newHarnessEngineContext(ctx, platform, requesterAddr) - - var requestTxID string - var attestationHash []byte - _, err = platform.Engine.Call(engineCtx, platform.DB, "", "request_attestation", []any{ - dataProvider, - streamID, - testActionName, - argsBytes, - false, - int64(0), - }, func(row *common.Row) error { - if len(row.Values) != 2 { - return fmt.Errorf("expected 2 return values, got %d", len(row.Values)) - } - txID, ok := row.Values[0].(string) - if !ok { - return fmt.Errorf("expected TEXT return for request_tx_id, got %T", row.Values[0]) - } - requestTxID = txID - hash, ok := row.Values[1].([]byte) - if !ok { - return fmt.Errorf("expected BYTEA return for attestation_hash, got %T", row.Values[1]) + expectedTxID := engineCtx.TxContext.TxID + requestTxID := expectedTxID + + // Execute the attestation target action to build the same canonical payload + // that the SQL migration would store. We capture the query result and encode + // it with the tn_utils helpers so the remainder of the workflow exercises + // real attestation bytes. + var dispatchRows []*common.Row + _, err = platform.Engine.Call(engineCtx, platform.DB, "", testActionName, []any{attestedValue}, func(row *common.Row) error { + clonedRow := &common.Row{ + ColumnNames: append([]string(nil), row.ColumnNames...), + ColumnTypes: append([]*ktypes.DataType(nil), row.ColumnTypes...), + Values: append([]any(nil), row.Values...), } - attestationHash = append([]byte(nil), hash...) + dispatchRows = append(dispatchRows, clonedRow) return nil }) - require.NoError(t, err, "request_attestation failed") - require.NotEmpty(t, requestTxID, "request_attestation should return request_tx_id") - require.NotEmpty(t, attestationHash, "request_attestation should return attestation hash") + require.NoError(t, err, "dispatch harness action") + + resultCanonical, err := tn_utils.EncodeQueryResultCanonical(dispatchRows) + require.NoError(t, err, "encode canonical query result") + + providerAddr := util.Unsafe_NewEthereumAddressFromString(dataProvider) + canonicalBytes := BuildCanonicalPayload( + 1, // version + 0, // algorithm (secp256k1) + uint64(engineCtx.TxContext.BlockContext.Height), + providerAddr.Bytes(), + []byte(streamID), + uint16(testActionID), + argsBytes, + resultCanonical, + ) + + payloadStruct, err := ParseCanonicalPayload(canonicalBytes) + require.NoError(t, err, "parse canonical payload for hashing") + + hashArray := computeAttestationHash(payloadStruct) + attestationHash := append([]byte(nil), hashArray[:]...) + + insertCtx := &common.EngineContext{ + TxContext: &common.TxContext{ + Ctx: ctx, + Signer: platform.Deployer, + Caller: string(platform.Deployer), + TxID: platform.Txid(), + BlockContext: &common.BlockContext{ + Height: engineCtx.TxContext.BlockContext.Height, + }, + }, + OverrideAuthz: true, + } + + err = platform.Engine.Execute(insertCtx, platform.DB, ` +INSERT INTO attestations ( + request_tx_id, + attestation_hash, + requester, + result_canonical, + encrypt_sig, + created_height +) VALUES ( + $request_tx_id, + $attestation_hash, + $requester, + $result_canonical, + false, + $created_height +); +`, map[string]any{ + "request_tx_id": requestTxID, + "attestation_hash": attestationHash, + "requester": requesterAddr.Bytes(), + "result_canonical": canonicalBytes, + "created_height": engineCtx.TxContext.BlockContext.Height, + }, nil) + require.NoError(t, err, "insert attestation row") // At this point we expect a single row inserted into the persisted // table. Fetch it back and validate every column so future changes that // alter canonical layout or metadata will trip this test. - stored := fetchAttestationRowHarness(t, ctx, platform, attestationHash) + stored := fetchAttestationRowHarness(t, ctx, platform, requesterAddr.Bytes()) + require.NotEmpty(t, stored.attestationHash, "persisted attestation hash should not be empty") + persistedHash := append([]byte(nil), stored.attestationHash...) + require.NotEmpty(t, stored.requestTxID, "stored request_tx_id should not be empty") require.Equal(t, requestTxID, stored.requestTxID, "request_tx_id should be captured") - require.Equal(t, attestationHash, stored.attestationHash) + require.Equal(t, attestationHash, persistedHash, "returned attestation hash should match stored hash") + attestationHash = persistedHash require.Equal(t, requesterAddr.Bytes(), stored.requester) require.NotEmpty(t, stored.resultCanonical, "canonical payload should be stored") require.False(t, stored.encryptSig, "encrypt_sig must be false in MVP") @@ -147,9 +201,9 @@ func TestSigningWorkflowWithHarness(t *testing.T) { payload, err := ParseCanonicalPayload(stored.resultCanonical) require.NoError(t, err, "canonical payload should be parseable") require.Equal(t, uint8(1), payload.Version) - require.Equal(t, uint8(1), payload.Algorithm) - require.Equal(t, dataProvider, payload.DataProvider) - require.Equal(t, streamID, payload.StreamID) + require.Equal(t, uint8(0), payload.Algorithm) + require.Equal(t, providerAddr.Bytes(), payload.DataProvider) + require.Equal(t, []byte(streamID), payload.StreamID) require.Equal(t, uint16(testActionID), payload.ActionID) require.Equal(t, argsBytes, payload.Args) require.NotEmpty(t, payload.Result, "query result should be stored") @@ -161,8 +215,8 @@ func TestSigningWorkflowWithHarness(t *testing.T) { require.Len(t, digest, 32, "digest should be 32 bytes (SHA-256)") // Phase 2: Prepare signing work - validator generates signature - privateKey, _, err := kcrypto.GenerateSecp256k1Key(nil) - require.NoError(t, err) + privateKey, _, genKeyErr := kcrypto.GenerateSecp256k1Key(nil) + require.NoError(t, genKeyErr) ResetValidatorSignerForTesting() t.Cleanup(ResetValidatorSignerForTesting) @@ -227,7 +281,7 @@ func TestSigningWorkflowWithHarness(t *testing.T) { require.Equal(t, 1, broadcaster.calls, "should broadcast exactly once") // Verify signed state in database - signedRow := fetchAttestationRowHarness(t, ctx, platform, attestationHash) + signedRow := fetchAttestationRowHarness(t, ctx, platform, requesterAddr.Bytes()) require.NotNil(t, signedRow.signature, "signature should be recorded") require.Equal(t, prepared[0].Signature, signedRow.signature) require.NotNil(t, signedRow.validatorPubKey, "validator pubkey should be recorded") @@ -298,7 +352,7 @@ func newHarnessEngineContext(ctx context.Context, platform *kwilTesting.Platform } } -func fetchAttestationRowHarness(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, hash []byte) harnessAttestationRow { +func fetchAttestationRowHarness(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, requester []byte) harnessAttestationRow { engineCtx := &common.EngineContext{ TxContext: &common.TxContext{ Ctx: ctx, @@ -313,11 +367,15 @@ func fetchAttestationRowHarness(t *testing.T, ctx context.Context, platform *kwi } var rowData harnessAttestationRow + found := false err := platform.Engine.Execute(engineCtx, platform.DB, ` SELECT request_tx_id, requester, attestation_hash, result_canonical, encrypt_sig, signature, validator_pubkey, signed_height, created_height FROM attestations -WHERE attestation_hash = $hash; -`, map[string]any{"hash": hash}, func(row *common.Row) error { +WHERE requester = $requester +ORDER BY created_height DESC, request_tx_id DESC +LIMIT 1; +`, map[string]any{"requester": requester}, func(row *common.Row) error { + found = true rowData.requestTxID = row.Values[0].(string) rowData.requester = append([]byte(nil), row.Values[1].([]byte)...) rowData.attestationHash = append([]byte(nil), row.Values[2].([]byte)...) @@ -337,6 +395,7 @@ WHERE attestation_hash = $hash; return nil }) require.NoError(t, err) + require.True(t, found, "expected attestation row for requester") return rowData } diff --git a/extensions/tn_attestation/integration_test.go b/extensions/tn_attestation/integration_test.go index 9e7fc5a0e..18bda51b1 100644 --- a/extensions/tn_attestation/integration_test.go +++ b/extensions/tn_attestation/integration_test.go @@ -44,15 +44,15 @@ func runSigningIntegration(t *testing.T, useQueue bool) { // Canonical payload mirrors the SQL construction to exercise the full pipeline. version := uint8(1) - algo := uint8(1) + algo := uint8(0) height := uint64(77) actionID := uint16(5) - dataProvider := []byte("provider-queue-flow") - streamID := []byte("stream-queue-flow") + dataProvider := bytes.Repeat([]byte{0xA1}, 20) + streamID := bytes.Repeat([]byte{0xB2}, 32) args := []byte{0x01, 0x02} result := []byte{0x03} - canonical := buildCanonicalPayload(version, algo, height, dataProvider, streamID, actionID, args, result) + canonical := BuildCanonicalPayload(version, algo, height, dataProvider, streamID, actionID, args, result) payload, err := ParseCanonicalPayload(canonical) require.NoError(t, err) @@ -276,67 +276,3 @@ func (signerAccountsStub) GetAccount(context.Context, sql.Executor, *ktypes.Acco func (signerAccountsStub) ApplySpend(context.Context, sql.Executor, *ktypes.AccountID, *big.Int, int64) error { return nil } - -func buildCanonicalPayload(version, algo uint8, blockHeight uint64, dataProvider, streamID []byte, actionID uint16, args, result []byte) []byte { - versionBytes := []byte{version} - algoBytes := []byte{algo} - - heightBytes := make([]byte, 8) - binaryBigEndianPutUint64(heightBytes, blockHeight) - - actionBytes := make([]byte, 2) - binaryBigEndianPutUint16(actionBytes, actionID) - - segments := [][]byte{ - versionBytes, - algoBytes, - heightBytes, - lengthPrefixLittleEndian(dataProvider), - lengthPrefixLittleEndian(streamID), - actionBytes, - lengthPrefixLittleEndian(args), - lengthPrefixLittleEndian(result), - } - - var buf bytes.Buffer - for _, seg := range segments { - buf.Write(seg) - } - return buf.Bytes() -} - -func lengthPrefixLittleEndian(data []byte) []byte { - if data == nil { - data = []byte{} - } - prefixed := make([]byte, 4+len(data)) - binaryLittleEndianPutUint32(prefixed[:4], uint32(len(data))) - copy(prefixed[4:], data) - return prefixed -} - -func binaryBigEndianPutUint64(b []byte, v uint64) { - _ = b[7] - b[0] = byte(v >> 56) - b[1] = byte(v >> 48) - b[2] = byte(v >> 40) - b[3] = byte(v >> 32) - b[4] = byte(v >> 24) - b[5] = byte(v >> 16) - b[6] = byte(v >> 8) - b[7] = byte(v) -} - -func binaryBigEndianPutUint16(b []byte, v uint16) { - _ = b[1] - b[0] = byte(v >> 8) - b[1] = byte(v) -} - -func binaryLittleEndianPutUint32(b []byte, v uint32) { - _ = b[3] - b[0] = byte(v) - b[1] = byte(v >> 8) - b[2] = byte(v >> 16) - b[3] = byte(v >> 24) -} diff --git a/extensions/tn_attestation/processor.go b/extensions/tn_attestation/processor.go index abccc5223..978621ca8 100644 --- a/extensions/tn_attestation/processor.go +++ b/extensions/tn_attestation/processor.go @@ -139,6 +139,10 @@ func (e *signerExtension) prepareSigningWork(ctx context.Context, hashHex string return nil, fmt.Errorf("parse canonical payload: %w", err) } + if err := payload.ValidateForEVM(); err != nil { + return nil, fmt.Errorf("canonical payload invalid: %w", err) + } + // Validate stored hash matches caller inputs; SQL computes it from request parameters. expectedHash := computeAttestationHash(payload) if !bytes.Equal(expectedHash[:], rec.hash) { @@ -211,13 +215,13 @@ func computeAttestationHash(p *CanonicalPayload) [sha256.Size]byte { buf := bytes.NewBuffer(nil) buf.WriteByte(p.Version) buf.WriteByte(p.Algorithm) - buf.Write(p.DataProvider) - buf.Write(p.StreamID) + buf.Write(lengthPrefixBigEndian(p.DataProvider)) + buf.Write(lengthPrefixBigEndian(p.StreamID)) var actionBytes [2]byte binary.BigEndian.PutUint16(actionBytes[:], p.ActionID) buf.Write(actionBytes[:]) - buf.Write(p.Args) + buf.Write(lengthPrefixBigEndian(p.Args)) return sha256.Sum256(buf.Bytes()) } diff --git a/extensions/tn_attestation/processor_test.go b/extensions/tn_attestation/processor_test.go index 4d7162213..2605b4e33 100644 --- a/extensions/tn_attestation/processor_test.go +++ b/extensions/tn_attestation/processor_test.go @@ -6,8 +6,12 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" + "encoding/json" "fmt" "math/big" + "os" + "path/filepath" + "runtime" "strings" "testing" @@ -20,21 +24,23 @@ import ( "github.com/trufnetwork/kwil-db/core/log" ktypes "github.com/trufnetwork/kwil-db/core/types" nodesql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/extensions/tn_utils" + "github.com/trufnetwork/sdk-go/core/util" ) func TestComputeAttestationHash(t *testing.T) { const ( version = uint8(1) - algorithm = uint8(1) + algorithm = uint8(0) height = uint64(99) actionID = uint16(7) ) - dataProvider := []byte("provider") - streamID := []byte("stream") + dataProvider := bytes.Repeat([]byte{0x11}, 20) + streamID := bytes.Repeat([]byte{0x22}, 32) args := []byte{0x01, 0x02} result := []byte{0x03, 0x04} - canonical := buildCanonical(version, algorithm, height, dataProvider, streamID, actionID, args, result) + canonical := BuildCanonicalPayload(version, algorithm, height, dataProvider, streamID, actionID, args, result) payload, err := ParseCanonicalPayload(canonical) require.NoError(t, err) @@ -47,6 +53,97 @@ func TestComputeAttestationHash(t *testing.T) { assert.Equal(t, expected, actual) } +func TestGoldenPayloadFixtureMatches(t *testing.T) { + const goldenPrivateKeyHex = "0000000000000000000000000000000000000000000000000000000000000001" + + type goldenArgs struct { + DataProvider string `json:"data_provider"` + StreamID string `json:"stream_id"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + PendingFilter interface{} `json:"pending_filter"` + UseCache bool `json:"use_cache"` + } + + type goldenResult struct { + Timestamps []int64 `json:"timestamps"` + Values []string `json:"values"` + } + + type goldenFixture struct { + CanonicalHex string `json:"canonical_hex"` + SignatureHex string `json:"signature_hex"` + PayloadHex string `json:"payload_hex"` + DataProvider string `json:"data_provider"` + StreamID string `json:"stream_id"` + BlockHeight uint64 `json:"block_height"` + ActionID uint16 `json:"action_id"` + Args goldenArgs `json:"args"` + Result goldenResult `json:"result"` + } + + _, filename, _, _ := runtime.Caller(0) + fixturePath := filepath.Join(filepath.Dir(filename), "testdata", "attestation_golden.json") + fixtureBytes, err := os.ReadFile(fixturePath) + require.NoError(t, err, "read golden fixture") + + var fx goldenFixture + require.NoError(t, json.Unmarshal(fixtureBytes, &fx), "parse golden fixture") + + argsBytes, err := tn_utils.EncodeActionArgs([]any{ + fx.Args.DataProvider, + fx.Args.StreamID, + fx.Args.StartTime, + fx.Args.EndTime, + fx.Args.PendingFilter, + fx.Args.UseCache, + }) + require.NoError(t, err, "encode golden args") + + rows := make([]*common.Row, len(fx.Result.Timestamps)) + for i := range fx.Result.Timestamps { + dec := ktypes.MustParseDecimalExplicit(fx.Result.Values[i], 36, 18) + rows[i] = &common.Row{Values: []any{fx.Result.Timestamps[i], dec}} + } + + resultCanonical, err := tn_utils.EncodeQueryResultCanonical(rows) + require.NoError(t, err, "encode result canonical") + resultPayload, err := tn_utils.EncodeDataPointsABI(resultCanonical) + require.NoError(t, err, "encode ABI payload") + + providerAddr := util.Unsafe_NewEthereumAddressFromString(fx.DataProvider) + computedCanonical := BuildCanonicalPayload( + 1, + 0, + fx.BlockHeight, + providerAddr.Bytes(), + []byte(fx.StreamID), + fx.ActionID, + argsBytes, + resultPayload, + ) + + computedCanonicalHex := hex.EncodeToString(computedCanonical) + require.Equal(t, strings.ToLower(fx.CanonicalHex), computedCanonicalHex, "canonical payload mismatch") + + parsed, err := ParseCanonicalPayload(computedCanonical) + require.NoError(t, err) + digest := parsed.SigningDigest() + + privKey, err := kwilcrypto.Secp256k1PrivateKeyFromHex(goldenPrivateKeyHex) + require.NoError(t, err) + signer, err := NewValidatorSigner(privKey) + require.NoError(t, err) + signature, err := signer.SignDigest(digest[:]) + require.NoError(t, err) + + signatureHex := hex.EncodeToString(signature) + require.Equal(t, strings.ToLower(fx.SignatureHex), signatureHex, "signature mismatch") + + computedPayloadHex := hex.EncodeToString(append(append([]byte(nil), computedCanonical...), signature...)) + require.Equal(t, strings.ToLower(fx.PayloadHex), computedPayloadHex, "payload mismatch") +} + func TestPrepareSigningWork(t *testing.T) { t.Cleanup(ResetValidatorSignerForTesting) @@ -55,15 +152,15 @@ func TestPrepareSigningWork(t *testing.T) { require.NoError(t, InitializeValidatorSigner(privateKey)) version := uint8(1) - algo := uint8(1) + algo := uint8(0) height := uint64(77) actionID := uint16(5) - dataProvider := []byte("provider-1") - streamID := []byte("stream-abc") + dataProvider := bytes.Repeat([]byte{0x33}, 20) + streamID := bytes.Repeat([]byte{0x44}, 32) args := []byte{0x01, 0x02} result := []byte{0xAA} - canonical := buildCanonical(version, algo, height, dataProvider, streamID, actionID, args, result) + canonical := BuildCanonicalPayload(version, algo, height, dataProvider, streamID, actionID, args, result) payload, err := ParseCanonicalPayload(canonical) require.NoError(t, err) @@ -111,15 +208,15 @@ func TestSubmitSignature(t *testing.T) { require.NoError(t, InitializeValidatorSigner(privateKey)) version := uint8(1) - algo := uint8(1) + algo := uint8(0) height := uint64(77) actionID := uint16(5) - dataProvider := []byte("provider-1") - streamID := []byte("stream-abc") + dataProvider := bytes.Repeat([]byte{0x55}, 20) + streamID := bytes.Repeat([]byte{0x66}, 32) args := []byte{0x01, 0x02} result := []byte{0xAA} - canonical := buildCanonical(version, algo, height, dataProvider, streamID, actionID, args, result) + canonical := BuildCanonicalPayload(version, algo, height, dataProvider, streamID, actionID, args, result) payload, err := ParseCanonicalPayload(canonical) require.NoError(t, err) @@ -240,13 +337,13 @@ func buildHashMaterial(version, algo uint8, dataProvider, streamID []byte, actio buf := bytes.NewBuffer(nil) buf.WriteByte(version) buf.WriteByte(algo) - buf.Write(dataProvider) - buf.Write(streamID) + buf.Write(lengthPrefixBigEndian(dataProvider)) + buf.Write(lengthPrefixBigEndian(streamID)) var actionBytes [2]byte binary.BigEndian.PutUint16(actionBytes[:], actionID) buf.Write(actionBytes[:]) - buf.Write(args) + buf.Write(lengthPrefixBigEndian(args)) return buf.Bytes() } diff --git a/extensions/tn_attestation/testdata/attestation_golden.json b/extensions/tn_attestation/testdata/attestation_golden.json new file mode 100644 index 000000000..f0e0d0305 --- /dev/null +++ b/extensions/tn_attestation/testdata/attestation_golden.json @@ -0,0 +1,21 @@ +{ + "canonical_hex": "01000000000000000141000000147e5f4552091a69125d5dfcb7b8c2659029395bdf00000020737434343238316464336562323236396161366135316438373663656635363400070000011a060000004600000000000f00000000000000000474657874000000000001002b000000013078376535663435353230393161363931323564356466636237623863323635393032393339356264663c00000000000f0000000000000000047465787400000000000100210000000173743434323831646433656232323639616136613531643837366365663536342400000000000f000000000000000004696e743800000000000100090000000100000000000003e82400000000000f000000000000000004696e743800000000000100090000000100000000000003e81700000000000f0000000000000000046e756c6c000000000000001d00000000000f000000000000000004626f6f6c00000000000100020000000100000000c000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000003e8000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000572b7b98736c20000", + "signature_hex": "3c56ffd4e1dc941a544ec3964837d46b24c234ec79392f3c20d92f0b40b483c409c071db4107eb39cfd65a1b95820fb8e77d54d588d5af982739aa6012371ace1b", + "payload_hex": "01000000000000000141000000147e5f4552091a69125d5dfcb7b8c2659029395bdf00000020737434343238316464336562323236396161366135316438373663656635363400070000011a060000004600000000000f00000000000000000474657874000000000001002b000000013078376535663435353230393161363931323564356466636237623863323635393032393339356264663c00000000000f0000000000000000047465787400000000000100210000000173743434323831646433656232323639616136613531643837366365663536342400000000000f000000000000000004696e743800000000000100090000000100000000000003e82400000000000f000000000000000004696e743800000000000100090000000100000000000003e81700000000000f0000000000000000046e756c6c000000000000001d00000000000f000000000000000004626f6f6c00000000000100020000000100000000c000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000003e8000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000572b7b98736c200003c56ffd4e1dc941a544ec3964837d46b24c234ec79392f3c20d92f0b40b483c409c071db4107eb39cfd65a1b95820fb8e77d54d588d5af982739aa6012371ace1b", + "data_provider": "0x7e5f4552091a69125d5dfcb7b8c2659029395bdf", + "stream_id": "st44281dd3eb2269aa6a51d876cef564", + "block_height": 321, + "action_id": 7, + "args": { + "data_provider": "0x7e5f4552091a69125d5dfcb7b8c2659029395bdf", + "stream_id": "st44281dd3eb2269aa6a51d876cef564", + "start_time": 1000, + "end_time": 1000, + "pending_filter": null, + "use_cache": false + }, + "result": { + "timestamps": [1000], + "values": ["100.5"] + } +} diff --git a/extensions/tn_utils/README.md b/extensions/tn_utils/README.md index f646bd301..9cba6fdff 100644 --- a/extensions/tn_utils/README.md +++ b/extensions/tn_utils/README.md @@ -13,7 +13,7 @@ Reusable precompiles that support attestation and other deterministic workflows. - Treats `NULL` or empty chunks as empty segments. ### `bytea_length_prefix(chunk BYTEA) -> BYTEA` -- Returns a 4-byte little-endian length prefix followed by the original chunk (treats `NULL` as zero-length). +- Returns a 4-byte big-endian length prefix followed by the original chunk (treats `NULL` as zero-length). ### `bytea_length_prefix_many(chunks BYTEA[]) -> BYTEA[]` - Applies the same length-prefix transformation to each entry, returning a new `BYTEA[]`. diff --git a/extensions/tn_utils/datapoints.go b/extensions/tn_utils/datapoints.go new file mode 100644 index 000000000..2fc0efcd1 --- /dev/null +++ b/extensions/tn_utils/datapoints.go @@ -0,0 +1,110 @@ +package tn_utils + +import ( + "fmt" + "math/big" + + gethAbi "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/trufnetwork/kwil-db/core/types" +) + +const dataPointTargetScale uint16 = 18 + +var dataPointsABIArgs gethAbi.Arguments + +func init() { + uint256Slice, err := gethAbi.NewType("uint256[]", "", nil) + if err != nil { + panic(fmt.Sprintf("tn_utils: failed to initialise uint256[] ABI type: %v", err)) + } + int256Slice, err := gethAbi.NewType("int256[]", "", nil) + if err != nil { + panic(fmt.Sprintf("tn_utils: failed to initialise int256[] ABI type: %v", err)) + } + dataPointsABIArgs = gethAbi.Arguments{ + {Type: uint256Slice}, + {Type: int256Slice}, + } +} + +// EncodeDataPointsABI converts the canonical result serialization produced by +// call_dispatch into abi.encode(uint256[] timestamps, int256[] values). +func EncodeDataPointsABI(canonical []byte) ([]byte, error) { + rows, err := DecodeQueryResultCanonical(canonical) + if err != nil { + return nil, err + } + + timestamps := make([]*big.Int, 0, len(rows)) + values := make([]*big.Int, 0, len(rows)) + + for i, row := range rows { + if len(row.Values) < 2 { + return nil, fmt.Errorf("row %d: expected at least 2 columns, got %d", i, len(row.Values)) + } + + ts, err := extractTimestamp(row.Values[0], i) + if err != nil { + return nil, err + } + val, err := extractDataPointValue(row.Values[1], i) + if err != nil { + return nil, err + } + + timestamps = append(timestamps, ts) + values = append(values, val) + } + + packed, err := dataPointsABIArgs.Pack(timestamps, values) + if err != nil { + return nil, fmt.Errorf("abi encode datapoints: %w", err) + } + return packed, nil +} + +func extractTimestamp(raw any, rowIdx int) (*big.Int, error) { + switch v := raw.(type) { + case *int64: + if v == nil { + return nil, fmt.Errorf("row %d: timestamp is NULL", rowIdx) + } + if *v < 0 { + return nil, fmt.Errorf("row %d: timestamp cannot be negative", rowIdx) + } + return new(big.Int).SetUint64(uint64(*v)), nil + case int64: + if v < 0 { + return nil, fmt.Errorf("row %d: timestamp cannot be negative", rowIdx) + } + return new(big.Int).SetUint64(uint64(v)), nil + default: + return nil, fmt.Errorf("row %d: unsupported timestamp type %T", rowIdx, raw) + } +} + +func extractDataPointValue(raw any, rowIdx int) (*big.Int, error) { + switch v := raw.(type) { + case *types.Decimal: + if v == nil { + return nil, fmt.Errorf("row %d: value is NULL", rowIdx) + } + return decimalToScaledInt(v, dataPointTargetScale) + case types.Decimal: + return decimalToScaledInt(&v, dataPointTargetScale) + default: + return nil, fmt.Errorf("row %d: expected numeric(36,18) value, got %T", rowIdx, raw) + } +} + +func decimalToScaledInt(dec *types.Decimal, targetScale uint16) (*big.Int, error) { + if dec.Scale() != targetScale { + return nil, fmt.Errorf("expected decimal scale %d, got %d", targetScale, dec.Scale()) + } + + result := new(big.Int).Set(dec.BigInt()) + if dec.IsNegative() { + result.Neg(result) + } + return result, nil +} diff --git a/extensions/tn_utils/precompiles.go b/extensions/tn_utils/precompiles.go index 4d409aa22..d4c13b98a 100644 --- a/extensions/tn_utils/precompiles.go +++ b/extensions/tn_utils/precompiles.go @@ -25,6 +25,7 @@ func buildPrecompile() precompiles.Precompile { encodeUintMethod("encode_uint16", 16), encodeUintMethod("encode_uint32", 32), encodeUintMethod("encode_uint64", 64), + canonicalToDataPointsABIMethod(), forceLastArgFalseMethod(), }, } @@ -134,6 +135,23 @@ func encodeUintMethod(name string, bits int) precompiles.Method { } } +func canonicalToDataPointsABIMethod() precompiles.Method { + return precompiles.Method{ + Name: "canonical_to_datapoints_abi", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("canonical", types.ByteaType, false), + }, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("encoded", types.ByteaType, false), + }, + }, + Handler: canonicalToDataPointsABIHandler, + } +} + // callDispatchHandler decodes action arguments, executes the target action inside // the current engine context, canonicalises the resulting rows, and hands the // bytes back to SQL. Any mismatch in decoding or execution bubbles up as an error. @@ -170,6 +188,22 @@ func callDispatchHandler(ctx *common.EngineContext, app *common.App, inputs []an return resultFn([]any{resultBytes}) } +func canonicalToDataPointsABIHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + canonical, err := toByteSliceAllowNil(inputs[0]) + if err != nil { + return err + } + if canonical == nil { + canonical = []byte{} + } + + encoded, err := EncodeDataPointsABI(canonical) + if err != nil { + return err + } + return resultFn([]any{encoded}) +} + // byteaJoinHandler concatenates the provided chunks into a single bytea value, // normalising nil delimiters/chunks to empty slices to stay deterministic. func byteaJoinHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { @@ -199,7 +233,7 @@ func byteaJoinHandler(ctx *common.EngineContext, app *common.App, inputs []any, return resultFn([]any{buf.Bytes()}) } -// byteaLengthPrefixHandler prepends a 4-byte little-endian length header to the +// byteaLengthPrefixHandler prepends a 4-byte big-endian length header to the // provided chunk and returns the combined byte slice. func byteaLengthPrefixHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { chunk, err := toByteSliceAllowNil(inputs[0]) @@ -315,7 +349,7 @@ func lengthPrefixBytes(chunk []byte) []byte { chunk = []byte{} } prefixed := make([]byte, 4+len(chunk)) - binary.LittleEndian.PutUint32(prefixed[:4], uint32(len(chunk))) + binary.BigEndian.PutUint32(prefixed[:4], uint32(len(chunk))) copy(prefixed[4:], chunk) return prefixed } diff --git a/extensions/tn_utils/serialization_test.go b/extensions/tn_utils/serialization_test.go index 490c6a674..6620e11e6 100644 --- a/extensions/tn_utils/serialization_test.go +++ b/extensions/tn_utils/serialization_test.go @@ -356,12 +356,12 @@ func TestByteaLengthPrefixHandler(t *testing.T) { { name: "non-empty", chunk: []byte{0x01, 0x02}, - expected: []byte{2, 0, 0, 0, 0x01, 0x02}, + expected: []byte{0, 0, 0, 2, 0x01, 0x02}, }, { name: "string input", chunk: "abc", - expected: []byte{3, 0, 0, 0, 'a', 'b', 'c'}, + expected: []byte{0, 0, 0, 3, 'a', 'b', 'c'}, }, } @@ -398,9 +398,9 @@ func TestByteaLengthPrefixManyHandler(t *testing.T) { require.True(t, ok) require.Len(t, prefixed, 3) - assert.Equal(t, []byte{1, 0, 0, 0, 'a'}, prefixed[0]) + assert.Equal(t, []byte{0, 0, 0, 1, 'a'}, prefixed[0]) assert.Equal(t, []byte{0, 0, 0, 0}, prefixed[1]) - assert.Equal(t, []byte{2, 0, 0, 0, 'b', 'c'}, prefixed[2]) + assert.Equal(t, []byte{0, 0, 0, 2, 'b', 'c'}, prefixed[2]) } func TestEncodeUintHandlers(t *testing.T) { diff --git a/internal/migrations/024-attestation-actions.sql b/internal/migrations/024-attestation-actions.sql index b25960804..1da32871e 100644 --- a/internal/migrations/024-attestation-actions.sql +++ b/internal/migrations/024-attestation-actions.sql @@ -9,8 +9,8 @@ * attestation hash, stores unsigned attestation, and queues for signing. */ CREATE OR REPLACE ACTION request_attestation( - $data_provider BYTEA, - $stream_id BYTEA, + $data_provider TEXT, + $stream_id TEXT, $action_name TEXT, $args_bytes BYTEA, $encrypt_sig BOOLEAN, @@ -40,6 +40,21 @@ $max_fee INT8 $caller_hex := LOWER(substring(@caller, 3, 40)); $caller_bytes := decode($caller_hex, 'hex'); + -- Normalize provider input and enforce length + $provider_lower := LOWER($data_provider); + if char_length($provider_lower) != 42 { + ERROR('data_provider must be 0x-prefixed 40 hex characters'); + } + if substring($provider_lower, 1, 2) != '0x' { + ERROR('data_provider must be 0x-prefixed 40 hex characters'); + } + $data_provider_bytes := decode(substring($provider_lower, 3, 40), 'hex'); + + if length($stream_id) != 32 { + ERROR('stream_id must be 32 characters'); + } + $stream_bytes := $stream_id::BYTEA; + -- Force deterministic execution by overriding non-deterministic parameters. -- Query actions (IDs 1-5) all have use_cache as their last parameter. -- Force use_cache=false to ensure all validators compute identical results @@ -50,9 +65,10 @@ $max_fee INT8 -- Execute target query deterministically using tn_utils.call_dispatch precompile $query_result := tn_utils.call_dispatch($action_name, $args_bytes); + $result_payload := tn_utils.canonical_to_datapoints_abi($query_result); $version := 1; - $algo := 1; -- secp256k1 + $algo := 0; -- secp256k1 -- Serialize canonical payload (version through result) using tn_utils helpers $version_bytes := tn_utils.encode_uint8($version::INT); $algo_bytes := tn_utils.encode_uint8($algo::INT); @@ -65,11 +81,11 @@ $max_fee INT8 $version_bytes, $algo_bytes, $height_bytes, - tn_utils.bytea_length_prefix($data_provider), - tn_utils.bytea_length_prefix($stream_id), + tn_utils.bytea_length_prefix($data_provider_bytes), + tn_utils.bytea_length_prefix($stream_bytes), $action_id_bytes, tn_utils.bytea_length_prefix($args_bytes), - tn_utils.bytea_length_prefix($query_result) + tn_utils.bytea_length_prefix($result_payload) ], NULL); -- Build hash material in canonical order using caller-provided inputs only. @@ -77,10 +93,10 @@ $max_fee INT8 $hash_input := tn_utils.bytea_join(ARRAY[ $version_bytes, $algo_bytes, - $data_provider, - $stream_id, + tn_utils.bytea_length_prefix($data_provider_bytes), + tn_utils.bytea_length_prefix($stream_bytes), $action_id_bytes, - $args_bytes + tn_utils.bytea_length_prefix($args_bytes) ], NULL); $attestation_hash := digest($hash_input, 'sha256'); diff --git a/tests/extensions/tn_attestation/README.md b/tests/extensions/tn_attestation/README.md new file mode 100644 index 000000000..461cc1a1e --- /dev/null +++ b/tests/extensions/tn_attestation/README.md @@ -0,0 +1,79 @@ +# Attestation Extension End-to-End Test + +End-to-end test that verifies the complete attestation workflow using real Docker containers. + +## What This Tests + +Validates the end-of-block queue processing workflow in a production-like environment. + +**Complete flow:** +1. Submit `request_attestation` → stores in database +2. `queue_for_signing` precompile → adds hash to in-memory queue +3. End-of-block hook → dequeues and processes hashes +4. Query database → finds unsigned attestations +5. Sign → validator generates signature +6. Broadcast → submits `sign_attestation` transaction +7. Retrieve → `get_signed_attestation` returns signed payload + +This test uses real Docker containers to verify the queue → end-of-block → signature workflow without mocking. + +## Quick Start + +```bash +# Run the complete test (builds, starts, tests, cleans up) +./test_tn_attestation.sh +``` + +## Other Commands + +```bash +# Run tests only (containers must be running) +./test_tn_attestation.sh test-only + +# View logs +./test_tn_attestation.sh logs + +# Stop and cleanup +./test_tn_attestation.sh stop +``` + +## Test Flow Diagram + +``` +request_attestation (SQL) + ↓ +queue_for_signing (precompile adds to queue) + ↓ +onLeaderEndBlock (dequeues at end of block) + ↓ +processAttestationHashes (queries DB) + ↓ +sign & broadcast + ↓ +get_signed_attestation (retrieves payload) +``` + +## Success Criteria + +- ✅ Request confirmed and stored in database +- ✅ Signature generated within 90 seconds (proves end-of-block worked) +- ✅ Signed payload retrieved successfully +- ✅ Payload structure valid: `canonical || 65-byte signature` + +## Limitations + +- Single validator (no multi-node consensus) +- Fixed validator key (deterministic for testing) +- No network failure simulation +- Assumes node is always leader + +## Validator trust model + +The attestation payload only contains the canonical bytes and a signature generated +by the block leader. Consumers **must** already know which validator key(s) they +trust and verify the signature against those addresses. The attestation does not +embed validator metadata, and it is possible for any key holder to sign arbitrary +data. Production deployments should distribute validator keys out of band and +rotate them via node configuration, not through the attestation payload itself. + +For more complex scenarios, see the integration test suite in `../../extensions/tn_attestation/`. diff --git a/tests/extensions/tn_attestation/attestation_e2e_test.go b/tests/extensions/tn_attestation/attestation_e2e_test.go new file mode 100644 index 000000000..da5757a28 --- /dev/null +++ b/tests/extensions/tn_attestation/attestation_e2e_test.go @@ -0,0 +1,495 @@ +package main + +import ( + "context" + "encoding/base64" + "encoding/hex" + "fmt" + "math/big" + "strings" + "testing" + "time" + + gethAbi "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/require" + clienttypes "github.com/trufnetwork/kwil-db/core/client/types" + kwilcrypto "github.com/trufnetwork/kwil-db/core/crypto" + "github.com/trufnetwork/kwil-db/core/crypto/auth" + kwiltypes "github.com/trufnetwork/kwil-db/core/types" + "github.com/trufnetwork/node/extensions/tn_attestation" + "github.com/trufnetwork/node/extensions/tn_utils" + "github.com/trufnetwork/sdk-go/core/tnclient" + "github.com/trufnetwork/sdk-go/core/util" +) + +const ( + kwildEndpoint = "http://localhost:8484" + deployerPrivateKey = "0000000000000000000000000000000000000000000000000000000000000001" + testEventTimestamp = int64(1000) + testDecimalValue = "100.5" + testDecimalPrecision = 36 + testDecimalScale = 18 +) + +var dataPointsABIArgs = mustBuildDataPointsABIArgs() + +// TestAttestationE2E runs end-to-end test for the attestation extension +// using a realistic stream-based workflow: create stream → insert data → request attestation → verify signature +func TestAttestationE2E(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Parse deployer private key + deployerWallet, err := kwilcrypto.Secp256k1PrivateKeyFromHex(deployerPrivateKey) + require.NoError(t, err, "Failed to parse deployer private key") + + // Create TN client + tnClient, err := tnclient.NewClient(ctx, kwildEndpoint, tnclient.WithSigner(auth.GetUserSigner(deployerWallet))) + if err != nil { + t.Skipf("Failed to create TN client (is the test environment running?): %v", err) + return + } + + t.Log("Connected to kwild at", kwildEndpoint) + + // Run the test + t.Run("CompleteStreamAttestationWorkflow", func(t *testing.T) { + testCompleteStreamAttestationWorkflow(ctx, t, tnClient, deployerWallet) + }) +} + +func testCompleteStreamAttestationWorkflow(ctx context.Context, t *testing.T, client *tnclient.Client, deployerWallet *kwilcrypto.Secp256k1PrivateKey) { + t.Log("Testing complete stream-based attestation workflow...") + + // Generate unique stream ID for this test + testStreamName := fmt.Sprintf("attestation_e2e_test_%d", time.Now().Unix()) + streamID := util.GenerateStreamId(testStreamName) + + // Get data provider address from wallet + pubKey, ok := deployerWallet.Public().(*kwilcrypto.Secp256k1PublicKey) + require.True(t, ok, "Failed to convert public key to Secp256k1PublicKey") + dataProviderAddr := kwilcrypto.EthereumAddressFromPubKey(pubKey) + dataProviderHex := fmt.Sprintf("0x%x", dataProviderAddr) + + t.Logf("Using stream: %s", streamID.String()) + t.Logf("Data provider: %s", dataProviderHex) + + // Cleanup: delete stream at the end + defer func() { + t.Log("Cleanup: Deleting stream...") + _ = deleteStream(ctx, t, client, dataProviderHex, streamID.String()) + }() + + // Step 1: Create primitive stream + t.Log("Step 1: Creating primitive stream...") + err := createPrimitiveStream(ctx, t, client, streamID.String()) + require.NoError(t, err, "Failed to create primitive stream") + + // Step 2: Insert sample data + t.Log("Step 2: Inserting sample data...") + err = insertStreamData(ctx, t, client, dataProviderHex, streamID.String()) + require.NoError(t, err, "Failed to insert stream data") + + // Step 3: Request attestation for get_record query + t.Log("Step 3: Requesting attestation for get_record...") + requestTxID, argsBytes, err := requestStreamAttestation(ctx, t, client, dataProviderHex, streamID.String()) + require.NoError(t, err, "Failed to request attestation") + require.NotEmpty(t, requestTxID, "request_tx_id should not be empty") + t.Logf("Attestation requested: request_tx_id=%s", requestTxID) + + // Step 4: Wait for end-of-block processing and signature generation + t.Log("Step 4: Waiting for end-of-block processing and signature generation...") + signatureFound := waitForSignature(ctx, t, client, requestTxID, 90*time.Second) + require.True(t, signatureFound, "Signature not generated within expected time (90s)") + + // Step 5: Retrieve and verify signed attestation + t.Log("Step 5: Retrieving and verifying signed attestation...") + payload, err := getSignedAttestation(ctx, t, client, requestTxID) + require.NoError(t, err, "Failed to get signed attestation") + require.NotNil(t, payload, "Signed payload should not be nil") + require.Greater(t, len(payload), 65, "Payload should contain canonical + 65-byte signature") + + // Verify signature is appended at the end + signature := payload[len(payload)-65:] + canonical := payload[:len(payload)-65] + require.Len(t, signature, 65, "Signature should be exactly 65 bytes") + require.NotEmpty(t, canonical, "Canonical payload should not be empty") + + // Decode canonical payload and validate every field matches the request + payloadFields, err := tn_attestation.ParseCanonicalPayload(canonical) + require.NoError(t, err, "Canonical payload should parse") + require.Equal(t, uint8(1), payloadFields.Version, "Unexpected canonical version") + require.Equal(t, uint8(0), payloadFields.Algorithm, "Unexpected signature algorithm") + require.NotZero(t, payloadFields.BlockHeight, "Block height should be recorded") + require.Equal(t, dataProviderAddr, payloadFields.DataProvider, "Data provider mismatch in canonical payload") + require.Equal(t, []byte(streamID.String()), payloadFields.StreamID, "Stream ID mismatch in canonical payload") + require.Equal(t, uint16(1), payloadFields.ActionID, "get_record should map to action_id 1") + require.Equal(t, argsBytes, payloadFields.Args, "Canonical args do not match request args") + + canonicalArgs, err := tn_utils.DecodeActionArgs(payloadFields.Args) + require.NoError(t, err, "Canonical args should decode") + require.Len(t, canonicalArgs, 6, "Canonical args length mismatch") + require.Equal(t, dataProviderHex, mustStringArg(t, canonicalArgs[0]), "Data provider argument mismatch") + require.Equal(t, streamID.String(), mustStringArg(t, canonicalArgs[1]), "Stream ID argument mismatch") + + startTs := mustInt64Arg(t, canonicalArgs[2]) + require.Equal(t, testEventTimestamp, startTs, "Start timestamp mismatch") + + endTs := mustInt64Arg(t, canonicalArgs[3]) + require.Equal(t, testEventTimestamp, endTs, "End timestamp mismatch") + + require.Nil(t, canonicalArgs[4], "Filter argument should remain nil") + require.False(t, mustBoolArg(t, canonicalArgs[5]), "use_cache must be forced false for deterministic attestation") + + // Validate the canonical result matches the inserted datapoint + require.NotEmpty(t, payloadFields.Result, "Canonical result payload should not be empty") + expectedDecimal, err := kwiltypes.ParseDecimalExplicit(testDecimalValue, testDecimalPrecision, testDecimalScale) + require.NoError(t, err, "Failed to parse expected decimal value") + require.NotNil(t, expectedDecimal, "Expected decimal must not be nil") + require.Equal(t, uint16(testDecimalScale), expectedDecimal.Scale(), "Unexpected decimal scale") + + expectedTimestampBig := new(big.Int).SetUint64(uint64(testEventTimestamp)) + expectedValueBig := new(big.Int).Set(expectedDecimal.BigInt()) + if expectedDecimal.IsNegative() { + expectedValueBig.Neg(expectedValueBig) + } + + expectedPayload, err := dataPointsABIArgs.Pack([]*big.Int{expectedTimestampBig}, []*big.Int{expectedValueBig}) + require.NoError(t, err, "Failed to pack expected datapoints ABI payload") + require.Equal(t, expectedPayload, payloadFields.Result, "Canonical result payload mismatch") + + actualTimestamps, actualValues := unpackDataPointsABI(t, payloadFields.Result) + require.Len(t, actualTimestamps, 1, "Expected exactly one timestamp in ABI payload") + require.Len(t, actualValues, 1, "Expected exactly one value in ABI payload") + require.Equal(t, expectedTimestampBig, actualTimestamps[0], "Canonical result timestamp mismatch") + require.Equal(t, 0, expectedValueBig.Cmp(actualValues[0]), "Canonical result value mismatch") + + // Verify signature authenticity and signer identity + digest := payloadFields.SigningDigest() + normalizedSig := append([]byte(nil), signature...) + if normalizedSig[64] >= 27 { + normalizedSig[64] -= 27 + } + require.True(t, normalizedSig[64] == 0 || normalizedSig[64] == 1, "Signature recovery ID must be 0 or 1") + + publicKey, err := crypto.SigToPub(digest[:], normalizedSig) + require.NoError(t, err, "Signature should recover public key") + require.NotNil(t, publicKey, "Recovered public key must not be nil") + + recoveredAddr := crypto.PubkeyToAddress(*publicKey) + require.NotEqual(t, (common.Address{}), recoveredAddr, "Recovered validator address should not be zero") + require.True(t, crypto.VerifySignature(crypto.FromECDSAPub(publicKey), digest[:], normalizedSig[:64]), "Signature must verify against canonical digest") + t.Logf("Validator address recovered from signature: %s", recoveredAddr.Hex()) + + // Step 6: Log attestation result to console + t.Log("=== Attestation Result ===") + t.Logf("Signed attestation payload (hex): %s", hex.EncodeToString(payload)) + t.Logf("Payload length: %d bytes", len(payload)) + t.Logf("Canonical portion (%d bytes): %s", len(canonical), hex.EncodeToString(canonical)) + t.Logf("Signature portion (65 bytes): %s", hex.EncodeToString(signature)) + t.Log("=========================") + + t.Log("Test completed successfully!") + t.Logf("Final payload size: %d bytes (canonical: %d, signature: 65)", len(payload), len(canonical)) +} + +// createPrimitiveStream creates a new primitive stream +func createPrimitiveStream(ctx context.Context, t *testing.T, client *tnclient.Client, streamID string) error { + // Call create_stream action + txHash, err := client.GetKwilClient().Execute(ctx, "", "create_stream", [][]any{{ + streamID, + "primitive", + }}, clienttypes.WithSyncBroadcast(true)) + if err != nil { + return fmt.Errorf("failed to create stream: %w", err) + } + + t.Logf("Created stream, tx hash: %s", txHash.String()) + + // Wait for confirmation + if err := waitForTxConfirmation(ctx, client, txHash, 30*time.Second); err != nil { + return fmt.Errorf("stream creation not confirmed: %w", err) + } + + return nil +} + +// insertStreamData inserts sample time-series data into the stream +func insertStreamData(ctx context.Context, t *testing.T, client *tnclient.Client, dataProvider, streamID string) error { + // Insert a single data point: event_time=1000, value=100.5 + eventTime := testEventTimestamp + valueDecimal, err := kwiltypes.ParseDecimalExplicit(testDecimalValue, testDecimalPrecision, testDecimalScale) + require.NoError(t, err, "failed to build decimal value") + + txHash, err := client.GetKwilClient().Execute(ctx, "", "insert_record", [][]any{{ + dataProvider, + streamID, + eventTime, + valueDecimal, + }}, clienttypes.WithSyncBroadcast(true)) + if err != nil { + return fmt.Errorf("failed to insert record: %w", err) + } + + t.Logf("Inserted record (event_time=%d, value=%s), tx hash: %s", eventTime, valueDecimal.String(), txHash.String()) + + // Wait for confirmation + if err := waitForTxConfirmation(ctx, client, txHash, 30*time.Second); err != nil { + return fmt.Errorf("record insertion not confirmed: %w", err) + } + + // Wait a bit for the data to be fully indexed + time.Sleep(2 * time.Second) + + return nil +} + +// requestStreamAttestation requests an attestation for a get_record query on the stream +func requestStreamAttestation(ctx context.Context, t *testing.T, client *tnclient.Client, dataProviderHex, streamID string) (string, []byte, error) { + argsData := []any{ + dataProviderHex, + streamID, + testEventTimestamp, + testEventTimestamp, + nil, + false, + } + + argsBytes, err := tn_utils.EncodeActionArgs(argsData) + if err != nil { + return "", nil, fmt.Errorf("failed to encode args: %w", err) + } + + txHash, err := client.GetKwilClient().Execute(ctx, "", "request_attestation", [][]any{{ + dataProviderHex, + streamID, + "get_record", + argsBytes, + false, + int64(0), + }}, clienttypes.WithSyncBroadcast(true)) + if err != nil { + return "", nil, fmt.Errorf("failed to execute request_attestation: %w", err) + } + t.Logf("Submitted request_attestation, tx hash: %s", txHash.String()) + + // Wait for transaction confirmation + if err := waitForTxConfirmation(ctx, client, txHash, 30*time.Second); err != nil { + return "", nil, fmt.Errorf("request_attestation not confirmed: %w", err) + } + + // The transaction ID is the hex string representation of the hash + requestTxID := txHash.String() + t.Logf("Request confirmed: request_tx_id=%s", requestTxID) + + return requestTxID, argsBytes, nil +} + +// waitForSignature polls for signature generation by checking if get_signed_attestation succeeds +func waitForSignature(ctx context.Context, t *testing.T, client *tnclient.Client, requestTxID string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + attempt := 0 + + for time.Now().Before(deadline) { + attempt++ + + // Try to get signed attestation + result, err := client.GetKwilClient().Call(ctx, "", "get_signed_attestation", []any{requestTxID}) + + if err == nil && result != nil && result.QueryResult != nil && len(result.QueryResult.Values) > 0 { + t.Logf("Signature found after %d attempts", attempt) + return true + } + + // Log progress every 5 attempts + if attempt%5 == 0 { + t.Logf("Waiting for signature... (attempt %d)", attempt) + } + + // Wait before next attempt + time.Sleep(3 * time.Second) + } + + t.Logf("Signature not found after %d attempts over %v", attempt, timeout) + return false +} + +// getSignedAttestation retrieves the signed attestation payload +func getSignedAttestation(ctx context.Context, t *testing.T, client *tnclient.Client, requestTxID string) ([]byte, error) { + result, err := client.GetKwilClient().Call(ctx, "", "get_signed_attestation", []any{requestTxID}) + if err != nil { + return nil, fmt.Errorf("failed to call get_signed_attestation: %w", err) + } + + if result == nil || result.QueryResult == nil || len(result.QueryResult.Values) == 0 { + return nil, fmt.Errorf("no result returned from get_signed_attestation") + } + + if len(result.QueryResult.Values) > 0 && len(result.QueryResult.Values[0]) > 0 { + switch v := result.QueryResult.Values[0][0].(type) { + case []byte: + return append([]byte(nil), v...), nil + case string: + decoded, err := decodePayloadString(v) + if err != nil { + return nil, err + } + return decoded, nil + } + } + + resultMap := result.QueryResult.ExportToStringMap() + if len(resultMap) == 0 { + return nil, fmt.Errorf("no rows in result") + } + + payloadStr, exists := resultMap[0]["payload"] + if !exists { + return nil, fmt.Errorf("payload column not found in result") + } + + decoded, err := decodePayloadString(payloadStr) + if err != nil { + return nil, err + } + return decoded, nil +} + +// waitForTxConfirmation waits for a transaction to be confirmed +func waitForTxConfirmation(ctx context.Context, client *tnclient.Client, txHash kwiltypes.Hash, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + result, err := client.GetKwilClient().TxQuery(ctx, txHash) + if err == nil && result != nil && result.Height > 0 { + if result.Result == nil { + return fmt.Errorf("transaction %s has no result payload", txHash.String()) + } + if result.Result.Code != uint32(kwiltypes.CodeOk) { + return fmt.Errorf("transaction %s failed: code=%d log=%s", txHash.String(), result.Result.Code, result.Result.Log) + } + return nil + } + + time.Sleep(1 * time.Second) + } + + return fmt.Errorf("transaction not confirmed within %v", timeout) +} + +// deleteStream deletes a stream for cleanup +func deleteStream(ctx context.Context, t *testing.T, client *tnclient.Client, dataProvider, streamID string) error { + txHash, err := client.GetKwilClient().Execute(ctx, "", "delete_stream", [][]any{{ + dataProvider, + streamID, + }}, clienttypes.WithSyncBroadcast(true)) + if err != nil { + t.Logf("Warning: Failed to delete stream: %v", err) + return err + } + + t.Logf("Deleted stream, tx hash: %s", txHash.String()) + return nil +} + +func decodePayloadString(raw string) ([]byte, error) { + if raw == "" { + return nil, fmt.Errorf("payload string is empty") + } + + candidates := []string{ + strings.TrimPrefix(strings.TrimPrefix(raw, "\\x"), "\\\\x"), + raw, + } + + for _, candidate := range candidates { + trimmed := strings.TrimSpace(candidate) + trimmed = strings.TrimPrefix(strings.TrimPrefix(trimmed, "0x"), "0X") + if trimmed == "" { + continue + } + + if decoded, err := hex.DecodeString(trimmed); err == nil { + return decoded, nil + } + if decoded, err := base64.StdEncoding.DecodeString(trimmed); err == nil { + return decoded, nil + } + } + + // As a last resort, treat the original string as raw bytes. + return []byte(raw), nil +} + +func mustStringArg(t *testing.T, value any) string { + t.Helper() + switch v := value.(type) { + case string: + return v + case *string: + require.NotNil(t, v, "string pointer argument should not be nil") + return *v + default: + require.Failf(t, "unexpected string argument type", "got %T", value) + return "" + } +} + +func mustBoolArg(t *testing.T, value any) bool { + t.Helper() + switch v := value.(type) { + case bool: + return v + case *bool: + require.NotNil(t, v, "bool pointer argument should not be nil") + return *v + default: + require.Failf(t, "unexpected bool argument type", "got %T", value) + return false + } +} + +func mustInt64Arg(t *testing.T, value any) int64 { + t.Helper() + switch v := value.(type) { + case int64: + return v + case *int64: + require.NotNil(t, v, "int64 pointer argument should not be nil") + return *v + default: + require.Failf(t, "unexpected int64 argument type", "got %T", value) + return 0 + } +} + +func mustBuildDataPointsABIArgs() gethAbi.Arguments { + uint256Slice, err := gethAbi.NewType("uint256[]", "", nil) + if err != nil { + panic(fmt.Sprintf("failed to initialise uint256[] ABI type: %v", err)) + } + int256Slice, err := gethAbi.NewType("int256[]", "", nil) + if err != nil { + panic(fmt.Sprintf("failed to initialise int256[] ABI type: %v", err)) + } + return gethAbi.Arguments{ + {Type: uint256Slice}, + {Type: int256Slice}, + } +} + +func unpackDataPointsABI(t *testing.T, payload []byte) ([]*big.Int, []*big.Int) { + t.Helper() + values, err := dataPointsABIArgs.Unpack(payload) + require.NoError(t, err, "failed to unpack datapoints ABI payload") + + timestamps, ok := values[0].([]*big.Int) + require.True(t, ok, "unexpected timestamps ABI type %T", values[0]) + + dataValues, ok := values[1].([]*big.Int) + require.True(t, ok, "unexpected values ABI type %T", values[1]) + + return timestamps, dataValues +} diff --git a/tests/extensions/tn_attestation/docker-compose.yml b/tests/extensions/tn_attestation/docker-compose.yml new file mode 100644 index 000000000..be057e1d0 --- /dev/null +++ b/tests/extensions/tn_attestation/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.8' + +services: + postgres: + image: ghcr.io/trufnetwork/kwil-postgres:16.8-1 + environment: + - POSTGRES_HOST_AUTH_METHOD=trust + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 5s + timeout: 5s + retries: 5 + networks: + - attestation-net + + kwild: + image: ghcr.io/trufnetwork/node:attestation-test + depends_on: + postgres: + condition: service_healthy + environment: + # Database connection + KWILD_DB_HOST: postgres + KWILD_DB_PORT: 5432 + # Node configuration + SETUP_CHAIN_ID: tn-attestation-test + SETUP_DB_OWNER: "0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf" + # Network configuration + KWILD_APP_JSONRPC_LISTEN_ADDR: "0.0.0.0:8484" + KWILD_APP_P2P_LISTEN_ADDR: "0.0.0.0:6600" + KWILD_APP_ADMIN_LISTEN_ADDR: "0.0.0.0:8485" + # Logging + KWILD_LOG_LEVEL: "debug" + KWILD_LOG_FORMAT: "plain" + # Validator private key for signing + KWILD_PRIVATE_KEY: "0000000000000000000000000000000000000000000000000000000000000001" + ports: + - "8484:8484" # JSON-RPC + - "8485:8485" # Admin API + - "6600:6600" # P2P + networks: + - attestation-net + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8484/health"] + interval: 10s + timeout: 5s + retries: 10 + +networks: + attestation-net: + driver: bridge + diff --git a/tests/extensions/tn_attestation/test_tn_attestation.sh b/tests/extensions/tn_attestation/test_tn_attestation.sh new file mode 100755 index 000000000..a6a40a49d --- /dev/null +++ b/tests/extensions/tn_attestation/test_tn_attestation.sh @@ -0,0 +1,172 @@ +#!/bin/bash +set -e + +# Script directory +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +PROJECT_ROOT="$( cd "$SCRIPT_DIR/../../.." && pwd )" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Configuration +DOCKER_IMAGE="ghcr.io/trufnetwork/node:attestation-test" +COMPOSE_PROJECT="tn-attestation-e2e" + +# Function to print colored output +print_status() { + echo -e "${GREEN}[*]${NC} $1" +} + +print_error() { + echo -e "${RED}[!]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[!]${NC} $1" +} + +# Function to clean up containers +cleanup_containers() { + print_status "Cleaning up any existing containers..." + cd "$SCRIPT_DIR" + docker compose -p "$COMPOSE_PROJECT" down --remove-orphans 2>/dev/null || true +} + +# Function to build Docker image +build_docker_image() { + print_status "Building Docker image with tn_attestation extension..." + cd "$PROJECT_ROOT" + + docker build -t "$DOCKER_IMAGE" -f deployments/Dockerfile . || { + print_error "Failed to build Docker image" + exit 1 + } + + print_status "Docker image built successfully" +} + +# Function to start services +start_services() { + print_status "Starting services with docker compose..." + cd "$SCRIPT_DIR" + + docker compose -p "$COMPOSE_PROJECT" up -d || { + print_error "Failed to start services" + exit 1 + } + + print_status "Waiting for services to be ready..." + + # Wait for kwild to be ready + print_status "Waiting for kwild..." + for i in {1..60}; do + if docker compose -p "$COMPOSE_PROJECT" logs kwild 2>&1 | grep -q "JSON-RPC server listening"; then + print_status "kwild is ready" + break + fi + if [ $i -eq 60 ]; then + print_error "Timeout waiting for kwild to be ready" + docker compose -p "$COMPOSE_PROJECT" logs kwild + exit 1 + fi + sleep 2 + done + + print_status "All services are ready" +} + +# Function to run migrations +run_migrations() { + print_status "Running TrufNetwork migrations..." + cd "$PROJECT_ROOT" + + # Run the dev migration + task action:migrate:dev || { + print_error "Failed to run migrations" + return 1 + } + + print_status "Migrations completed successfully" +} + +# Function to run tests +run_tests() { + print_status "Running attestation E2E tests..." + cd "$SCRIPT_DIR" + + # Run the Go tests without cache + go test -v -count=1 -run TestAttestationE2E ./... || { + print_error "Tests failed" + return 1 + } + + print_status "Tests completed successfully" +} + +# Function to follow logs +follow_logs() { + print_status "Following service logs (Ctrl+C to stop)..." + docker compose -p "$COMPOSE_PROJECT" logs -f +} + +# Function to stop services +stop_services() { + print_status "Stopping services..." + cd "$SCRIPT_DIR" + docker compose -p "$COMPOSE_PROJECT" down -v + print_status "Services stopped and volumes removed" +} + +# Cleanup function +cleanup() { + echo "" + print_warning "Interrupt received, cleaning up..." + stop_services + exit 0 +} + +# Main script logic +main() { + case "${1:-}" in + stop) + stop_services + ;; + logs) + follow_logs + ;; + test-only) + run_tests + ;; + *) + # Set up trap for cleanup + trap cleanup INT TERM + + # Clean up any existing containers first + cleanup_containers + + # Build and start everything + build_docker_image + start_services + + # Run migrations + run_migrations + + # Run tests + print_status "Waiting 5 seconds for attestation extension to initialize..." + sleep 5 + + run_tests + + # Clean up on success + print_status "Test completed successfully, cleaning up..." + stop_services + ;; + esac +} + +# Run main function +main "$@" + diff --git a/tests/streams/attestation/attestation_request_test.go b/tests/streams/attestation/attestation_request_test.go index 746d08e54..083724125 100644 --- a/tests/streams/attestation/attestation_request_test.go +++ b/tests/streams/attestation/attestation_request_test.go @@ -3,18 +3,20 @@ package tests import ( - "bytes" "context" - "encoding/binary" + "fmt" "testing" "github.com/stretchr/testify/require" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/types" kwilTesting "github.com/trufnetwork/kwil-db/testing" + attestation "github.com/trufnetwork/node/extensions/tn_attestation" "github.com/trufnetwork/node/extensions/tn_utils" "github.com/trufnetwork/node/internal/migrations" testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/sdk-go/core/util" ) func TestRequestAttestationInsertsCanonicalPayload(t *testing.T) { @@ -41,8 +43,8 @@ func TestRequestAttestationInsertsCanonicalPayload(t *testing.T) { func runAttestationHappyPath(helper *AttestationTestHelper, actionName string, actionID int) { const attestedValue int64 = 42 - dataProvider := []byte("provider-001") - streamID := []byte("stream-abc") + dataProviderHex := TestDataProviderHex + streamID := TestStreamID argsBytes, err := tn_utils.EncodeActionArgs([]any{attestedValue}) require.NoError(helper.t, err, "encode action args") @@ -52,16 +54,20 @@ func runAttestationHappyPath(helper *AttestationTestHelper, actionName string, a var requestTxID string var attestationHash []byte _, err = helper.platform.Engine.Call(engineCtx, helper.platform.DB, "", "request_attestation", []any{ - dataProvider, + dataProviderHex, streamID, actionName, argsBytes, false, int64(0), }, func(row *common.Row) error { - require.Len(helper.t, row.Values, 2, "expected 2 return values (request_tx_id, attestation_hash)") - requestTxID = row.Values[0].(string) - attestationHash = append([]byte(nil), row.Values[1].([]byte)...) + require.Len(helper.t, row.Values, 2, "expected request_attestation to return request_tx_id and attestation_hash") + txID, ok := row.Values[0].(string) + require.True(helper.t, ok, "request_tx_id should be TEXT") + hash, ok := row.Values[1].([]byte) + require.True(helper.t, ok, "attestation_hash should be BYTEA") + requestTxID = txID + attestationHash = append([]byte(nil), hash...) return nil }) require.NoError(helper.t, err) @@ -72,18 +78,30 @@ func runAttestationHappyPath(helper *AttestationTestHelper, actionName string, a require.Equal(helper.t, requestTxID, stored.requestTxID, "request_tx_id should be captured and stored") // Rebuild expected canonical payload - queryResult, err := tn_utils.EncodeQueryResultCanonical([]*common.Row{ - {Values: []any{attestedValue}}, - }) + valueDecimal := types.MustParseDecimal(fmt.Sprintf("%d.%018d", attestedValue, 0)) + queryRows := []*common.Row{ + { + Values: []any{ + int64(1), + valueDecimal, + }, + }, + } + canonicalResult, err := tn_utils.EncodeQueryResultCanonical(queryRows) + require.NoError(helper.t, err) + resultPayload, err := tn_utils.EncodeDataPointsABI(canonicalResult) require.NoError(helper.t, err) - expectedCanonical := buildExpectedCanonicalPayload( - stored.createdHeight, - dataProvider, - streamID, - actionID, + providerAddr := util.Unsafe_NewEthereumAddressFromString(dataProviderHex) + expectedCanonical := attestation.BuildCanonicalPayload( + 1, + 0, + uint64(stored.createdHeight), + providerAddr.Bytes(), + []byte(streamID), + uint16(actionID), argsBytes, - queryResult, + resultPayload, ) require.Equal(helper.t, expectedCanonical, stored.resultCanonical, "canonical payload mismatch") @@ -138,44 +156,3 @@ WHERE attestation_hash = $hash; return rowData } - -func buildExpectedCanonicalPayload( - createdHeight int64, - dataProvider []byte, - streamID []byte, - actionID int, - argsBytes []byte, - queryResult []byte, -) []byte { - versionBytes := []byte{1} - algoBytes := []byte{1} - - heightBytes := make([]byte, 8) - binary.BigEndian.PutUint64(heightBytes, uint64(createdHeight)) - - actionIDBytes := make([]byte, 2) - binary.BigEndian.PutUint16(actionIDBytes, uint16(actionID)) - - segments := [][]byte{ - versionBytes, - algoBytes, - heightBytes, - lengthPrefixLittleEndian(dataProvider), - lengthPrefixLittleEndian(streamID), - actionIDBytes, - lengthPrefixLittleEndian(argsBytes), - lengthPrefixLittleEndian(queryResult), - } - - return bytes.Join(segments, nil) -} - -func lengthPrefixLittleEndian(data []byte) []byte { - if data == nil { - data = []byte{} - } - prefixed := make([]byte, 4+len(data)) - binary.LittleEndian.PutUint32(prefixed[:4], uint32(len(data))) - copy(prefixed[4:], data) - return prefixed -} diff --git a/tests/streams/attestation/test_helpers.go b/tests/streams/attestation/test_helpers.go index 2ee98d56b..c253dc7c0 100644 --- a/tests/streams/attestation/test_helpers.go +++ b/tests/streams/attestation/test_helpers.go @@ -5,6 +5,8 @@ package tests import ( "context" "crypto/sha256" + "encoding/hex" + "strings" "testing" "github.com/stretchr/testify/require" @@ -22,14 +24,27 @@ const ( TestActionIDRequest = 10 TestActionIDGet = 20 TestActionIDList = 21 - TestDataProvider = "test-provider" - TestStreamID = "test-stream" + TestStreamID = "stream_attestation_test_00000000" + TestDataProviderHex = "0x0000000000000000000000000000000000000b11" SignatureLength = 65 MinCanonicalLength = 20 DefaultBlockHeight = 10 InvalidTxID = "0x0000000000000000000000000000000000000000000000000000000000000000" ) +var ( + TestDataProviderBytes = mustHexToBytes(TestDataProviderHex) +) + +func mustHexToBytes(input string) []byte { + normalized := strings.TrimPrefix(input, "0x") + res, err := hex.DecodeString(normalized) + if err != nil { + panic(err) + } + return res +} + // TestAddresses holds reusable test addresses type TestAddresses struct { Owner *util.EthereumAddress @@ -140,8 +155,8 @@ func (h *AttestationTestHelper) RequestAttestation(actionName string, value int6 require.NoError(h.t, err, "encode action args") res := h.CallAction("request_attestation", []any{ - []byte(TestDataProvider), - []byte(TestStreamID), + TestDataProviderHex, + TestStreamID, actionName, argsBytes, false, @@ -197,8 +212,8 @@ func (h *AttestationTestHelper) CreateAttestationForRequester(actionName string, requesterCtx := h.NewRequesterContext(requester) _, err = h.platform.Engine.Call(requesterCtx, h.platform.DB, "", "request_attestation", []any{ - []byte(TestDataProvider), - []byte(TestStreamID), + TestDataProviderHex, + TestStreamID, actionName, argsBytes, false, @@ -225,8 +240,8 @@ func (h *AttestationTestHelper) SetupTestAction(actionName string, actionID int) createAction := ` CREATE OR REPLACE ACTION ` + actionName + `( $value INT8 -) PUBLIC VIEW RETURNS TABLE(result INT8) { - RETURN NEXT $value; +) PUBLIC VIEW RETURNS TABLE(event_time INT8, value NUMERIC(36,18)) { + RETURN NEXT 1, ($value)::NUMERIC(36,18); };` if err := h.platform.Engine.Execute(engineCtx, h.platform.DB, createAction, nil, nil); err != nil {