The TRUF.NETWORK SDK provides a comprehensive toolkit for developers to interact with decentralized data streams. It enables seamless creation, management, and consumption of economic and financial data streams.
- Stream creation and management
- Primitive and composed stream support
- Flexible data retrieval
- Advanced permission management
- Secure, blockchain-backed data streams
The SDK is structured around several key interfaces:
- Client: Primary entry point for network interactions
- Stream: Core stream operations and access control
- Primitive Stream: Raw data stream management (includes
BulkInserterfor high-throughput ingestion) - Composed Stream: Aggregated data stream handling and taxonomy management
- Transaction Actions: Query transaction history, fees, and distributions
- Attestation Actions: Request and parse cryptographically signed attestations for on-chain verification
- Local Stream Actions: Node-local streams via the admin port (off-chain, operator-owned, optional operator-key auth)
- Primitive Streams: Direct data sources with raw data points
- Composed Streams: Aggregated streams combining multiple data sources
- Secure, immutable data recording
- Flexible querying and indexing
- Granular access control
The SDK supports transparent caching through an optional useCache parameter on data retrieval methods (GetRecord, GetIndex, GetFirstRecord, GetIndexChange). When enabled, queries can leverage node-side caching for improved performance, with detailed cache metadata returned in all responses.
package main
import (
"context"
"github.com/trufnetwork/sdk-go/core/tnclient"
"github.com/trufnetwork/sdk-go/core/types"
"github.com/trufnetwork/sdk-go/core/util"
)
func main() {
ctx := context.Background()
// Initialize client with mainnet endpoint
tnClient, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network",
tnclient.WithSigner(mySigner),
)
if err != nil {
// Handle client initialization error
}
// Deploy a primitive stream
streamId := util.GenerateStreamId("my-economic-stream")
deployTx, err := tnClient.DeployStream(
ctx,
streamId,
types.StreamTypePrimitive,
)
// Handle deployment and further stream operations
}- Install the SDK
- Configure your network endpoint
- Initialize a client
- Create and manage streams
The Client Interface is the primary entry point for interacting with the TRUF.NETWORK ecosystem. It provides a comprehensive set of methods for managing streams, handling transactions, and interfacing with the underlying blockchain infrastructure.
- Stream lifecycle management
- Transaction handling
- Network interaction
- Address and identity management
Create a client connection to the TRUF.NETWORK:
tnClient, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network",
tnclient.WithSigner(mySigner),
// Optional configuration options
)The SDK provides flexible configuration through functional options:
client, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network",
tnclient.WithSigner(signer), // Required: Authentication signer
tnclient.WithLogger(logger), // Optional: Custom logger
)WithTransport - Use custom transport implementation:
// For specialized environments (e.g., Chainlink Runtime Environment)
customTransport, err := NewCustomTransport(...)
if err != nil {
return err
}
client, err := tnclient.NewClient(ctx, endpoint,
tnclient.WithTransport(customTransport),
tnclient.WithSigner(signer),
)Use cases for custom transports:
- Chainlink Runtime Environment (CRE) workflows
- Testing with mock transports
- Custom HTTP client requirements
- Alternative communication protocols
The SDK provides specialized options for Chainlink Runtime Environment workflows.
Configures the client to use CRE's HTTP client instead of standard net/http.
func WithCRETransport(runtime cre.NodeRuntime, endpoint string) OptionParameters:
runtime(cre.NodeRuntime): The NodeRuntime fromcre.RunInNodeMode()endpoint(string): TRUF.NETWORK gateway URL
Build Requirement: Must use //go:build wasip1 tag
Example:
//go:build wasip1
package main
import (
"github.com/smartcontractkit/cre-sdk-go/cre"
"github.com/trufnetwork/sdk-go/core/tnclient"
)
func onTrigger(config *Config, runtime cre.Runtime) (*Result, error) {
return cre.RunInNodeMode(config, runtime,
func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) {
client, err := tnclient.NewClient(ctx, config.Endpoint,
tnclient.WithCRETransport(nodeRuntime, config.Endpoint),
)
if err != nil {
return nil, err
}
// All read operations work
streams, err := client.ListStreams(ctx, types.ListStreamsInput{})
actions, err := client.LoadActions()
records, err := actions.GetRecord(ctx, getRecordInput)
return &Result{Records: records}, nil
},
cre.ConsensusAggregationFromTags[*Result](),
).Await()
}When to use:
- CRE workflows requiring read-only access
- Listing streams
- Reading records
- Querying data
Convenience function combining signer and CRE transport configuration for write operations.
func WithCRETransportAndSigner(runtime cre.NodeRuntime, endpoint string, signer auth.Signer) OptionParameters:
runtime(cre.NodeRuntime): The NodeRuntime fromcre.RunInNodeMode()endpoint(string): TRUF.NETWORK gateway URLsigner(auth.Signer): Cryptographic signer for transactions
Build Requirement: Must use //go:build wasip1 tag
Example:
//go:build wasip1
package main
import (
"github.com/smartcontractkit/cre-sdk-go/cre"
"github.com/trufnetwork/kwil-db/core/crypto/auth"
"github.com/trufnetwork/sdk-go/core/tnclient"
)
func onTrigger(config *Config, runtime cre.Runtime) (*Result, error) {
return cre.RunInNodeMode(config, runtime,
func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) {
// Create signer
signer := &auth.EthPersonalSigner{Key: privateKey}
// Create client with both transport and signer
client, err := tnclient.NewClient(ctx, config.Endpoint,
tnclient.WithCRETransportAndSigner(nodeRuntime, config.Endpoint, signer),
)
if err != nil {
return nil, err
}
// Now you can perform write operations
actions, err := client.LoadActions()
// Insert records
txHash, err := actions.InsertRecords(ctx, types.InsertRecordsInput{
DataProvider: config.DataProvider,
StreamId: config.StreamId,
Records: [][]interface{}{
{"value1", "value2"},
},
})
// Wait for transaction confirmation
result, err := client.WaitTx(ctx, txHash, 2*time.Second)
return &Result{TxHash: txHash}, nil
},
cre.ConsensusAggregationFromTags[*Result](),
).Await()
}When to use:
- CRE workflows requiring write access
- Inserting records
- Deploying streams
- Any operation requiring transaction signing
Equivalent to:
client, err := tnclient.NewClient(ctx, endpoint,
tnclient.WithSigner(signer),
tnclient.WithCRETransport(nodeRuntime, endpoint),
)All CRE-specific code must include the build tag:
//go:build wasip1
package mainCompilation:
# Build for CRE (WASM)
GOOS=wasip1 GOARCH=wasm go build -o workflow.wasm
# Regular build (excludes CRE code)
go build- Build tag required: All files using CRE transport must have
//go:build wasip1 - No net/http: Standard HTTP client not available in WASM
- Context handling: Use
context.WithTimeoutfor all operations - Error handling: Implement robust error handling for network operations
📖 Complete Guide: CRE Integration Guide
🎯 Working Example: examples/truf-cre-demo/
🔗 CRE Documentation: docs.chain.link/cre
GetKwilClient() - Access underlying GatewayClient (HTTP transport only):
// For advanced use cases requiring low-level control
if gwClient := client.GetKwilClient(); gwClient != nil {
// Direct GatewayClient access for advanced scenarios
result, err := gwClient.Call(ctx, "", "custom_action", args)
}
// Returns nil for non-HTTP transportsImportant:
GetKwilClient()is provided for advanced use cases that require direct low-level access. For most scenarios, prefer using the high-level Client methods which are transport-agnostic.
The SDK uses a pluggable transport layer that allows different communication implementations:
- HTTPTransport (default): Standard
net/httpcommunication with the TRUF.NETWORK - Custom transports: For specialized runtime environments (e.g., Chainlink CRE)
- Mock transports: For testing without network dependencies
This abstraction enables the SDK to work in various runtime environments while maintaining a consistent, high-level API. All Client methods work transparently with any transport implementation.
Critical: All TN operations are asynchronous by default. They return success when transactions enter the mempool, NOT when they're executed on-chain.
📚 Complete Example: See
examples/transaction-lifecycle-example/main.gofor a comprehensive demonstration of safe transaction patterns with detailed explanations.
Common Race Condition:
// ❌ DANGEROUS - Race condition
deployTx, _ := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
insertTx, _ := primitiveActions.InsertRecord(ctx, input) // Might fail!Two Solutions for Safe Operations:
Waits for a transaction to be mined and confirmed. Always check the Result.Code to detect failures:
import (
kwiltypes "github.com/trufnetwork/kwil-db/core/types"
// ... other imports
)
// Deploy stream
deployTx, err := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
if err != nil {
return err
}
// Wait for deployment to complete
txResponse, err := tnClient.WaitForTx(ctx, deployTx, time.Second*5)
if err != nil {
return err
} else if txResponse.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("deployment failed: %d", txResponse.Result.Code)
}
// Now safe to proceed
insertTx, err := primitiveActions.InsertRecord(ctx, input)For operations that support TxOpt parameters, use WithSyncBroadcast(true):
import (
client "github.com/trufnetwork/kwil-db/core/client/types"
// ... other imports
)
// Synchronous record insertion (waits for mining)
insertTx, err := primitiveActions.InsertRecord(ctx, input,
client.WithSyncBroadcast(true))
// Synchronous taxonomy update (waits for mining)
taxonomyTx, err := composedActions.InsertTaxonomy(ctx, taxonomy,
client.WithSyncBroadcast(true))Note: DeployStream and DestroyStream don't support TxOpt, so use WaitForTx with them.
Deploy a new stream (primitive or composed):
streamId := util.GenerateStreamId("my-economic-stream")
txHash, err := tnClient.DeployStream(
ctx,
streamId,
types.StreamTypePrimitive
)Deploy a new stream and apply per-stream options. Use this instead of
DeployStream when you need non-default behavior — currently the
AllowZeros toggle for streams where value=0 is a meaningful measurement.
txHash, err := tnClient.DeployStreamWithOptions(
ctx,
streamId,
types.StreamTypePrimitive,
tnclient.DeployStreamOptions{AllowZeros: true},
)AllowZeros defaults to false — zeros are dropped at insert time and
excluded from get_record results, matching the historical behavior. The
toggle can be flipped post-deployment via IAction.SetAllowZeros.
Owner-gated mutator and public reader for the per-stream allow_zeros
flag. The flip is forward-only — historical inserts are not rewritten.
action, _ := tnClient.LoadActions()
_, _ = action.SetAllowZeros(ctx, tnClient.OwnStreamLocator(streamId), true)
current, _ := action.GetAllowZeros(ctx, tnClient.OwnStreamLocator(streamId))
// current == trueStreamDefinition (used with BatchDeployStreams) also exposes
AllowZeros per definition; the zero-value field defaults to false so
pre-existing batch callers continue to deploy zero-filtered streams.
Remove an existing stream:
txHash, err := tnClient.DestroyStream(ctx, streamId)Load an existing primitive stream:
primitiveStream, err := tnClient.LoadPrimitiveStream(
tnClient.OwnStreamLocator(streamId)
)Load an existing composed stream:
composedStream, err := tnClient.LoadComposedStream(
tnClient.OwnStreamLocator(streamId)
)Generate a stream locator using the current client's address:
streamLocator := tnClient.OwnStreamLocator(streamId)Retrieve the client's Ethereum address:
clientAddress := tnClient.Address()
addressString := clientAddress.String()func createAndManageStream(ctx context.Context, tnClient *tnclient.Client) error {
// Generate unique stream ID
streamId := util.GenerateStreamId("market-data-stream")
// Deploy stream
deployTx, err := tnClient.DeployStream(
ctx,
streamId,
types.StreamTypePrimitive,
)
if err != nil {
return fmt.Errorf("stream deployment failed: %v", err)
}
// Wait for deployment confirmation
txRes, err := tnClient.WaitForTx(ctx, deployTx, time.Second * 5)
if err != nil {
return fmt.Errorf("deployment confirmation failed: %v", err)
} else if txRes.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("deployment failed: %s", txRes.Result.Log)
}
// Load the stream
primitiveStream, err := tnClient.LoadPrimitiveStream(
tnClient.OwnStreamLocator(streamId)
)
if err != nil {
return fmt.Errorf("stream loading failed: %v", err)
}
// Perform stream operations...
return nil
}- Always handle errors
- Use appropriate context timeouts
- Log important transactions
- Implement retry mechanisms
- Ensure proper error handling and logging
The Stream Interface is the core abstraction for data streams in the TRUF.NETWORK ecosystem. It provides a comprehensive set of methods for managing stream lifecycle, visibility, and access control.
- Immutable Data: Streams store data points that cannot be altered once recorded
- Visibility Control: Fine-grained access management
- Flexible Querying: Multiple methods for data retrieval
- Permissions Management: Granular control over stream access
- Unified Data Types: All stream data operations return
StreamResultorActionResultfor consistency
The SDK uses a unified approach for all stream data operations:
- StreamResult: Core data structure with
EventTimeandValuefields - ActionResult: Contains an array of
StreamResultplusCacheMetadata - All data retrieval methods (
GetRecord,GetIndex,GetIndexChange) returnActionResult - This unified approach eliminates the need for separate
StreamIndexandStreamIndexChangetypes, and provides cache metadata by default
All stream data operations return cache metadata that provides insights into query performance and cache behavior:
type CacheMetadata struct {
// Cache hit/miss statistics
CacheHit bool `json:"cache_hit"` // Whether the query hit the cache
CacheDisabled bool `json:"cache_disabled"` // Whether caching is disabled
// Cache height information
CacheHeight *int64 `json:"cache_height"` // Block height when data was cached
// Query context (populated by SDK)
StreamId string `json:"stream_id"` // Stream identifier
DataProvider string `json:"data_provider"` // Data provider address
From *int64 `json:"from"` // Query start time
To *int64 `json:"to"` // Query end time
FrozenAt *int64 `json:"frozen_at"` // Time-travel timestamp
RowsServed int `json:"rows_served"` // Number of rows returned
}Use cache metadata to optimize query performance:
result, err := primitiveActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
From: &from,
To: &to,
UseCache: &[]bool{true}[0],
})
if err != nil {
return err
}
// Analyze cache performance
if result.Metadata.CacheHit {
fmt.Printf("Cache hit! Served %d rows from cache\n", result.Metadata.RowsServed)
if result.Metadata.CacheHeight != nil {
fmt.Printf("Cache height: %d\n", *result.Metadata.CacheHeight)
}
} else {
fmt.Printf("Cache miss - data retrieved from database\n")
}For batch operations, use AggregateCacheMetadata to analyze overall cache performance:
// Collect metadata from multiple queries
var metadataList []types.CacheMetadata
// ... perform multiple queries and collect metadata ...
// Aggregate statistics
aggregated := types.AggregateCacheMetadata(metadataList)
fmt.Printf("Cache hit rate: %.2f%% (%d hits / %d queries)\n",
aggregated.CacheHitRate * 100,
aggregated.CacheHits,
aggregated.TotalQueries)
fmt.Printf("Total rows served: %d\n", aggregated.TotalRowsServed)GetRecord(ctx context.Context, input types.GetRecordInput) (types.ActionResult, error)Retrieves the raw time-series data for the specified stream, including cache metadata. Internally the SDK calls the on-chain action get_record, which automatically delegates to either get_record_primitive or get_record_composed depending on the type of the stream.
Returns types.ActionResult:
Results: Array ofStreamResultcontaining the actual dataMetadata: Cache performance and hit/miss statistics
Usage Example:
// Basic usage without caching
result, err := primitiveActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
From: &from,
To: &to,
})
if err != nil {
return err
}
// Access the results
for _, record := range result.Results {
fmt.Printf("Time: %d, Value: %s\n", record.EventTime, record.Value.String())
}
// Access cache metadata
fmt.Printf("Cache Hit: %v\n", result.Metadata.CacheHit)
fmt.Printf("Rows Served: %d\n", result.Metadata.RowsServed)Cache-Optimized Usage:
// Enable caching for improved performance
useCache := true
result, err := primitiveActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
From: &from,
To: &to,
UseCache: &useCache,
})
if err != nil {
return err
}
// Performance analysis
if result.Metadata.CacheHit {
fmt.Printf("✓ Cache hit! Query served in optimized time\n")
if result.Metadata.CacheHeight != nil {
fmt.Printf("Cache height: %d\n", *result.Metadata.CacheHeight)
}
} else {
fmt.Printf("○ Cache miss - data retrieved from source\n")
}
Behaviour
- If both
FromandToarenil, the latest data-point (LOCF-filled for composed streams) is returned. - Gap-filling logic is applied to primitive streams so that the value immediately preceding
Fromis included—this guarantees that visualisations can safely draw a continuous line. - For composed streams, the value is calculated recursively by aggregating the weighted values of all child primitives at each point in time. All permission checks (
read,compose) are enforced inside the SQL action.
Input fields (types.GetRecordInput):
DataProvider(string) Owner address of the stream.StreamId(string) ID of the stream (stxxxxxxxxxxxxxxxxxxxxxxxxxxxx).From,To(*int) Unix timestamp range (inclusive). Passnilto make the bound open-ended.FrozenAt(*int) Time-travel flag. Only events created on or before this block-timestamp are considered.BaseDate(*int) Base date for index calculations. If not provided, defaults to the stream'sdefault_base_timemetadata.Prefix(*string) Optional prefix filter for stream operations.UseCache(*bool) Enable/disable caching for this query. Whennil, defaults tofalse. Whentrue, enables server-side caching for improved performance on repeated queries.
Returned slice: each StreamResult contains
EventTime(int) Unix timestamp of the point.Value(apd.Decimal) Raw numeric value.
GetIndex(ctx context.Context, input types.GetIndexInput) ([]types.StreamResult, error)Returns a rebased index of the stream where the value at BaseDate (defaults to metadata key default_base_time) is normalised to 100.
Mathematically:
index(t) = 100 × value(t) / value(baseDate)
The same recursive aggregation, gap-filling and permission rules described in GetRecord apply here; the only difference is the final normalisation step.
Important details
- If
BaseDateisnilthe function will fall back to the first available record for the stream. - Division-by-zero protection is enforced in the SQL action—an error is thrown when the base value is 0.
- For single-point queries (
From==To==nil) only the latest indexed value is returned.
The returned types.ActionResult has the same structure as GetRecord but semantically represents an index instead of raw values, with each record's Value field containing the indexed data. Access the results via the Results field.
GetIndexChange(ctx context.Context, input types.GetIndexChangeInput) (types.ActionResult, error)Computes the percentage change of the index over a fixed time interval. Internally the SDK obtains the indexed series via get_index and then, for every returned row whose timestamp is t, finds the closest index value at or before t − timeInterval.
Formula:
Δindex(t) = ( index(t) − index(t − Δ ) ) / index(t − Δ ) × 100
where Δ = timeInterval (in seconds).
Only rows for which a matching previous value exists and is non-zero are emitted. This is performed server-side by the SQL action get_index_change, ensuring minimal bandwidth usage.
Typical use-cases:
- Day-over-day change: pass
86400seconds. - Year-on-year change: pass
31 536 000seconds.
Input fields (types.GetIndexChangeInput):
All fields from GetIndexInput plus:
TimeInterval(int) Interval in seconds used for the delta computation (mandatory).UseCache(*bool) Enable/disable caching for this query. Whennil, defaults tofalse. Whentrue, enables server-side caching for improved performance on repeated queries.
Return value: Returns types.ActionResult where each Value in the Results array represents percentage change, e.g. 2.5 means +2.5 %.
GetFirstRecord(ctx context.Context, input types.GetFirstRecordInput) (types.ActionResult, error)Retrieves the first record from a stream, optionally after a specified timestamp.
Parameters:
ctx: The context for the operationinput: GetFirstRecordInput containing query parameters
Input fields (types.GetFirstRecordInput):
DataProvider(string): Owner address of the streamStreamId(string): ID of the streamAfter(*int): Optional timestamp to search after. If provided, returns the first record after this timeFrozenAt(*int): Time-travel flag. Only events created on or before this block-timestamp are consideredUseCache(*bool): Enable/disable caching for this query. Whennil, defaults tofalse
Returns types.ActionResult:
Results: Array containing a singleStreamResultwith the first recordMetadata: Cache performance and hit/miss statistics
Usage Example:
// Get the very first record in a stream
result, err := primitiveActions.GetFirstRecord(ctx, types.GetFirstRecordInput{
DataProvider: provider,
StreamId: streamId,
UseCache: &[]bool{true}[0],
})
if err != nil {
return err
}
if len(result.Results) > 0 {
firstRecord := result.Results[0]
fmt.Printf("First record: Time=%d, Value=%s\n",
firstRecord.EventTime, firstRecord.Value.String())
}
// Get the first record after a specific timestamp
after := int(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC).Unix())
result, err = primitiveActions.GetFirstRecord(ctx, types.GetFirstRecordInput{
DataProvider: provider,
StreamId: streamId,
After: &after,
UseCache: &[]bool{true}[0],
})SetReadVisibility(ctx context.Context, visibility util.VisibilityEnum) (transactions.TxHash, error)Sets the read visibility of the stream.
Parameters:
ctx: The context for the operation.visibility: The visibility setting (Public,Private).
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
SetComposeVisibility(ctx context.Context, visibility util.VisibilityEnum) (transactions.TxHash, error)Sets the compose visibility of the stream.
Parameters:
ctx: The context for the operation.visibility: The visibility setting (Public,Private).
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
AllowReadWallet(ctx context.Context, wallet util.EthereumAddress) (transactions.TxHash, error)Allows a wallet to read the stream.
Parameters:
ctx: The context for the operation.wallet: The Ethereum address of the wallet.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
DisableReadWallet(ctx context.Context, wallet util.EthereumAddress) (transactions.TxHash, error)Disables a wallet from reading the stream.
Parameters:
ctx: The context for the operation.wallet: The Ethereum address of the wallet.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
AllowComposeStream(ctx context.Context, locator StreamLocator) (transactions.TxHash, error)Allows a stream to use this stream as a child.
Parameters:
ctx: The context for the operation.locator: The locator of the composed stream.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
DisableComposeStream(ctx context.Context, locator StreamLocator) (transactions.TxHash, error)Disables a stream from using this stream as a child.
Parameters:
ctx: The context for the operation.- `locator": The locator of the composed stream.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
CallProcedure(ctx context.Context, procedure string, args []any) (*kwiltypes.QueryResult, error)Invokes a read-only stored procedure on the underlying database and returns a QueryResult that you can inspect or decode into typed structs using contractsapi.DecodeCallResult[T].
Parameters:
ctx: Operation context.procedure: The name of the stored procedure to execute.args: A positional slice ([]any) containing the arguments expected by the procedure. Usenilfor optional parameters you wish to skip.
Returns:
*kwiltypes.QueryResult: The raw query result.error: An error if the call fails.
// Load the generic Action API
actions, _ := tnClient.LoadActions()
// Prepare arguments
from := int(time.Now().AddDate(0, 0, -7).Unix())
to := int(time.Now().Unix())
args := []any{from, to, nil, nil, 31_536_000} // 1-year interval
// Call the procedure
result, err := actions.CallProcedure(ctx, "get_divergence_index_change", args)
if err != nil {
return err
}
fmt.Println("Columns:", result.ColumnNames)
for _, row := range result.Values {
fmt.Println(row)
}GetHistory(ctx context.Context, input types.GetHistoryInput) ([]types.BridgeHistory, error)Retrieves the transaction history for a wallet on a specific bridge.
Parameters:
ctx: Operation context.input: Input containing:BridgeIdentifier: The unique identifier of the bridge (e.g., "hoodi_tt2")Wallet: The wallet address to queryLimit: Max number of records to return (optional, default 20)Offset: Number of records to skip (optional, default 0)
Returns:
[]types.BridgeHistory: List of history recordserror: Error if query fails
Example:
history, err := client.GetHistory(ctx, types.GetHistoryInput{
BridgeIdentifier: "hoodi_tt2",
Wallet: "0x...",
})type BridgeHistory struct {
Type string `json:"type"` // "deposit" or "withdrawal"
Amount string `json:"amount"` // NUMERIC(78,0) as string
FromAddress []byte `json:"from_address"` // Sender address (if available)
ToAddress []byte `json:"to_address"` // Recipient address
InternalTxHash []byte `json:"internal_tx_hash"` // Kwil TX hash
ExternalTxHash []byte `json:"external_tx_hash"` // Ethereum TX hash
Status string `json:"status"` // "completed", "claimed", "pending_epoch"
BlockHeight int64 `json:"block_height"` // Kwil block height
BlockTimestamp int64 `json:"block_timestamp"` // Kwil block timestamp
ExternalBlockHeight *int64 `json:"external_block_height"` // Ethereum block height
}The SDK provides intelligent caching to optimize query performance:
When to Use Caching:
- Repeated queries with identical parameters
- Dashboard or monitoring applications
- Data visualization with frequent refreshes
- Batch processing where data consistency is acceptable
Cache Behavior:
- Cache is pre-configured for specific streams by node operators
- No automatic invalidation when new data arrives - cache refreshes periodically based on operator configuration
- When
FrozenAtorBaseDateparameters are specified, cache is bypassed - Cache date is returned allowing users to determine acceptable data freshness
- Users can contact node operators for additional cached streams or host their own node
Performance Tips:
// 1. Use caching for repeated queries
useCache := true
result, err := stream.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
UseCache: &useCache,
})
// 2. Monitor cache hit rates (batch example)
if aggregated.CacheHitRate < 0.5 {
log.Printf("Low cache hit rate: %.2f%%", aggregated.CacheHitRate*100)
}
// 3. Analyze cache height for data consistency
if result.Metadata.CacheHeight != nil {
fmt.Printf("Data cached at block height: %d\n", *result.Metadata.CacheHeight)
}For multiple stream queries, leverage batch operations and cache aggregation:
// Batch cache analysis
var allMetadata []types.CacheMetadata
// Perform multiple queries
for _, streamId := range streamIds {
result, err := stream.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
UseCache: &[]bool{true}[0],
})
if err != nil {
continue
}
allMetadata = append(allMetadata, result.Metadata)
}
// Analyze overall performance
aggregated := types.AggregateCacheMetadata(allMetadata)
fmt.Printf("Overall cache performance: %.2f%% hit rate\n",
aggregated.CacheHitRate*100)Time Range Queries:
- Use specific time ranges instead of open-ended queries when possible
- Note:
FrozenAtparameter bypasses cache - use for consistent historical data when cache freshness is not suitable - Consider pagination for large datasets
Index Operations:
- Note:
BaseDateparameter bypasses cache - use when precise index calculations are required - For frequently accessed base dates, consider working with node operators to ensure proper caching
- Monitor cache metadata to understand data freshness for your use case
- Always handle errors
- Use context with appropriate timeouts
- Validate wallet addresses
- Log permission changes
- Implement retry mechanisms
- Use caching strategically for improved performance
- Monitor cache hit rates and data freshness
- Aggregate cache metadata for batch operations
- Visibility changes are blockchain transactions
- Cache metadata is always returned, even when caching is disabled
- Cache refresh intervals are configured by node operators
- Cache is bypassed when
FrozenAtorBaseDateparameters are used
Primitive streams are the foundational data sources in the TRUF.NETWORK ecosystem. They represent raw, unprocessed data points that can be used directly or as components in more complex composed streams.
- Direct data input mechanism
- Immutable record storage
InsertRecords(ctx context.Context, inputs []types.InsertRecordInput) (transactions.TxHash, error)Allows insertion of one or multiple records into a primitive stream.
type InsertRecordInput struct {
DataProvider string // Address of the data provider
StreamId string // Unique stream identifier
EventTime int // Unix timestamp of the record
Value float64 // Numeric value of the record
}// Insert a single record
records := []types.InsertRecordInput{
{
DataProvider: myAddress,
StreamId: "my-economic-stream",
EventTime: int(time.Now().Unix()),
Value: 105.75, // Economic indicator value
},
}
txHash, err := primitiveStream.InsertRecords(ctx, records)-
Consistent Timestamps
- Use UTC timestamps
- Handle potential time zone complexities
-
Data Validation
- Validate input values before insertion
-
Error Handling
- Implement retry mechanisms
- Log insertion failures
- Batch record insertions when possible
- For bulk ingestion of hundreds or thousands of records, use
BulkInserter— it pipelines broadcasts so admission (~50ms) becomes the rate limit instead of inclusion (~1–2s per block)
For high-throughput ingestion (hundreds or thousands of records), use
BulkInserter. It chunks records to the protocol's per-tx cap and broadcasts
each chunk fire-and-forget with a locally-cached nonce — typically reducing
ingestion time from hours to minutes for a single signer.
inserter, err := tnClient.LoadBulkInserter(opts ...contractsapi.BulkInserterOption)Returns a *contractsapi.BulkInserter wired to the client's primitive action,
gateway client, and signer. Requires HTTP transport.
contractsapi.WithBatchSize(n int) // records per tx; default 10 (protocol cap)
contractsapi.WithMaxInflight(n int) // broadcasts queued before forced drain; default 200
contractsapi.WithMaxAttempts(n int) // initial + retries per chunk on non-catchup transient
// errors (invalid nonce, mempool full); default 15
contractsapi.WithCatchupMaxAttempts(n int) // initial + retries per chunk on "node is catching up"
// rejections (separate budget from above because catch-up
// events on a public RPC routinely run minutes long);
// default 20
contractsapi.WithInfraMaxAttempts(n int) // initial + retries per chunk on pre-broadcast infra errors
// ("no available backend", "connection refused",
// "no such host"); reuses retryBackoff for inter-attempt
// sleep. Mid-request errors (EOF, connection reset, context
// deadline) deliberately stay fatal — they may have fired
// post-admit and retrying risks duplicate inserts.
// Default: 10 (90s wait per chunk before bubbling up)
contractsapi.WithRetryBackoff(d time.Duration) // base backoff for invalid-nonce / mempool-full;
// actual delay = backoff * (attempt + 1); default 2s
contractsapi.WithCatchupBackoff(d time.Duration) // base backoff when the backend rejects with "node is
// catching up"; actual delay = backoff * (attempt + 1).
// With default 15s and CatchupMaxAttempts=20, the loop
// does 20 attempts with 19 backoffs (the last attempt
// fails without sleeping), so worst-case wait per chunk
// is 15+30+...+285s ≈ 47.5 min; default 15s
contractsapi.WithProgressLogEveryN(n int) // emit INFO progress line every N chunks (chunks done /
// total, rows done, elapsed, chunks/sec, ETA);
// 0 disables; default 0
contractsapi.WithWaitInterval(d time.Duration) // polling interval for WaitTx during drain; default 1s
contractsapi.WithLogger(log.Logger) // structured logger; default discardhashes, err := inserter.InsertAll(ctx, inputs []types.InsertRecordInput) ([]kwiltypes.Hash, error)Chunks inputs by batchSize, broadcasts each chunk pipelined (no wait
between broadcasts), and drains the inflight queue every maxInflight plus
once at the end. Returns hashes in submission order.
inserter, err := tnClient.LoadBulkInserter()
if err != nil { /* ... */ }
inputs := []types.InsertRecordInput{
{DataProvider: addr, StreamId: streamID, EventTime: 1700000000, Value: 1.5},
// ... thousands more
}
hashes, err := inserter.InsertAll(ctx, inputs)
if err != nil {
var bie *contractsapi.BulkInsertError
if errors.As(err, &bie) {
if bie.DrainFailure {
// All broadcasts succeeded; only WaitTx failed.
// hashes contains every submitted tx — investigate or poll later.
} else {
// Broadcast failed at chunk bie.FailedChunkIndex.
// Resume from inputs[bie.FailedChunkIndex*batchSize:].
}
}
}- One BulkInserter per signer key. The cache is per-instance; concurrent
inserters from the same signer will collide on nonces because the mempool
admits transactions strictly in nonce order
(
kwil-db/node/txapp/mempool.go:180-204). - Sequential per signer, not concurrent. Out-of-order HTTP arrival from
one signer triggers
ErrInvalidNoncerejections; the helper is single-threaded by design. - Different signers are independent. Per-signer nonces are independent, so multiple BulkInserters with different keys run safely in parallel.
BulkInsertError distinguishes broadcast failures from drain failures:
type BulkInsertError struct {
FailedChunkIndex int // for broadcast failures: the chunk that failed
// for drain failures: total chunks broadcast
DrainFailure bool // true if all broadcasts succeeded but WaitTx failed
LastError error
}Unwrap() exposes the underlying error, so errors.Is(err, kwiltypes.ErrInvalidNonce) works.
- Source:
core/contractsapi/bulk_inserter.go - Working example:
examples/bulk_insert_example - Mirrors the cached-nonce pattern from
node/extensions/tn_attestation/extension.go(kwilteam/node#1356) which solves the same problem on the node side for the attestation submitter.
The Composed Stream interface provides advanced capabilities for creating and managing aggregated data streams in the TRUF.NETWORK ecosystem.
A taxonomy defines how multiple primitive or composed streams are combined to create a new, more complex stream. Key components include:
- Parent Stream: The new composed stream being created
- Child Streams: Source streams used for aggregation
- Weights: Relative importance of each child stream
taxonomy := types.Taxonomy{
ParentStream: composedStreamLocator,
TaxonomyItems: []types.TaxonomyItem{
{
ChildStream: primitiveStream1Locator,
Weight: 0.6, // 60% contribution
},
{
ChildStream: primitiveStream2Locator,
Weight: 0.4, // 40% contribution
},
},
StartDate: &startTimestamp,
}DescribeTaxonomies(ctx context.Context, params types.DescribeTaxonomiesParams) ([]types.TaxonomyItem, error)Retrieves the current taxonomy configuration for a composed stream. This is the key method for discovering how composed streams aggregate their child streams.
Parameters:
ctx: Operation contextparams: Taxonomy description parametersStream: Stream locator (identifies the composed stream)LatestVersion: Flag to return only the most recent taxonomy version
Returns:
- List of
TaxonomyItemobjects containing:ChildStream: Locator of each child streamWeight: Weight/contribution of each child stream (0.0 to 1.0)
- Error if retrieval fails
Example Usage:
// Get the latest taxonomy for a composed stream
params := types.DescribeTaxonomiesParams{
Stream: tnClient.OwnStreamLocator(composedStreamId),
LatestVersion: true,
}
taxonomyItems, err := composedActions.DescribeTaxonomies(ctx, params)
if err != nil {
log.Printf("Failed to describe taxonomies: %v", err)
return
}
fmt.Printf("Taxonomy for stream %s:\n", composedStreamId.String())
for _, item := range taxonomyItems {
fmt.Printf(" Child: %s (Weight: %.2f)\n",
item.ChildStream.StreamId.String(), item.Weight)
}SetTaxonomy(ctx context.Context, taxonomies []types.TaxonomyItem) (kwiltypes.Hash, error)Configures or updates the taxonomy for a composed stream.
Parameters:
ctx: Operation contexttaxonomies: Taxonomy configuration
Returns:
- Transaction hash
- Error if setting taxonomy fails
- Carefully design taxonomy weights
Always check for errors when working with composed streams:
- Validate taxonomy before setting
- Handle potential child stream access issues
- Manage weight distribution carefully
// Create a composed stream aggregating market sentiment and economic indicators
composedStreamId := util.GenerateStreamId("market-composite-index")
err := tnClient.DeployStream(ctx, composedStreamId, types.StreamTypeComposed)
composedActions, err := tnClient.LoadComposedActions()
taxonomyTx, err := composedActions.InsertTaxonomy(ctx, types.Taxonomy{
ParentStream: tnClient.OwnStreamLocator(composedStreamId),
TaxonomyItems: []types.TaxonomyItem{
{
ChildStream: sentimentStreamLocator,
Weight: 0.6,
},
{
ChildStream: economicIndicatorLocator,
Weight: 0.4,
},
},
})The Transaction Actions Interface provides methods for querying transaction history, fees, and distributions from the TRUF.NETWORK ledger. This interface is essential for auditing, analytics, and tracking fee distributions across the network.
- Query detailed transaction information by hash
- List transactions by wallet with flexible filtering
- Track fee distributions to validators and proposers
- Pagination support for large result sets
- Filter by transaction type (paid, received, or both)
func (c *Client) LoadTransactionActions() (*contractsapi.TransactionActions, error)Initializes the transaction actions interface for querying transaction data.
Returns:
*TransactionActions: Interface for transaction querieserror: Error if initialization fails
Example:
txActions, err := client.LoadTransactionActions()
if err != nil {
log.Fatalf("Failed to load transaction actions: %v", err)
}func (a *TransactionActions) GetTransactionEvent(
ctx context.Context,
input types.GetTransactionEventInput,
) (*types.TransactionEvent, error)Retrieves detailed information about a specific transaction by its hash.
Parameters:
ctx: Context for the operationinput: Input containing:TxID: Transaction hash (with or without0xprefix)
Returns:
*TransactionEvent: Complete transaction details including:TxID: Transaction hash (0x-prefixed)BlockHeight: Block number where transaction was includedMethod: Method name (e.g., "deployStream", "insertRecords")Caller: Ethereum address of the caller (lowercase, 0x-prefixed)FeeAmount: Total fee amount as string (handles large numbers)FeeRecipient: Primary fee recipient address (nullable)Metadata: Optional metadata JSON (nullable)FeeDistributions: Array of fee distributions showing who received what amount
error: Error if query fails or transaction not found
Example:
txEvent, err := txActions.GetTransactionEvent(ctx, types.GetTransactionEventInput{
TxID: "0xabcdef123456...",
})
if err != nil {
log.Fatalf("Failed to get transaction: %v", err)
}
fmt.Printf("Method: %s\n", txEvent.Method)
fmt.Printf("Caller: %s\n", txEvent.Caller)
fmt.Printf("Fee: %s wei\n", txEvent.FeeAmount)
fmt.Printf("Block: %d\n", txEvent.BlockHeight)
// Check fee distributions
for _, dist := range txEvent.FeeDistributions {
fmt.Printf(" → %s: %s wei\n", dist.Recipient, dist.Amount)
}func (a *TransactionActions) ListTransactionFees(
ctx context.Context,
input types.ListTransactionFeesInput,
) ([]types.TransactionFeeEntry, error)Lists transactions filtered by wallet address and mode, with pagination support.
Parameters:
ctx: Context for the operationinput: Input containing:Wallet: Ethereum address to query (required)Mode: Filter mode - one of:types.TransactionFeeModePaid: Transactions where wallet paid feestypes.TransactionFeeModeReceived: Transactions where wallet received fee distributionstypes.TransactionFeeModeBoth: All transactions involving the wallet
Limit: Maximum results to return (optional, default: 20, max: 1000)Offset: Pagination offset (optional, default: 0)
Returns:
[]TransactionFeeEntry: Array of transaction entries, each containing:TxID: Transaction hashBlockHeight: Block numberMethod: Method nameCaller: Caller addressTotalFee: Total fee amountFeeRecipient: Primary recipient (nullable)Metadata: Optional metadata (nullable)DistributionSequence: Distribution index (for multiple distributions)DistributionRecipient: Recipient address for this distribution (nullable)DistributionAmount: Amount for this distribution (nullable)
error: Error if query fails
Note: This method returns one row per fee distribution. If a transaction has multiple distributions, it will appear multiple times with different DistributionSequence values.
Example - List Fees Paid:
wallet := client.Address().Address()
limit := 10
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: wallet,
Mode: types.TransactionFeeModePaid,
Limit: &limit,
})
if err != nil {
log.Fatalf("Failed to list fees: %v", err)
}
for _, entry := range entries {
fmt.Printf("%s: %s wei (block %d)\n",
entry.Method, entry.TotalFee, entry.BlockHeight)
}Example - Pagination:
limit := 20
offset := 0
// Get first page
page1, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: wallet,
Mode: types.TransactionFeeModeBoth,
Limit: &limit,
Offset: &offset,
})
// Get second page
offset = 20
page2, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: wallet,
Mode: types.TransactionFeeModeBoth,
Limit: &limit,
Offset: &offset,
})Example - Fees Received:
// Track fee distributions received by a validator
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: validatorAddress,
Mode: types.TransactionFeeModeReceived,
Limit: &limit,
})
totalReceived := big.NewInt(0)
for _, entry := range entries {
if entry.DistributionAmount != nil {
amount, _ := new(big.Int).SetString(*entry.DistributionAmount, 10)
totalReceived.Add(totalReceived, amount)
}
}
fmt.Printf("Total fees received: %s wei\n", totalReceived.String())type TransactionEvent struct {
TxID string
BlockHeight int64
Method string
Caller string
FeeAmount string
FeeRecipient *string
Metadata *string
FeeDistributions []FeeDistribution
}type FeeDistribution struct {
Recipient string `json:"recipient"`
Amount string `json:"amount"`
}type TransactionFeeEntry struct {
TxID string
BlockHeight int64
Method string
Caller string
TotalFee string
FeeRecipient *string
Metadata *string
DistributionSequence int
DistributionRecipient *string
DistributionAmount *string
}type TransactionFeeMode string
const (
TransactionFeeModePaid TransactionFeeMode = "paid"
TransactionFeeModeReceived TransactionFeeMode = "received"
TransactionFeeModeBoth TransactionFeeMode = "both"
)// Calculate total fees paid by wallet in last 30 days
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: myWallet,
Mode: types.TransactionFeeModePaid,
})
totalSpent := big.NewInt(0)
for _, entry := range entries {
amount, _ := new(big.Int).SetString(entry.TotalFee, 10)
totalSpent.Add(totalSpent, amount)
}
fmt.Printf("Total spent: %s wei\n", totalSpent.String())// Analyze transaction types and their costs
methodCounts := make(map[string]int)
methodCosts := make(map[string]*big.Int)
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: myWallet,
Mode: types.TransactionFeeModePaid,
})
for _, entry := range entries {
methodCounts[entry.Method]++
if _, ok := methodCosts[entry.Method]; !ok {
methodCosts[entry.Method] = big.NewInt(0)
}
amount, _ := new(big.Int).SetString(entry.TotalFee, 10)
methodCosts[entry.Method].Add(methodCosts[entry.Method], amount)
}
for method, count := range methodCounts {
avgCost := new(big.Int).Div(methodCosts[method], big.NewInt(int64(count)))
fmt.Printf("%s: %d calls, avg cost %s wei\n", method, count, avgCost.String())
}// Monitor where your fees are going
txEvent, err := txActions.GetTransactionEvent(ctx, types.GetTransactionEventInput{
TxID: deployTxHash,
})
fmt.Printf("Transaction: %s\n", txEvent.TxID)
fmt.Printf("Total Fee: %s wei\n", txEvent.FeeAmount)
fmt.Println("\nFee Distributions:")
for i, dist := range txEvent.FeeDistributions {
fmt.Printf(" %d. %s: %s wei\n", i+1, dist.Recipient, dist.Amount)
}-
Error Handling: Always check for errors, especially for transaction not found
txEvent, err := txActions.GetTransactionEvent(ctx, input) if err != nil { if strings.Contains(err.Error(), "not found") { // Handle missing transaction } return err }
-
Pagination: Use reasonable page sizes to avoid overwhelming the API
limit := 100 // Good balance between API calls and memory
-
Large Numbers: Use
big.Intfor fee calculations to avoid overflowamount, ok := new(big.Int).SetString(entry.TotalFee, 10) if !ok { return fmt.Errorf("invalid fee amount: %s", entry.TotalFee) }
-
Context Timeout: Set reasonable timeouts for large queries
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
Common errors and how to handle them:
entries, err := txActions.ListTransactionFees(ctx, input)
if err != nil {
switch {
case strings.Contains(err.Error(), "invalid wallet"):
// Handle invalid wallet address
case strings.Contains(err.Error(), "invalid mode"):
// Handle invalid mode value
case strings.Contains(err.Error(), "limit"):
// Handle limit out of range
default:
// Handle other errors
}
}The contractsapi package provides high-level utilities for decoding prediction market query components. This is essential for extracting market types (above, below, between) and threshold values from marketInfo.QueryComponents.
Decodes ABI-encoded query_components into a structured MarketData object.
Signature:
func DecodeMarketData(encoded []byte) (*MarketData, error)Parameters:
encoded([]byte): TheQueryComponentsfield from aMarketInfoobject.
Returns:
*MarketData: Structured market information.error: Error if decoding fails.
Example:
import "github.com/trufnetwork/sdk-go/core/contractsapi"
// market.QueryComponents is []byte from the node
data, err := contractsapi.DecodeMarketData(market.QueryComponents)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Type: %s\n", data.Type) // e.g. "above"
fmt.Printf("Thresholds: %v\n", data.Thresholds) // e.g. ["100000"]Decodes Kwil-native canonical bytes back into a slice of action arguments.
Signature:
func DecodeActionArgs(data []byte) ([]any, error) {Example:
args, err := contractsapi.DecodeActionArgs(argsBytes)
for i, arg := range args {
fmt.Printf("Arg %d: %v (Type: %T)\n", i, arg, arg)
}type MarketData struct {
DataProvider string `json:"data_provider"`
StreamID string `json:"stream_id"`
ActionID string `json:"action_id"`
Type string `json:"type"` // "above", "below", "between", "equals" or "unknown"
Thresholds []string `json:"thresholds"` // Formatted numeric values as strings
}The Attestation Actions Interface enables users to request cryptographically signed attestations of query results from TRUF.NETWORK validators. These signed attestations can be verified on-chain (e.g., in EVM smart contracts) to trustlessly prove that specific data existed at a particular block height.
- Cryptographic Verification: Signed by network validators using secp256k1
- Tamper-Proof: Immutable attestations linked to specific block heights
- EVM-Compatible: Can be verified in Solidity smart contracts
- Payload Parsing: Decode attestation data including timestamps and values
- Signature Recovery: Extract validator addresses from signatures
- DeFi Protocols: Verify off-chain data on-chain (oracle alternative)
- Prediction Markets: Settle bets with cryptographically verified results
- Insurance: Trigger payouts based on attested data
- Auditing: Prove data provenance and integrity
- Cross-Chain Bridges: Verify state across networks
Creates an attestation action handler for requesting and retrieving signed attestations.
Signature:
func (c *TNClient) LoadAttestationActions() (types.IAttestationAction, error)Returns:
types.IAttestationAction: Attestation action handlererror: Error if initialization fails
Example:
attestationActions, err := tnClient.LoadAttestationActions()
if err != nil {
log.Fatalf("Failed to load attestation actions: %v", err)
}The following methods are part of the types.IAttestationAction interface returned by LoadAttestationActions(). Call these methods on the attestation action handler.
Requests a signed attestation for a specific query. The validator will execute the query at the current block height and sign the results.
Signature:
func RequestAttestation(ctx context.Context, input types.RequestAttestationInput) (*types.RequestAttestationResult, error)Parameters:
types.RequestAttestationInput:
DataProvider(string): Data provider address (0x-prefixed, 42 chars)StreamID(string): Stream identifier (32 characters)ActionName(string): Action to attest (e.g., "get_record")Args([]any): Action arguments (will be canonically encoded)EncryptSig(bool): Must befalse(encryption not supported in MVP)MaxFee(string): Maximum fee willing to pay in wei (NUMERIC(78,0) as string)
Returns:
types.RequestAttestationResult:
RequestTxID(string): Transaction ID for this attestation request
Example:
// Request attestation for AI Index data from last 7 days
now := time.Now()
weekAgo := now.AddDate(0, 0, -7)
result, err := attestationActions.RequestAttestation(ctx, types.RequestAttestationInput{
DataProvider: "0x4710a8d8f0d845da110086812a32de6d90d7ff5c",
StreamID: "stai0000000000000000000000000000",
ActionName: "get_record",
Args: []any{
"0x4710a8d8f0d845da110086812a32de6d90d7ff5c",
"stai0000000000000000000000000000",
int64(weekAgo.Unix()),
int64(now.Unix()),
nil, // frozen_at (optional)
false, // use_cache (forced to false for attestations)
},
EncryptSig: false,
MaxFee: "100000000000000000000", // 100 TRUF
})
if err != nil {
log.Fatalf("Failed to request attestation: %v", err)
}
fmt.Printf("Request TX ID: %s\n", result.RequestTxID)Notes:
- Attestation requests require sufficient TRUF balance for fees
- The validator signs asynchronously (typically 1-2 blocks)
- Use
GetSignedAttestation()to retrieve the signed payload
Retrieves a complete signed attestation payload for a previous attestation request.
Signature:
func GetSignedAttestation(ctx context.Context, input types.GetSignedAttestationInput) (*types.SignedAttestationResult, error)Parameters:
types.GetSignedAttestationInput:
RequestTxID(string): Transaction ID fromRequestAttestation()
Returns:
types.SignedAttestationResult:
Payload([]byte): Canonical payload + 65-byte secp256k1 signature
Payload Format:
The payload consists of:
-
Canonical Fields (variable length):
- Version (1 byte)
- Algorithm (1 byte, 0 = secp256k1)
- Block Height (8 bytes, big-endian uint64)
- Data Provider (length-prefixed, big-endian uint32 + bytes)
- Stream ID (length-prefixed, big-endian uint32 + UTF-8)
- Action ID (2 bytes, big-endian uint16)
- Arguments (length-prefixed, big-endian uint32 + canonical encoding)
- Result (length-prefixed, big-endian uint32 + ABI-encoded data)
-
Signature (last 65 bytes):
- R component (32 bytes)
- S component (32 bytes)
- V recovery ID (1 byte, typically 27 or 28)
Example:
// Poll for signed attestation (max 30 seconds)
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var signedResult *types.SignedAttestationResult
for {
select {
case <-ctx.Done():
log.Println("Timeout waiting for signature")
goto afterPoll
case <-ticker.C:
signed, err := attestationActions.GetSignedAttestation(ctx, types.GetSignedAttestationInput{
RequestTxID: result.RequestTxID,
})
if err == nil && signed != nil && len(signed.Payload) > 0 {
signedResult = signed
goto afterPoll
}
}
}
afterPoll:
if signedResult != nil {
fmt.Printf("Payload size: %d bytes\n", len(signedResult.Payload))
}Returns metadata for attestations, optionally filtered by requester address.
Signature:
func ListAttestations(ctx context.Context, input types.ListAttestationsInput) ([]types.AttestationMetadata, error)Parameters:
types.ListAttestationsInput:
Requester([]byte, optional): Filter by requester address (20 bytes)Limit(*int, optional): Max results (default/max 5000)Offset(*int, optional): Pagination offset (default 0)OrderBy(*string, optional): Sort order (see below)
Valid OrderBy values:
"created_height ASC"/"created_height DESC""signed_height ASC"/"signed_height DESC"
Returns:
Array of types.AttestationMetadata:
RequestTxID(string): Transaction ID of the attestation requestAttestationHash([]byte): Hash of the attestationRequester([]byte): Address that requested the attestation (20 bytes)CreatedHeight(int64): Block height when requestedSignedHeight(*int64): Block height when signed (nil if not yet signed)EncryptSig(bool): Whether signature is encrypted
Example:
// List recent attestations for current wallet
myAddress := tnClient.Address()
addressBytes, _ := hex.DecodeString(myAddress.Address()[2:])
limit := 10
attestations, err := attestationActions.ListAttestations(ctx, types.ListAttestationsInput{
Requester: addressBytes,
Limit: &limit,
OrderBy: strPtr("created_height desc"),
})
if err != nil {
log.Fatalf("Failed to list attestations: %v", err)
}
fmt.Printf("Found %d attestations\n", len(attestations))
for i, att := range attestations {
status := "unsigned"
if att.SignedHeight != nil {
status = fmt.Sprintf("signed at height %d", *att.SignedHeight)
}
fmt.Printf("%d. TX: %s, Status: %s\n", i+1, att.RequestTxID, status)
}Parses a canonical attestation payload (without signature) into structured data.
Package: github.com/trufnetwork/sdk-go/core/contractsapi
Signature:
func ParseAttestationPayload(payload []byte) (*types.ParsedAttestationPayload, error)Parameters:
payload([]byte): Canonical payload without the 65-byte signature
Returns:
types.ParsedAttestationPayload:
Version(uint8): Payload format versionAlgorithm(uint8): Signature algorithm (0 = secp256k1)BlockHeight(uint64): Block height when attestedDataProvider(string): Data provider address (0x-prefixed hex)StreamID(string): Stream identifierActionID(uint16): Action identifierArguments([]any): Decoded action argumentsResult([]types.DecodedRow): Decoded query results
types.DecodedRow:
Values([]any): Array of decoded column values- For attestation results:
Values[0]is timestamp (string),Values[1]is value (string)
- For attestation results:
Example:
import (
"crypto/sha256"
"github.com/trufnetwork/kwil-db/core/crypto"
"github.com/trufnetwork/sdk-go/core/contractsapi"
)
// Split payload into canonical part and signature
signedPayload := signedResult.Payload
canonicalPayload := signedPayload[:len(signedPayload)-65]
signature := signedPayload[len(signedPayload)-65:]
// Parse the canonical payload
parsed, err := contractsapi.ParseAttestationPayload(canonicalPayload)
if err != nil {
log.Fatalf("Failed to parse payload: %v", err)
}
// Access parsed fields
fmt.Printf("Version: %d\n", parsed.Version)
fmt.Printf("Block Height: %d\n", parsed.BlockHeight)
fmt.Printf("Data Provider: %s\n", parsed.DataProvider)
fmt.Printf("Stream ID: %s\n", parsed.StreamID)
// Access query results
fmt.Printf("Found %d rows:\n", len(parsed.Result))
for i, row := range parsed.Result {
timestamp := row.Values[0] // Unix timestamp as string
value := row.Values[1] // 18-decimal value as string
fmt.Printf("Row %d: Timestamp=%v, Value=%v\n", i+1, timestamp, value)
}To verify the attestation signature and recover the validator's address:
import (
"crypto/sha256"
"github.com/trufnetwork/kwil-db/core/crypto"
)
// Extract canonical payload and signature
canonicalPayload := signedPayload[:len(signedPayload)-65]
signature := signedPayload[len(signedPayload)-65:]
// Hash the canonical payload with SHA256
hash := sha256.Sum256(canonicalPayload)
// Adjust signature format for recovery
// Attestation signatures use Ethereum format (V=27/28)
// kwil-db expects raw format (V=0-3)
adjustedSig := make([]byte, 65)
copy(adjustedSig, signature)
if signature[64] >= 27 {
adjustedSig[64] = signature[64] - 27
}
// Recover validator public key
pubKey, err := crypto.RecoverSecp256k1KeyFromSigHash(hash[:], adjustedSig)
if err != nil {
log.Fatalf("Failed to recover public key: %v", err)
}
// Derive Ethereum address
validatorAddr := crypto.EthereumAddressFromPubKey(pubKey)
fmt.Printf("Validator Address: 0x%x\n", validatorAddr)Important Notes:
- Attestation signatures use Ethereum format with V=27/28
- kwil-db's
RecoverSecp256k1KeyFromSigHashexpects V=0-3 (raw format) - You must subtract 27 from V before calling the recovery function
- The recovered address identifies which validator signed the attestation
Decoded attestation payload structure.
type ParsedAttestationPayload struct {
Version uint8 `json:"version"`
Algorithm uint8 `json:"algorithm"` // 0 = secp256k1
BlockHeight uint64 `json:"blockHeight"`
DataProvider string `json:"dataProvider"` // 0x-prefixed hex
StreamID string `json:"streamId"`
ActionID uint16 `json:"actionId"`
Arguments []any `json:"arguments"`
Result []DecodedRow `json:"result"`
}Represents a decoded row from attestation query results.
type DecodedRow struct {
Values []any `json:"values"`
}For attestation results:
Values[0]: Unix timestamp as string (e.g., "1704067200")Values[1]: 18-decimal fixed-point value as string (e.g., "77.051806494788211665")
Attestation results use ABI encoding (Ethereum format):
abi.encode(uint256[] timestamps, int256[] values)Details:
timestamps: Array of Unix timestamps as uint256values: Array of 18-decimal fixed-point integers as int256- Negative values are properly handled (two's complement)
Example decoded result:
// Raw ABI bytes → Decoded rows
[
{Values: ["1704067200", "77.051806494788211665"]},
{Values: ["1704153600", "80.0"]},
{Values: ["1704240000", "75.5"]},
]See examples/attestation_example/main.go for a complete working example demonstrating:
- Request Attestation: Submit attestation request for AI Index data
- Poll for Signature: Wait for validator to sign (1-2 blocks)
- Retrieve Payload: Get the complete signed attestation
- Verify Signature: Recover validator address from signature
- Parse Payload: Decode attestation fields and query results
- Display Results: Show all attested datapoints with full precision
Key Code Snippets:
// 1. Request attestation
result, err := attestationActions.RequestAttestation(ctx, types.RequestAttestationInput{
DataProvider: "0x4710a8d8f0d845da110086812a32de6d90d7ff5c",
StreamID: "stai0000000000000000000000000000",
ActionName: "get_record",
Args: args,
EncryptSig: false,
MaxFee: "100000000000000000000",
})
// 2. Wait for signing (poll with timeout)
signed, err := attestationActions.GetSignedAttestation(ctx, types.GetSignedAttestationInput{
RequestTxID: result.RequestTxID,
})
// 3. Split payload
canonicalPayload := signed.Payload[:len(signed.Payload)-65]
signature := signed.Payload[len(signed.Payload)-65:]
// 4. Verify signature
hash := sha256.Sum256(canonicalPayload)
adjustedSig := make([]byte, 65)
copy(adjustedSig, signature)
if signature[64] >= 27 {
adjustedSig[64] = signature[64] - 27
}
pubKey, _ := crypto.RecoverSecp256k1KeyFromSigHash(hash[:], adjustedSig)
validatorAddr := crypto.EthereumAddressFromPubKey(pubKey)
// 5. Parse payload
parsed, _ := contractsapi.ParseAttestationPayload(canonicalPayload)
// 6. Display results
for i, row := range parsed.Result {
fmt.Printf("Row %d: Timestamp=%v, Value=%v\n",
i+1, row.Values[0], row.Values[1])
}To verify attestations in Solidity smart contracts:
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract AttestationVerifier {
address public validatorAddress;
function verifyAttestation(
bytes memory canonicalPayload,
bytes memory signature
) public view returns (bool) {
require(signature.length == 65, "Invalid signature length");
// Hash the canonical payload
bytes32 digest = sha256(canonicalPayload);
// Extract r, s, v from signature using assembly
bytes32 r;
bytes32 s;
uint8 v;
assembly {
r := mload(add(signature, 32))
s := mload(add(signature, 64))
v := byte(0, mload(add(signature, 96)))
}
// Recover signer address
address signer = ecrecover(digest, v, r, s);
// Verify it matches the known validator
return signer == validatorAddress;
}
function parseValue(bytes memory payload) public pure returns (uint256) {
// Parse and extract specific fields from canonical payload
// Implementation depends on your use case
}
}Usage Pattern:
- User requests attestation off-chain
- Validator signs the query results
- User submits signed payload to smart contract
- Contract verifies signature using
ecrecover - Contract parses payload to extract attested data
- Contract executes logic based on verified data
-
Always Verify Signatures
- Never trust attestation payloads without verifying the validator signature
- Check that the recovered address matches a known validator
-
Handle Async Signing
- Poll with timeout (typically 30 seconds is sufficient)
- Check for errors during polling (attestation may fail)
-
Fee Management
- Ensure sufficient TRUF balance before requesting attestations
- Set reasonable
MaxFeevalues to avoid overpaying
-
Parse Results Carefully
- Timestamps are Unix seconds as strings
- Values are 18-decimal fixed-point as strings
- Convert to appropriate types for your use case
-
Store Request IDs
- Keep track of
RequestTxIDfor later retrieval - Use
ListAttestations()to view attestation history
- Keep track of
-
Test Locally First
- Use local node for development
- Test with mainnet only when ready
Common errors and how to handle them:
// Requesting attestation
result, err := attestationActions.RequestAttestation(ctx, input)
if err != nil {
switch {
case strings.Contains(err.Error(), "Insufficient balance"):
// User needs more TRUF tokens
log.Println("Please fund your wallet with TRUF tokens")
case strings.Contains(err.Error(), "invalid"):
// Input validation failed
log.Println("Check input parameters")
default:
log.Printf("Attestation request failed: %v", err)
}
}
// Retrieving signed attestation
signed, err := attestationActions.GetSignedAttestation(ctx, input)
if err != nil || len(signed.Payload) < 66 {
// Attestation not ready or invalid
log.Println("Attestation not yet signed, try again later")
}
// Parsing payload
parsed, err := contractsapi.ParseAttestationPayload(canonicalPayload)
if err != nil {
switch {
case strings.Contains(err.Error(), "too short"):
// Payload truncated or invalid
log.Println("Invalid payload format")
case strings.Contains(err.Error(), "version"):
// Unsupported payload version
log.Println("Unsupported attestation version")
default:
log.Printf("Parse error: %v", err)
}
}
// Signature verification
pubKey, err := crypto.RecoverSecp256k1KeyFromSigHash(hash[:], adjustedSig)
if err != nil {
// Invalid signature or tampering detected
log.Println("Signature verification failed - payload may be tampered")
}- Attestation Latency: Typically 1-2 blocks (2-4 seconds) for signing
- Payload Size: Varies with result data (typically 1KB-100KB)
- Fee Costs: Depends on query complexity and data size
- Polling Frequency: Recommended 2-second intervals to balance latency and API load
- Signature Verification: Always verify signatures before trusting attestation data
- Replay Protection: Check block height to prevent replay attacks
- Validator Trust: Only accept attestations from known validator addresses
- Payload Integrity: Hash payload before verification; detect tampering
- Fee Limits: Set appropriate
MaxFeeto prevent unexpected charges
The Bridge Actions Interface enables programmatic interaction with the TRUF.NETWORK bridge system. It allows bots and applications to manage token balances, initiate withdrawals to external chains (like Ethereum), and retrieve cryptographic proofs for claiming assets.
| Identifier | Network | Token | Decimals | Notes |
|---|---|---|---|---|
eth_truf |
mainnet | TRUF | 18 | Used for protocol fees (stream write, attestation, market creation) |
eth_usdc |
mainnet | USDC | 6 | Used for prediction-market collateral |
ethereum_bridge |
mainnet | TRUF | 18 | Legacy — replaced by eth_truf |
hoodi_tt |
testnet | TRUF (test) | 18 | Hoodi testnet |
hoodi_tt2 |
testnet | USDC (test) | 18 | Hoodi testnet — prediction-market collateral |
sepolia_bridge |
testnet | TRUF (test) | 18 | Sepolia testnet, deprecated |
Examples below use testnet identifiers; substitute the mainnet equivalent for production. Order-book actions (create_market, place_buy_order, etc.) accept eth_usdc or eth_truf as the Bridge parameter on mainnet.
These methods are available directly on the Client interface.
Retrieves the token balance for a wallet on a specific bridge instance.
Signature:
func (c *Client) GetWalletBalance(bridgeIdentifier string, walletAddress string) (string, error)Parameters:
bridgeIdentifier(string): Unique identifier for the bridge (e.g., "hoodi_tt", "sepolia").walletAddress(string): The wallet address to query (0x-prefixed).
Returns:
string: The balance in wei (as a string to preserve precision).error: Error if the bridge or wallet is invalid.
Example:
balance, err := client.GetWalletBalance("hoodi_tt", "0x123...")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Balance: %s wei\n", balance)Initiates a withdrawal by burning tokens on the TRUF.NETWORK. This is the first step in bridging assets back to an external chain.
Signature:
func (c *Client) Withdraw(bridgeIdentifier string, amount string, recipient string) (string, error)Parameters:
bridgeIdentifier(string): Unique identifier for the bridge (e.g., "hoodi_tt").amount(string): The amount to withdraw in wei. Must be a valid numeric string.recipient(string): The EVM address that will receive the funds on the destination chain.
Returns:
string: The transaction hash of the burn operation on Kwil.error: Error if validation fails or the transaction is rejected.
Example:
// Withdraw 1 token (18 decimals)
txHash, err := client.Withdraw("hoodi_tt", "1000000000000000000", "0xRecipient...")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Burn TX Hash: %s\n", txHash)Sends tokens from the caller to another in-network wallet via the bridge's public transfer action.
The caller pays a 1-token action fee on top of amount, denominated in the same token as the bridge (1 TRUF for eth_truf, 1 USDC for eth_usdc). The action reverts if the caller balance is below amount + 1 token.
Signature:
func (c *Client) Transfer(ctx context.Context, bridgeIdentifier string, recipient string, amount string) (string, error)Note: Callers must pass a supported
bridgeIdentifier—"eth_truf"or"eth_usdc"on mainnet,"sepolia"on dev/test.Client.Transferis not implemented for custom transports (e.g. CRE); when used through a non-HTTP transport it returns a runtime error. Use the default HTTP transport, or implementTransferon your custom transport.
Parameters:
bridgeIdentifier(string): Supported bridge namespace —"eth_truf","eth_usdc"(mainnet) or"sepolia"(dev/test).recipient(string): Destination wallet address (0x-prefixed 40-char hex). Validated client-side.amount(string): Transfer amount in wei. Must parse as a positive decimal.
Returns:
string: Transaction hash of the transfer.error: Error if validation fails (empty / invalid recipient, non-positive or non-numeric amount), the custom transport doesn't implementTransfer, or the transaction is rejected.
Example — Refill bot pattern:
// Top up an adapter wallet to its threshold; budget +1 TRUF for the action fee.
txHash, err := client.Transfer(ctx, "eth_truf", adapterWallet, "100000000000000000000")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Refill TX Hash: %s\n", txHash)Retrieves the cryptographic proofs required to claim a withdrawal on the destination chain (e.g., via the withdraw function on the bridge contract).
Signature:
func (c *Client) GetWithdrawalProof(ctx context.Context, input types.GetWithdrawalProofInput) ([]types.WithdrawalProof, error)Parameters:
input:BridgeIdentifier(string): The bridge ID (e.g., "hoodi_tt").Wallet(string): The wallet address that initiated the withdrawal.
Returns:
[]types.WithdrawalProof: A list of proof objects, each containing:BlockHeight: Kwil block height of the withdrawal.BlockHash: Base64 encoded block hash.Root: Base64 encoded merkle root.Signatures: List of Base64 encoded validator signatures.Amount: The amount withdrawn.Recipient: The recipient address.
Example:
proofs, err := client.GetWithdrawalProof(ctx, types.GetWithdrawalProofInput{
BridgeIdentifier: "hoodi_tt",
Wallet: "0xSender...",
})
if len(proofs) > 0 {
proof := proofs[0]
fmt.Printf("Ready to claim %s tokens for recipient %s\n", proof.Amount, proof.Recipient)
// Use proof.Signatures, proof.Root, etc. to submit claim transaction on Ethereum
}ILocalActions talks to the node's admin JSON-RPC server (default port
8485) instead of the gateway. Local streams live off-chain on a single
node: no consensus broadcast, no transaction fees, implicitly owned by the
node operator. The server derives data_provider from the node's
secp256k1 key — clients never supply it on the wire.
Use local actions when the data should stay on one node (private primitive
data, composed streams that compose local children, node-operator tooling).
Use the full Client for anything that needs consensus (cross-node reads,
role management, bridge operations).
Three equivalent entry points:
import (
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/trufnetwork/sdk-go/core/tnclient"
"github.com/trufnetwork/sdk-go/core/types"
)
// 1. Standalone — no gateway, no wallet, no chain id.
local, err := tnclient.NewLocalClient("http://127.0.0.1:8485")
// 2. Standalone + operator-key signing (required on
// require_signature = true nodes).
priv, _ := ethcrypto.HexToECDSA(os.Getenv("OPERATOR_KEY"))
signed, err := tnclient.NewLocalClientWithSigner("http://127.0.0.1:8485", priv)
// 3. Share the admin transport with a full Client.
client, _ := tnclient.NewClient(ctx, gatewayURL,
tnclient.WithSigner(walletSigner),
tnclient.WithAdmin("http://127.0.0.1:8485"),
tnclient.WithLocalSigner(priv), // attaches _auth to tn_local calls
)
viaClient, _ := client.LoadLocalActions()Attaches an operator secp256k1 private key for signing tn_local admin
requests. Required when the target node has
[extensions.tn_local] require_signature = true. When set, the SDK
attaches a server-recoverable _auth envelope (sig, ts, ver) to
every request; the server rejects requests signed by any other key. Pass
nil (or leave the option off) to behave identically to the plain
NewLocalClient / LoadLocalActions — only nodes with the flag off will
accept the request.
Extract the autogenerated operator key from the dev container once:
export OPERATOR_KEY="$(docker exec tn-db cat /root/.kwild/nodekey.json | jq -r '.key')"Every method below accepts only business inputs — no DataProvider is
ever sent on the wire. Responses include DataProvider (server-derived,
always the node's own address, lowercased).
| Method | Purpose |
|---|---|
CreateStream(ctx, LocalCreateStreamInput) |
Create a local primitive or composed stream. |
DeleteStream(ctx, LocalDeleteStreamInput) |
Remove a local stream and all child rows (records, taxonomies). |
InsertRecords(ctx, LocalInsertRecordsInput) |
Append records. Parallel slices: StreamID[i] gets (EventTime[i], Value[i]). |
InsertTaxonomy(ctx, LocalInsertTaxonomyInput) |
Add a taxonomy group to a composed stream. |
DisableTaxonomy(ctx, LocalDisableTaxonomyInput) |
Soft-delete a taxonomy group. |
GetRecord(ctx, LocalGetRecordInput) |
Query records (latest if both bounds unset). |
GetIndex(ctx, LocalGetIndexInput) |
Query the computed index series. |
ListStreams(ctx) |
List every local stream on this node. |
- No signer + server
require_signature = false→ request goes through unsigned. - No signer + server
require_signature = true→ server returnstn_local: unauthenticatedwithreason: "missing _auth". - Wrong signer + server
require_signature = true→ server returnstn_local: unauthenticatedwithreason: "signer is not this node's operator". - Operator signer + server
require_signature = true→ everylocal.*call succeeds;DataProvideris the operator address.
The admin server has its own transport-level auth (unix socket, mTLS, or
--admin.pass) completely independent of tn_local's
require_signature. Pick a transport that matches your trust model:
loopback TCP + --admin.notls is fine for on-host dev work but is not
equivalent to the default unix socket; any other process running as any
user on the host can still reach it. For production, use the unix socket,
mTLS, or --admin.pass. See
node/docs/development.md
for the full matrix.
A working program that creates a primitive stream, a composed stream with
a taxonomy, queries both, and demonstrates wrong-key rejection lives at
examples/local_actions_example/.
Run it against either a flag-off or a flag-on node; its README walks
through both invocation shapes.