diff --git a/extensions/register.go b/extensions/register.go index 5002ca362..0ecdfa05f 100644 --- a/extensions/register.go +++ b/extensions/register.go @@ -1,16 +1,18 @@ package extensions import ( - "github.com/trufnetwork/node/extensions/database-size" + database_size "github.com/trufnetwork/node/extensions/database-size" "github.com/trufnetwork/node/extensions/leaderwatch" "github.com/trufnetwork/node/extensions/tn_attestation" "github.com/trufnetwork/node/extensions/tn_cache" "github.com/trufnetwork/node/extensions/tn_digest" "github.com/trufnetwork/node/extensions/tn_vacuum" + "github.com/trufnetwork/node/extensions/tn_utils" ) func init() { leaderwatch.InitializeExtension() + tn_utils.InitializeExtension() tn_cache.InitializeExtension() tn_digest.InitializeExtension() tn_vacuum.InitializeExtension() diff --git a/extensions/tn_cache/syncschecker/sync_checker.go b/extensions/tn_cache/syncschecker/sync_checker.go index ac59584fe..3eb3ec8f2 100644 --- a/extensions/tn_cache/syncschecker/sync_checker.go +++ b/extensions/tn_cache/syncschecker/sync_checker.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync" "time" @@ -20,6 +21,20 @@ const ( healthEndpoint = "http://localhost:8484/api/v1/health" ) +// retryableHTTPLogger adapts kwil's logger to work with go-retryablehttp +// It logs HTTP requests at Debug level so they only appear when debug logging is enabled +type retryableHTTPLogger struct { + logger log.Logger +} + +// Printf implements the retryablehttp.Logger interface +// We log at Debug level since these are routine health checks +func (l *retryableHTTPLogger) Printf(format string, v ...interface{}) { + // Remove trailing newline if present, as Debugf adds its own + format = strings.TrimSuffix(format, "\n") + l.logger.Debugf(format, v...) +} + // SyncChecker monitors if the node is synced enough to perform cache operations type SyncChecker struct { logger log.Logger @@ -134,6 +149,8 @@ func (sc *SyncChecker) updateStatus(ctx context.Context) { client.RetryMax = 3 client.RetryWaitMin = 1 * time.Second client.RetryWaitMax = 5 * time.Second + // Use kwil logger adapter so HTTP logs respect the configured log level + client.Logger = &retryableHTTPLogger{logger: sc.logger} req, err := retryablehttp.NewRequest("GET", sc.endpoint, nil) if err != nil { diff --git a/extensions/tn_utils/README.md b/extensions/tn_utils/README.md new file mode 100644 index 000000000..f646bd301 --- /dev/null +++ b/extensions/tn_utils/README.md @@ -0,0 +1,52 @@ +# tn_utils Extension + +Reusable precompiles that support attestation and other deterministic workflows. + +## Methods + +### `call_dispatch(action_name TEXT, args_bytes BYTEA) -> BYTEA` +- Dispatches to another action while preserving the current engine context. +- Returns the canonical, deterministic encoding of the target action's result rows. + +### `bytea_join(chunks BYTEA[], delimiter BYTEA) -> BYTEA` +- Concatenates an array of `BYTEA` chunks, inserting the provided delimiter between entries. +- Treats `NULL` or empty chunks as empty segments. + +### `bytea_length_prefix(chunk BYTEA) -> BYTEA` +- Returns a 4-byte little-endian length prefix followed by the original chunk (treats `NULL` as zero-length). + +### `bytea_length_prefix_many(chunks BYTEA[]) -> BYTEA[]` +- Applies the same length-prefix transformation to each entry, returning a new `BYTEA[]`. + +### `encode_uint8(value INT) -> BYTEA` +### `encode_uint16(value INT) -> BYTEA` +### `encode_uint32(value INT) -> BYTEA` +### `encode_uint64(value INT) -> BYTEA` +- Encode unsigned integers to big-endian byte arrays with the specified width. + +## Usage + +```sql +USE tn_utils AS util; + +-- Call another action +$result_bytes := util.call_dispatch('get_record', $args_bytes); + +-- Join canonical payload chunks +$payload := util.bytea_join(util.bytea_length_prefix_many(ARRAY[ + $version_bytes, + $algo_bytes, + $result_bytes +]), E''); +``` + +The Go package lives at `github.com/trufnetwork/node/extensions/tn_utils`. +Import it to register the utilities precompile bundle on node startup: + +```go +import "github.com/trufnetwork/node/extensions/tn_utils" + +func init() { + tn_utils.InitializeExtension() +} +``` diff --git a/extensions/tn_utils/extension.go b/extensions/tn_utils/extension.go new file mode 100644 index 000000000..8a635e472 --- /dev/null +++ b/extensions/tn_utils/extension.go @@ -0,0 +1,24 @@ +package tn_utils + +import ( + "context" + "fmt" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/extensions/precompiles" + sql "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// ExtensionName identifies this utilities precompile bundle. +const ExtensionName = "tn_utils" + +// InitializeExtension registers the utilities precompile bundle. +func InitializeExtension() { + if err := precompiles.RegisterInitializer(ExtensionName, initializePrecompile); err != nil { + panic(fmt.Sprintf("failed to register %s initializer: %v", ExtensionName, err)) + } +} + +func initializePrecompile(ctx context.Context, service *common.Service, db sql.DB, alias string, metadata map[string]any) (precompiles.Precompile, error) { + return buildPrecompile(), nil +} diff --git a/extensions/tn_utils/precompiles.go b/extensions/tn_utils/precompiles.go new file mode 100644 index 000000000..e33ec24ce --- /dev/null +++ b/extensions/tn_utils/precompiles.go @@ -0,0 +1,344 @@ +package tn_utils + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/types" + "github.com/trufnetwork/kwil-db/extensions/precompiles" +) + +// buildPrecompile groups all tn_utils methods into a single precompile bundle so +// migrations can bind to them with `USE tn_utils AS ...`. Keeping the definition +// here makes the exported registration code in extension.go tiny and obvious. +func buildPrecompile() precompiles.Precompile { + return precompiles.Precompile{ + Methods: []precompiles.Method{ + callDispatchMethod(), + byteaJoinMethod(), + byteaLengthPrefixMethod(), + byteaLengthPrefixManyMethod(), + encodeUintMethod("encode_uint8", 8), + encodeUintMethod("encode_uint16", 16), + encodeUintMethod("encode_uint32", 32), + encodeUintMethod("encode_uint64", 64), + }, + } +} + +// callDispatchMethod exposes deterministic metered dispatch to another action. +// Arguments and results are transferred as canonical byte blobs so cross-validator +// comparisons remain byte-for-byte identical. +func callDispatchMethod() precompiles.Method { + return precompiles.Method{ + Name: "call_dispatch", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("action_name", types.TextType, false), + precompiles.NewPrecompileValue("args_bytes", types.ByteaType, false), + }, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("result_bytes", types.ByteaType, false), + }, + }, + Handler: callDispatchHandler, + } +} + +// byteaJoinMethod mirrors the behaviour of the Go canonical encoder: accepts a +// bytea array, tolerates NULL entries, and joins segments with an optional delimiter. +func byteaJoinMethod() precompiles.Method { + // Mirrors the Go-side canonical concatenation: accepts BYTEA arrays, treats nil + // chunks/delimiters as empty, and preserves deterministic ordering. We cannot + // get the same behaviour with SQL's || operator (it is binary-only and NULL + // propagates), so keeping this precompile ensures SQL migrations stay aligned + // with the attestation encoder. + return precompiles.Method{ + Name: "bytea_join", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("chunks", types.ByteaArrayType, false), + precompiles.NewPrecompileValue("delimiter", types.ByteaType, true), + }, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("merged", types.ByteaType, false), + }, + }, + Handler: byteaJoinHandler, + } +} + +// byteaLengthPrefixMethod length-prefixes a single chunk using little endian so +// validators can unambiguously slice the serialized payload. +func byteaLengthPrefixMethod() precompiles.Method { + return precompiles.Method{ + Name: "bytea_length_prefix", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("chunk", types.ByteaType, false), + }, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("prefixed", types.ByteaType, false), + }, + }, + Handler: byteaLengthPrefixHandler, + } +} + +// byteaLengthPrefixManyMethod maps length-prefixing across a bytea array; used +// for canonical payload construction where every field needs a length tag. +func byteaLengthPrefixManyMethod() precompiles.Method { + return precompiles.Method{ + Name: "bytea_length_prefix_many", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("chunks", types.ByteaArrayType, false), + }, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("prefixed_chunks", types.ByteaArrayType, false), + }, + }, + Handler: byteaLengthPrefixManyHandler, + } +} + +// encodeUintMethod registers a family of fixed-width unsigned integer encoders +// (8/16/32/64 bit). SQL only has signed ints, so we validate ranges before +// emitting the big-endian bytes expected by the attestation format. +func encodeUintMethod(name string, bits int) precompiles.Method { + return precompiles.Method{ + Name: name, + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("value", types.IntType, false), + }, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("bytes", types.ByteaType, false), + }, + }, + Handler: encodeUintHandler(bits), + } +} + +// callDispatchHandler decodes action arguments, executes the target action inside +// the current engine context, canonicalises the resulting rows, and hands the +// bytes back to SQL. Any mismatch in decoding or execution bubbles up as an error. +func callDispatchHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + actionName, ok := inputs[0].(string) + if !ok { + return fmt.Errorf("action_name must be string, got %T", inputs[0]) + } + + argsBytes, ok := inputs[1].([]byte) + if !ok { + return fmt.Errorf("args_bytes must be []byte, got %T", inputs[1]) + } + + args, err := DecodeActionArgs(argsBytes) + if err != nil { + return fmt.Errorf("failed to decode args for action '%s': %w", actionName, err) + } + + var rows []*common.Row + _, err = app.Engine.Call(ctx, app.DB, "main", actionName, args, func(row *common.Row) error { + rows = append(rows, row) + return nil + }) + if err != nil { + return fmt.Errorf("action '%s' call failed: %w", actionName, err) + } + + resultBytes, err := EncodeQueryResultCanonical(rows) + if err != nil { + return fmt.Errorf("failed to encode results from action '%s': %w", actionName, err) + } + + return resultFn([]any{resultBytes}) +} + +// byteaJoinHandler concatenates the provided chunks into a single bytea value, +// normalising nil delimiters/chunks to empty slices to stay deterministic. +func byteaJoinHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + chunks, err := toByteSliceArray(inputs[0]) + if err != nil { + return err + } + + delimiter, err := toByteSliceAllowNil(inputs[1]) + if err != nil { + return err + } + if delimiter == nil { + delimiter = []byte{} + } + + var buf bytes.Buffer + for i, chunk := range chunks { + if i > 0 && len(delimiter) > 0 { + buf.Write(delimiter) + } + if len(chunk) > 0 { + buf.Write(chunk) + } + } + + return resultFn([]any{buf.Bytes()}) +} + +// byteaLengthPrefixHandler prepends a 4-byte little-endian length header to the +// provided chunk and returns the combined byte slice. +func byteaLengthPrefixHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + chunk, err := toByteSliceAllowNil(inputs[0]) + if err != nil { + return err + } + + return resultFn([]any{lengthPrefixBytes(chunk)}) +} + +// byteaLengthPrefixManyHandler applies lengthPrefixBytes to each element in a +// bytea array, returning the transformed array. +func byteaLengthPrefixManyHandler(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + chunks, err := toByteSliceArray(inputs[0]) + if err != nil { + return err + } + + prefixed := make([][]byte, len(chunks)) + for i, chunk := range chunks { + prefixed[i] = lengthPrefixBytes(chunk) + } + + return resultFn([]any{prefixed}) +} + +// encodeUintHandler validates the integer fits within the target width and +// serialises it using big-endian order. It supports the four unsigned widths +// needed for the canonical payload. +func encodeUintHandler(bits int) precompiles.HandlerFunc { + return func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + value, err := toInt64(inputs[0]) + if err != nil { + return err + } + if value < 0 { + return fmt.Errorf("value must be non-negative, got %d", value) + } + + var encoded []byte + switch bits { + case 8: + if value > math.MaxUint8 { + return fmt.Errorf("value %d exceeds uint8 max", value) + } + encoded = []byte{byte(value)} + case 16: + if value > math.MaxUint16 { + return fmt.Errorf("value %d exceeds uint16 max", value) + } + encoded = make([]byte, 2) + binary.BigEndian.PutUint16(encoded, uint16(value)) + case 32: + if value > math.MaxUint32 { + return fmt.Errorf("value %d exceeds uint32 max", value) + } + encoded = make([]byte, 4) + binary.BigEndian.PutUint32(encoded, uint32(value)) + case 64: + encoded = make([]byte, 8) + binary.BigEndian.PutUint64(encoded, uint64(value)) + default: + return fmt.Errorf("unsupported integer size %d", bits) + } + + return resultFn([]any{encoded}) + } +} + +func toByteSliceArray(value any) ([][]byte, error) { + switch v := value.(type) { + case [][]byte: + return v, nil + case []any: + result := make([][]byte, len(v)) + for i, elem := range v { + if elem == nil { + result[i] = nil + continue + } + b, err := toByteSlice(elem) + if err != nil { + return nil, fmt.Errorf("chunks[%d]: %w", i, err) + } + result[i] = b + } + return result, nil + default: + return nil, fmt.Errorf("chunks must be [][]byte, got %T", value) + } +} + +func toByteSlice(value any) ([]byte, error) { + switch v := value.(type) { + case []byte: + return v, nil + case string: + return []byte(v), nil + default: + return nil, fmt.Errorf("expected []byte, got %T", value) + } +} + +func toByteSliceAllowNil(value any) ([]byte, error) { + if value == nil { + return nil, nil + } + return toByteSlice(value) +} + +func lengthPrefixBytes(chunk []byte) []byte { + if chunk == nil { + chunk = []byte{} + } + prefixed := make([]byte, 4+len(chunk)) + binary.LittleEndian.PutUint32(prefixed[:4], uint32(len(chunk))) + copy(prefixed[4:], chunk) + return prefixed +} + +func toInt64(value any) (int64, error) { + switch v := value.(type) { + case int64: + return v, nil + case int32: + return int64(v), nil + case int: + return int64(v), nil + case uint64: + if v > math.MaxInt64 { + return 0, fmt.Errorf("value %d exceeds int64 max", v) + } + return int64(v), nil + case uint32: + return int64(v), nil + case uint16: + return int64(v), nil + case uint8: + return int64(v), nil + default: + return 0, fmt.Errorf("expected integer type, got %T", value) + } +} diff --git a/extensions/tn_utils/serialization.go b/extensions/tn_utils/serialization.go new file mode 100644 index 000000000..539ccf120 --- /dev/null +++ b/extensions/tn_utils/serialization.go @@ -0,0 +1,239 @@ +// Package tn_utils provides reusable precompiles for dynamic dispatch and +// canonical serialization helpers. +// +// This package leverages Kwil's native encoding (types.EncodeValue) to ensure +// byte-for-byte identical serialization across all validators, supporting all +// Kwil data types automatically. +// +// Security Note: +// The call_dispatch precompile does NOT enforce any access control or allowlists. +// Callers are responsible for validating which actions can be dispatched before +// invoking this precompile. +// +// Encoding Format: +// Uses Kwil's types.EncodedValue format which includes type information and deterministic +// byte ordering (BigEndian for integers, length-prefixed for variable-length data). +package tn_utils + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/types" +) + +// EncodeActionArgs encodes action arguments into canonical bytes using Kwil's native encoding. +// Each argument is encoded with its type information for deterministic execution. +// +// Format: [arg_count:uint32][encoded_arg1][encoded_arg2]... +// Where each encoded_arg uses types.EncodedValue.MarshalBinary() format +// +// Supported types (via types.EncodeValue): +// - nil +// - int, int8, int16, int32, int64, uint, uint16, uint32, uint64 +// - string +// - []byte +// - bool +// - [16]byte, types.UUID (UUID) +// - types.Decimal +// - Arrays of the above types (e.g., []string, []int64) +// +// Returns an error if any argument cannot be encoded by Kwil's type system. +func EncodeActionArgs(args []any) ([]byte, error) { + buf := new(bytes.Buffer) + + // Write argument count + if err := binary.Write(buf, binary.LittleEndian, uint32(len(args))); err != nil { + return nil, fmt.Errorf("failed to write arg count: %w", err) + } + + // Encode each argument using Kwil's native encoding + for i, arg := range args { + encodedVal, err := types.EncodeValue(arg) + if err != nil { + return nil, fmt.Errorf("failed to encode arg %d: %w", i, err) + } + + // Serialize the EncodedValue + argBytes, err := encodedVal.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal arg %d: %w", i, err) + } + + // Write length-prefixed argument bytes + if err := binary.Write(buf, binary.LittleEndian, uint32(len(argBytes))); err != nil { + return nil, fmt.Errorf("failed to write arg %d length: %w", i, err) + } + if _, err := buf.Write(argBytes); err != nil { + return nil, fmt.Errorf("failed to write arg %d bytes: %w", i, err) + } + } + + return buf.Bytes(), nil +} + +// DecodeActionArgs decodes canonical bytes back into action arguments. +// This is the inverse operation of EncodeActionArgs. +// +// Returns an error if: +// - Data is too short to contain arg count +// - Individual arguments fail to decode +// - Type information is invalid +func DecodeActionArgs(data []byte) ([]any, error) { + if len(data) < 4 { + return nil, fmt.Errorf("data too short for arg count") + } + + buf := bytes.NewReader(data) + + // Read argument count + var argCount uint32 + if err := binary.Read(buf, binary.LittleEndian, &argCount); err != nil { + return nil, fmt.Errorf("failed to read arg count: %w", err) + } + + args := make([]any, argCount) + + // Decode each argument + for i := uint32(0); i < argCount; i++ { + // Read argument length + var argLen uint32 + if err := binary.Read(buf, binary.LittleEndian, &argLen); err != nil { + return nil, fmt.Errorf("failed to read arg %d length: %w", i, err) + } + + // Read argument bytes + argBytes := make([]byte, argLen) + if _, err := io.ReadFull(buf, argBytes); err != nil { + return nil, fmt.Errorf("failed to read arg %d bytes: %w", i, err) + } + + // Unmarshal EncodedValue + var encodedVal types.EncodedValue + if err := encodedVal.UnmarshalBinary(argBytes); err != nil { + return nil, fmt.Errorf("failed to unmarshal arg %d: %w", i, err) + } + + // Decode to Go value + decodedVal, err := encodedVal.Decode() + if err != nil { + return nil, fmt.Errorf("failed to decode arg %d value: %w", i, err) + } + + args[i] = decodedVal + } + + return args, nil +} + +// EncodeQueryResultCanonical encodes query results into canonical bytes using Kwil's native encoding. +// All validators executing the same query will produce identical bytes. +// +// Format: [row_count:uint32][row1][row2]... +// Each row: [col_count:uint32][encoded_col1][encoded_col2]... +// Where each encoded_col uses types.EncodeValue format +// +// The column order follows the query result set order. Kwil enforces deterministic ordering +// by automatically adding ORDER BY clauses to queries when needed. +// +// Returns an error if: +// - Any value has an unsupported type +// - Encoding operations fail +func EncodeQueryResultCanonical(rows []*common.Row) ([]byte, error) { + buf := new(bytes.Buffer) + + // Write row count + if err := binary.Write(buf, binary.LittleEndian, uint32(len(rows))); err != nil { + return nil, fmt.Errorf("failed to write row count: %w", err) + } + + // Encode each row + for i, row := range rows { + // Write column count + if err := binary.Write(buf, binary.LittleEndian, uint32(len(row.Values))); err != nil { + return nil, fmt.Errorf("failed to write col count for row %d: %w", i, err) + } + + // Encode each column value using Kwil's native encoding + for j, value := range row.Values { + encodedVal, err := types.EncodeValue(value) + if err != nil { + return nil, fmt.Errorf("failed to encode col %d of row %d: %w", j, i, err) + } + + // Serialize the EncodedValue + colBytes, err := encodedVal.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal col %d of row %d: %w", j, i, err) + } + + // Write length-prefixed column bytes + if err := binary.Write(buf, binary.LittleEndian, uint32(len(colBytes))); err != nil { + return nil, fmt.Errorf("failed to write col %d length for row %d: %w", j, i, err) + } + if _, err := buf.Write(colBytes); err != nil { + return nil, fmt.Errorf("failed to write col %d bytes for row %d: %w", j, i, err) + } + } + } + + return buf.Bytes(), nil +} + +// DecodeQueryResultCanonical decodes canonical bytes back into query results. +// This is the inverse operation of EncodeQueryResultCanonical. +func DecodeQueryResultCanonical(data []byte) ([]*common.Row, error) { + if len(data) < 4 { + return nil, fmt.Errorf("data too short for row count") + } + + buf := bytes.NewReader(data) + + // Read row count + var rowCount uint32 + if err := binary.Read(buf, binary.LittleEndian, &rowCount); err != nil { + return nil, fmt.Errorf("failed to read row count: %w", err) + } + + rows := make([]*common.Row, rowCount) + + for i := uint32(0); i < rowCount; i++ { + row := &common.Row{} + + var colCount uint32 + if err := binary.Read(buf, binary.LittleEndian, &colCount); err != nil { + return nil, fmt.Errorf("failed to read col count for row %d: %w", i, err) + } + + for j := uint32(0); j < colCount; j++ { + var colLen uint32 + if err := binary.Read(buf, binary.LittleEndian, &colLen); err != nil { + return nil, fmt.Errorf("failed to read col %d length for row %d: %w", j, i, err) + } + + colBytes := make([]byte, colLen) + if _, err := io.ReadFull(buf, colBytes); err != nil { + return nil, fmt.Errorf("failed to read col %d bytes for row %d: %w", j, i, err) + } + + var encodedVal types.EncodedValue + if err := encodedVal.UnmarshalBinary(colBytes); err != nil { + return nil, fmt.Errorf("failed to unmarshal col %d of row %d: %w", j, i, err) + } + + decodedVal, err := encodedVal.Decode() + if err != nil { + return nil, fmt.Errorf("failed to decode col %d value of row %d: %w", j, i, err) + } + + row.Values = append(row.Values, decodedVal) + } + + rows[i] = row + } + + return rows, nil +} diff --git a/extensions/tn_utils/serialization_test.go b/extensions/tn_utils/serialization_test.go new file mode 100644 index 000000000..103e417c2 --- /dev/null +++ b/extensions/tn_utils/serialization_test.go @@ -0,0 +1,463 @@ +package tn_utils + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/core/types" + "github.com/trufnetwork/kwil-db/extensions/precompiles" +) + +func TestEncodeDecodeBasicTypes(t *testing.T) { + original := []any{ + int64(42), + int32(100), + "hello", + []byte("world"), + true, + nil, + } + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +func TestEncodeDecodeArrays(t *testing.T) { + original := []any{ + []string{"hello", "world"}, + []int64{1, 2, 3}, + []bool{true, false, true}, + } + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +func TestEncodeDecodeDecimal(t *testing.T) { + original := []any{types.MustParseDecimal("123.456")} + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +func TestEncodeDecodeUUID(t *testing.T) { + uuid := types.UUID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + original := []any{uuid} + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +func TestDeterministicEncoding(t *testing.T) { + original := []any{int64(42), "hello", []byte("world")} + + encoded1, err := EncodeActionArgs(original) + require.NoError(t, err) + + encoded2, err := EncodeActionArgs(original) + require.NoError(t, err) + + assert.Equal(t, encoded1, encoded2) +} + +func TestEmptyArgs(t *testing.T) { + original := []any{} + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +func TestLargeArray(t *testing.T) { + original := []any{ + []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}, + } + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +func TestMixedTypes(t *testing.T) { + dec := types.MustParseDecimal("999.999") + uuid := types.UUID([16]byte{16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}) + + original := []any{ + int64(-42), + int32(-100), + uint64(1234567890123456789), + "complex string with émojis 🚀", + []byte{0x00, 0x01, 0x02, 0x03, 0xFF}, + true, + false, + nil, + dec, + uuid, + []int64{-1, 0, 1}, + []string{"", "non-empty"}, + [][]byte{{}, {0xFF}}, + } + + encoded, err := EncodeActionArgs(original) + require.NoError(t, err) + + decoded, err := DecodeActionArgs(encoded) + require.NoError(t, err) + + assert.Equal(t, normalizeDecodedSlice(original), normalizeDecodedSlice(decoded)) +} + +// Helpers ------------------------------------------------------------------- + +func normalizeDecodedSlice(values []any) []any { + normalized := make([]any, len(values)) + for i, v := range values { + normalized[i] = normalizeDecodedValue(v) + } + return normalized +} + +func cloneByteSlices(in [][]byte) [][]byte { + if in == nil { + return nil + } + out := make([][]byte, len(in)) + for i := range in { + if in[i] == nil { + continue + } + out[i] = append([]byte(nil), in[i]...) + } + return out +} + +func normalizeDecodedValue(val any) any { + if val == nil { + return nil + } + rv := reflect.ValueOf(val) + if rv.Kind() == reflect.Pointer { + if rv.IsNil() { + return nil + } + // Preserve decimal pointers so callers that rely on mutability keep references. + if rv.Type().Elem() == reflect.TypeOf(types.Decimal{}) { + return val + } + return normalizeDecodedValue(rv.Elem().Interface()) + } + + switch rv.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return rv.Int() + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return int64(rv.Uint()) + case reflect.Bool: + return rv.Bool() + case reflect.String: + return rv.String() + case reflect.Slice: + if rv.Type().Elem().Kind() == reflect.Uint8 { + b := make([]byte, rv.Len()) + reflect.Copy(reflect.ValueOf(b), rv) + return b + } + normalizedElems := make([]any, rv.Len()) + allString := true + allInt := true + allBool := true + allBytes := true + for i := 0; i < rv.Len(); i++ { + norm := normalizeDecodedValue(rv.Index(i).Interface()) + normalizedElems[i] = norm + switch norm.(type) { + case string: + allInt = false + allBool = false + case int64: + allString = false + allBool = false + case bool: + allString = false + allInt = false + case []byte: + allString = false + allInt = false + allBool = false + default: + allString = false + allInt = false + allBool = false + allBytes = false + } + if _, ok := norm.([]byte); !ok { + allBytes = false + } + } + switch { + case allString: + out := make([]string, len(normalizedElems)) + for i, elem := range normalizedElems { + out[i] = elem.(string) + } + return out + case allInt: + out := make([]int64, len(normalizedElems)) + for i, elem := range normalizedElems { + out[i] = elem.(int64) + } + return out + case allBool: + out := make([]bool, len(normalizedElems)) + for i, elem := range normalizedElems { + out[i] = elem.(bool) + } + return out + case allBytes: + out := make([][]byte, len(normalizedElems)) + for i, elem := range normalizedElems { + out[i] = elem.([]byte) + } + return out + default: + return normalizedElems + } + case reflect.Struct: + return val + default: + return val + } +} + +func TestErrorHandling(t *testing.T) { + t.Run("empty_data", func(t *testing.T) { + _, err := DecodeActionArgs([]byte{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "too short") + }) + + t.Run("short_data", func(t *testing.T) { + _, err := DecodeActionArgs([]byte{0x01, 0x00, 0x00}) + require.Error(t, err) + assert.Contains(t, err.Error(), "too short") + }) + + t.Run("invalid_length", func(t *testing.T) { + data := []byte{ + 0x01, 0x00, 0x00, 0x00, + 0xFF, 0xFF, 0xFF, 0xFF, + } + _, err := DecodeActionArgs(data) + require.Error(t, err) + }) +} + +func TestByteaJoinHandler(t *testing.T) { + tests := []struct { + name string + chunks any + delimiter any + expected []byte + }{ + { + name: "basic join", + chunks: [][]byte{[]byte("foo"), []byte("bar")}, + delimiter: []byte("|"), + expected: []byte("foo|bar"), + }, + { + name: "empty delimiter", + chunks: [][]byte{[]byte("a"), []byte("b")}, + delimiter: []byte{}, + expected: []byte("ab"), + }, + { + name: "single chunk", + chunks: [][]byte{[]byte("solo")}, + delimiter: []byte("|"), + expected: []byte("solo"), + }, + { + name: "with nil chunks", + chunks: []any{[]byte("a"), nil, []byte("c")}, + delimiter: []byte("|"), + expected: []byte("a||c"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var result []any + err := byteaJoinHandler(nil, nil, []any{tt.chunks, tt.delimiter}, func(out []any) error { + result = out + return nil + }) + require.NoError(t, err) + require.Len(t, result, 1) + assert.Equal(t, tt.expected, result[0]) + }) + } +} + +func TestByteaJoinHandlerErrors(t *testing.T) { + t.Run("invalid_chunks_type", func(t *testing.T) { + err := byteaJoinHandler(nil, nil, []any{123, []byte("|")}, func([]any) error { return nil }) + require.Error(t, err) + }) + + t.Run("invalid_delimiter_type", func(t *testing.T) { + err := byteaJoinHandler(nil, nil, []any{[][]byte{[]byte("a")}, 123}, func([]any) error { return nil }) + require.Error(t, err) + }) +} + +func TestByteaLengthPrefixHandler(t *testing.T) { + tests := []struct { + name string + chunk any + expected []byte + }{ + { + name: "nil chunk", + chunk: nil, + expected: []byte{0, 0, 0, 0}, + }, + { + name: "non-empty", + chunk: []byte{0x01, 0x02}, + expected: []byte{2, 0, 0, 0, 0x01, 0x02}, + }, + { + name: "string input", + chunk: "abc", + expected: []byte{3, 0, 0, 0, 'a', 'b', 'c'}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var out []any + err := byteaLengthPrefixHandler(nil, nil, []any{tt.chunk}, func(result []any) error { + out = result + return nil + }) + require.NoError(t, err) + require.Len(t, out, 1) + assert.Equal(t, tt.expected, out[0]) + }) + } +} + +func TestByteaLengthPrefixManyHandler(t *testing.T) { + input := [][]byte{ + []byte("a"), + nil, + []byte("bc"), + } + + var out []any + err := byteaLengthPrefixManyHandler(nil, nil, []any{input}, func(result []any) error { + out = result + return nil + }) + require.NoError(t, err) + require.Len(t, out, 1) + + prefixed, ok := out[0].([][]byte) + require.True(t, ok) + require.Len(t, prefixed, 3) + + assert.Equal(t, []byte{1, 0, 0, 0, 'a'}, prefixed[0]) + assert.Equal(t, []byte{0, 0, 0, 0}, prefixed[1]) + assert.Equal(t, []byte{2, 0, 0, 0, 'b', 'c'}, prefixed[2]) +} + +func TestEncodeUintHandlers(t *testing.T) { + type testCase struct { + name string + handler precompiles.HandlerFunc + input any + expected []byte + } + + cases := []testCase{ + { + name: "uint8", + handler: encodeUintHandler(8), + input: int64(255), + expected: []byte{0xFF}, + }, + { + name: "uint16", + handler: encodeUintHandler(16), + input: int64(0xABCD), + expected: []byte{0xAB, 0xCD}, + }, + { + name: "uint32", + handler: encodeUintHandler(32), + input: int64(0x01020304), + expected: []byte{0x01, 0x02, 0x03, 0x04}, + }, + { + name: "uint64", + handler: encodeUintHandler(64), + input: int64(0x0102030405060708), + expected: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var out []any + err := tc.handler(nil, nil, []any{tc.input}, func(result []any) error { + out = result + return nil + }) + require.NoError(t, err) + require.Len(t, out, 1) + assert.Equal(t, tc.expected, out[0]) + }) + } + + t.Run("value exceeds range", func(t *testing.T) { + err := encodeUintHandler(8)(nil, nil, []any{int64(300)}, func([]any) error { return nil }) + require.Error(t, err) + }) + + t.Run("negative value", func(t *testing.T) { + err := encodeUintHandler(16)(nil, nil, []any{int64(-1)}, func([]any) error { return nil }) + require.Error(t, err) + }) +} diff --git a/internal/migrations/000-extensions.sql b/internal/migrations/000-extensions.sql index 724414946..da4706596 100644 --- a/internal/migrations/000-extensions.sql +++ b/internal/migrations/000-extensions.sql @@ -3,4 +3,6 @@ -- Isolating the `USE` statements is crucial for idempotency, allowing migrations -- to be re-applied safely and ensuring objects are created in the correct schema. USE tn_cache AS tn_cache; -USE database_size AS database_size; \ No newline at end of file +USE database_size AS database_size; +USE tn_utils AS tn_utils; +USE tn_attestation AS tn_attestation; diff --git a/internal/migrations/023-attestation-schema.sql b/internal/migrations/023-attestation-schema.sql new file mode 100644 index 000000000..d6dfc6d03 --- /dev/null +++ b/internal/migrations/023-attestation-schema.sql @@ -0,0 +1,46 @@ +/* + * ATTESTATION SCHEMA MIGRATION + * + * Creates the essential tables needed for the attestation system: + * - attestations: Stores attestation requests and signatures with composite PK (requester, attestation_hash) + * - attestation_actions: Allowlist of actions permitted for attestation with normalized IDs + */ + +-- Attestations table with composite primary key supporting per-user attestations +CREATE TABLE IF NOT EXISTS attestations ( + attestation_hash BYTEA NOT NULL, + requester BYTEA NOT NULL, + result_canonical BYTEA NOT NULL, + encrypt_sig BOOLEAN NOT NULL DEFAULT false, + created_height INT8 NOT NULL, + signature BYTEA, + validator_pubkey BYTEA, + signed_height INT8, + + CONSTRAINT pk_attestations PRIMARY KEY (requester, created_height, attestation_hash), + CONSTRAINT chk_att_encrypt_sig_false CHECK (encrypt_sig = false) +); + +-- Allowlist table for actions permitted for attestation +CREATE TABLE IF NOT EXISTS attestation_actions ( + action_name TEXT PRIMARY KEY, + action_id INT NOT NULL UNIQUE, + + CONSTRAINT chk_att_action_id_range CHECK (action_id >= 1 AND action_id <= 255) +); + +-- Indexes for efficient querying +CREATE INDEX IF NOT EXISTS ix_att_created_height + ON attestations(created_height); + +CREATE INDEX IF NOT EXISTS ix_att_signed_height + ON attestations(signed_height); + +-- Bootstrap the action ID registry per issue #1197 +INSERT INTO attestation_actions (action_name, action_id) VALUES + ('get_record', 1), + ('get_index', 2), + ('get_change_over_time', 3), + ('get_last_record', 4), + ('get_first_record', 5) +ON CONFLICT (action_name) DO NOTHING; diff --git a/internal/migrations/024-attestation-actions.sql b/internal/migrations/024-attestation-actions.sql new file mode 100644 index 000000000..497ac77a1 --- /dev/null +++ b/internal/migrations/024-attestation-actions.sql @@ -0,0 +1,114 @@ +/* + * ATTESTATION ACTIONS MIGRATION + * + * Current scope: + * - request_attestation: User requests signed attestation of query results + * + * Placeholders: + * - sign_attestation – TODO + * - get_signed_attestation / list_attestations – TODO + */ + +-- ============================================================================= +-- CORE ATTESTATION ACTIONS +-- ============================================================================= + +/** + * request_attestation: Request signed attestation of query results + * + * Validates action is allowed, executes query deterministically, calculates + * attestation hash, stores unsigned attestation, and queues for signing. + */ +CREATE OR REPLACE ACTION request_attestation( + $data_provider BYTEA, + $stream_id BYTEA, + $action_name TEXT, + $args_bytes BYTEA, + $encrypt_sig BOOLEAN, +$max_fee INT8 +) PUBLIC RETURNS (attestation_hash BYTEA) { + -- Validate encryption flag (must be false in MVP) + if $encrypt_sig = true { + ERROR('Encryption not implemented'); + } + + -- Validate action is in allowlist + $action_id := 0; + for $row in SELECT action_id FROM attestation_actions WHERE action_name = $action_name { + $action_id := $row.action_id; + } + if $action_id = 0 { + ERROR('Action not allowed for attestation: ' || $action_name); + } + + -- Get current block height + $created_height := @height; + + -- Normalize caller address to bytes for storage + $caller_hex := LOWER(substring(@caller, 3, 40)); + $caller_bytes := decode($caller_hex, 'hex'); + + -- Execute target query deterministically using tn_utils.call_dispatch precompile + -- TODO: some arguments are not deterministic, such as `use_cache` + -- we should aim at filtering these out before we release attestations. + -- One idea is to also store a force_args in the whitelisted actions. Then this should help us force + -- some args per action + $query_result := tn_utils.call_dispatch($action_name, $args_bytes); + + -- Calculate attestation hash from (version|algo|data_provider|stream_id|action_id|args) + $version := 1; + $algo := 1; -- secp256k1 + -- Serialize canonical payload (version through result) using tn_utils helpers + $version_bytes := tn_utils.encode_uint8($version::INT); + $algo_bytes := tn_utils.encode_uint8($algo::INT); + $height_bytes := tn_utils.encode_uint64($created_height::INT); + $action_id_bytes := tn_utils.encode_uint16($action_id::INT); + + -- Build hash material in canonical order (no length prefixes) to match + -- the engine-side hashing utilities used by the signing service. + $hash_input := tn_utils.bytea_join(ARRAY[ + $version_bytes, + $algo_bytes, + $data_provider, + $stream_id, + $action_id_bytes, + $args_bytes + ], NULL); + $attestation_hash := digest($hash_input, 'sha256'); + + -- Canonical payload mirrors Go helpers: each field length-prefixed so the + -- validator can recover every component without ambiguity. + $result_canonical := tn_utils.bytea_join(ARRAY[ + $version_bytes, + $algo_bytes, + $height_bytes, + tn_utils.bytea_length_prefix($data_provider), + tn_utils.bytea_length_prefix($stream_id), + $action_id_bytes, + tn_utils.bytea_length_prefix($args_bytes), + tn_utils.bytea_length_prefix($query_result) + ], NULL); + + -- Store unsigned attestation + INSERT INTO attestations ( + attestation_hash, requester, result_canonical, encrypt_sig, + created_height, signature, validator_pubkey, signed_height + ) VALUES ( + $attestation_hash, $caller_bytes, $result_canonical, $encrypt_sig, + $created_height, NULL, NULL, NULL + ); + + -- Queue for signing (no-op on non-leader validators; handled by precompile) + tn_attestation.queue_for_signing(encode($attestation_hash, 'hex')); + + RETURN $attestation_hash; +}; + +-- ----------------------------------------------------------------------------- +-- TODO: sign_attestation +-- Placeholder to avoid merge conflicts with the signing workflow. +-- CREATE OR REPLACE ACTION sign_attestation(...) { ... }; + +-- TODO: get_signed_attestation / list_attestations +-- CREATE OR REPLACE ACTION get_signed_attestation(...) { ... }; +-- CREATE OR REPLACE ACTION list_attestations(...) RETURNS TABLE(...) { ... }; diff --git a/tests/streams/attestation/attestation_request_test.go b/tests/streams/attestation/attestation_request_test.go new file mode 100644 index 000000000..17568b6f5 --- /dev/null +++ b/tests/streams/attestation/attestation_request_test.go @@ -0,0 +1,245 @@ +//go:build kwiltest + +package tests + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/trufnetwork/kwil-db/common" + kwilTesting "github.com/trufnetwork/kwil-db/testing" + "github.com/trufnetwork/node/extensions/tn_utils" + "github.com/trufnetwork/node/internal/migrations" + testutils "github.com/trufnetwork/node/tests/streams/utils" + "github.com/trufnetwork/sdk-go/core/util" +) + +func TestRequestAttestationInsertsCanonicalPayload(t *testing.T) { + const ( + testActionName = "test_attestation_action" + testActionID = 10 + ) + ownerAddr := util.Unsafe_NewEthereumAddressFromString("0x0000000000000000000000000000000000000a11") + + testutils.RunSchemaTest(t, kwilTesting.SchemaTest{ + Name: "ATTESTATION01_RequestInsertion", + SeedScripts: migrations.GetSeedScriptPaths(), + Owner: ownerAddr.Address(), + FunctionTests: []kwilTesting.TestFunc{ + func(ctx context.Context, platform *kwilTesting.Platform) error { + platform.Deployer = ownerAddr.Bytes() + + require.NoError(t, setupTestAttestationAction(ctx, platform, testActionName, testActionID)) + runAttestationHappyPath(t, ctx, platform, testActionName, testActionID) + return nil + }, + }, + }, testutils.GetTestOptionsWithCache()) +} + +func setupTestAttestationAction(ctx context.Context, platform *kwilTesting.Platform, actionName string, actionID int) error { + engineCtx, err := newEngineContext(ctx, platform) + if err != nil { + return err + } + + createAction := ` +CREATE OR REPLACE ACTION ` + actionName + `( + $value INT8 +) PUBLIC VIEW RETURNS TABLE(result INT8) { + RETURN NEXT $value; +};` + + if err := platform.Engine.Execute(engineCtx, platform.DB, createAction, nil, nil); err != nil { + return fmt.Errorf("create action: %w", err) + } + + engineCtx, err = newEngineContext(ctx, platform) + if err != nil { + return err + } + + insertAllowlist := ` +INSERT INTO attestation_actions(action_name, action_id) +VALUES ($action_name, $action_id) +ON CONFLICT (action_name) DO UPDATE SET action_id = EXCLUDED.action_id;` + + params := map[string]any{ + "action_name": actionName, + "action_id": actionID, + } + + if err := platform.Engine.Execute(engineCtx, platform.DB, insertAllowlist, params, nil); err != nil { + return fmt.Errorf("insert attestation action allowlist: %w", err) + } + + return nil +} + +func runAttestationHappyPath(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, actionName string, actionID int) { + const attestedValue int64 = 42 + + dataProvider := []byte("provider-001") + streamID := []byte("stream-abc") + + argsBytes, err := tn_utils.EncodeActionArgs([]any{attestedValue}) + require.NoError(t, err, "encode action args") + + engineCtx, err := newEngineContext(ctx, platform) + require.NoError(t, err) + + var attestationHash []byte + _, err = platform.Engine.Call(engineCtx, platform.DB, "", "request_attestation", []any{ + dataProvider, + streamID, + actionName, + argsBytes, + false, + int64(0), + }, func(row *common.Row) error { + if len(row.Values) != 1 { + return fmt.Errorf("expected single return value, got %d", len(row.Values)) + } + hash, ok := row.Values[0].([]byte) + if !ok { + return fmt.Errorf("expected BYTEA return, got %T", row.Values[0]) + } + attestationHash = append([]byte(nil), hash...) + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, attestationHash, "request_attestation should return attestation hash") + + stored := fetchAttestationRow(t, ctx, platform, attestationHash) + + // Rebuild expected canonical payload + queryResult, err := tn_utils.EncodeQueryResultCanonical([]*common.Row{ + {Values: []any{attestedValue}}, + }) + require.NoError(t, err) + + expectedCanonical := buildExpectedCanonicalPayload( + stored.createdHeight, + dataProvider, + streamID, + actionID, + argsBytes, + queryResult, + ) + + require.Equal(t, expectedCanonical, stored.resultCanonical, "canonical payload mismatch") + require.False(t, stored.encryptSig, "encrypt_sig must remain false in MVP") + require.Nil(t, stored.signature, "signature must be NULL before signing") + require.Nil(t, stored.validatorPubKey, "validator_pubkey must be NULL before signing") + require.Nil(t, stored.signedHeight, "signed_height must be NULL before signing") + require.Equal(t, attestationHash, stored.attestationHash, "returned hash should equal stored hash") +} + +type attestationRow struct { + attestationHash []byte + resultCanonical []byte + encryptSig bool + signature []byte + validatorPubKey []byte + signedHeight *int64 + createdHeight int64 +} + +func fetchAttestationRow(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, hash []byte) attestationRow { + engineCtx, err := newEngineContext(ctx, platform) + require.NoError(t, err) + + var rowData attestationRow + err = platform.Engine.Execute(engineCtx, platform.DB, ` +SELECT attestation_hash, result_canonical, encrypt_sig, signature, validator_pubkey, signed_height, created_height +FROM attestations +WHERE attestation_hash = $hash; +`, map[string]any{"hash": hash}, func(row *common.Row) error { + rowData.attestationHash = append([]byte(nil), row.Values[0].([]byte)...) + rowData.resultCanonical = append([]byte(nil), row.Values[1].([]byte)...) + rowData.encryptSig = row.Values[2].(bool) + if row.Values[3] != nil { + rowData.signature = append([]byte(nil), row.Values[3].([]byte)...) + } + if row.Values[4] != nil { + rowData.validatorPubKey = append([]byte(nil), row.Values[4].([]byte)...) + } + if row.Values[5] != nil { + height := row.Values[5].(int64) + rowData.signedHeight = &height + } + rowData.createdHeight = row.Values[6].(int64) + return nil + }) + require.NoError(t, err) + require.NotNil(t, rowData.resultCanonical, "attestation row must exist") + + return rowData +} + +func buildExpectedCanonicalPayload( + createdHeight int64, + dataProvider []byte, + streamID []byte, + actionID int, + argsBytes []byte, + queryResult []byte, +) []byte { + versionBytes := []byte{1} + algoBytes := []byte{1} + + heightBytes := make([]byte, 8) + binary.BigEndian.PutUint64(heightBytes, uint64(createdHeight)) + + actionIDBytes := make([]byte, 2) + binary.BigEndian.PutUint16(actionIDBytes, uint16(actionID)) + + segments := [][]byte{ + versionBytes, + algoBytes, + heightBytes, + lengthPrefixLittleEndian(dataProvider), + lengthPrefixLittleEndian(streamID), + actionIDBytes, + lengthPrefixLittleEndian(argsBytes), + lengthPrefixLittleEndian(queryResult), + } + + return bytes.Join(segments, nil) +} + +func lengthPrefixLittleEndian(data []byte) []byte { + if data == nil { + data = []byte{} + } + prefixed := make([]byte, 4+len(data)) + binary.LittleEndian.PutUint32(prefixed[:4], uint32(len(data))) + copy(prefixed[4:], data) + return prefixed +} + +func newEngineContext(ctx context.Context, platform *kwilTesting.Platform) (*common.EngineContext, error) { + deployer, err := util.NewEthereumAddressFromBytes(platform.Deployer) + if err != nil { + return nil, fmt.Errorf("create deployer address: %w", err) + } + + txContext := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + }, + Signer: platform.Deployer, + Caller: deployer.Address(), + TxID: platform.Txid(), + } + + return &common.EngineContext{ + TxContext: txContext, + }, nil +} diff --git a/tests/streams/utils/utils.go b/tests/streams/utils/utils.go index 927092823..f726b3725 100644 --- a/tests/streams/utils/utils.go +++ b/tests/streams/utils/utils.go @@ -16,7 +16,9 @@ import ( // Extension registration "github.com/trufnetwork/node/extensions/database-size" + "github.com/trufnetwork/node/extensions/tn_attestation" "github.com/trufnetwork/node/extensions/tn_cache" + "github.com/trufnetwork/node/extensions/tn_utils" "github.com/trufnetwork/node/tests/streams/utils/cache" ) @@ -31,6 +33,9 @@ func init() { if err != nil { panic("failed to register database_size precompiles: " + err.Error()) } + + tn_utils.InitializeExtension() + tn_attestation.InitializeExtension() } // ============================================================================