Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions internal/migrations/003-primitive-insertion.prod.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ CREATE OR REPLACE ACTION insert_records(

-- Cap batch size to prevent superlinear block execution time.
-- With per-tx PG isolation, blocks handle thousands of small txns efficiently.
-- TODO: Commented out for now to allow larger batch sizes, MUST be re-enabled once ingestors are updated!!!
-- if $num_records > 10 {
-- ERROR('insert_records: batch size exceeds maximum of 10 records');
-- }
-- All shipping clients (sdk-go BulkInserter, sdk-py BulkInserter, the
-- truf-data-provider Go cron, and the tsn-adapters Python pipeline)
-- already chunk at 10 rows/tx, so this cap is a defense-in-depth bound
-- against compromised/buggy callers and not a constraint on legitimate
-- batching. See truflation/website#3887.
if $num_records > 10 {
ERROR('insert_records: batch size exceeds maximum of 10 records');
}

-- Cheap input validation runs before any precompile call so malformed
-- requests (NULL/empty/length-mismatched arrays) fail fast instead of
Expand Down
12 changes: 8 additions & 4 deletions internal/migrations/003-primitive-insertion.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ CREATE OR REPLACE ACTION insert_records(

-- Cap batch size to prevent superlinear block execution time.
-- With per-tx PG isolation, blocks handle thousands of small txns efficiently.
-- TODO: Commented out for now to allow larger batch sizes, MUST be re-enabled once ingestors are updated!!!
-- if $num_records > 10 {
-- ERROR('insert_records: batch size exceeds maximum of 10 records');
-- }
-- All shipping clients (sdk-go BulkInserter, sdk-py BulkInserter, the
-- truf-data-provider Go cron, and the tsn-adapters Python pipeline)
-- already chunk at 10 rows/tx, so this cap is a defense-in-depth bound
-- against compromised/buggy callers and not a constraint on legitimate
-- batching. See truflation/website#3887.
if $num_records > 10 {
ERROR('insert_records: batch size exceeds maximum of 10 records');
}

-- Cheap input validation runs before any precompile call so malformed
-- requests (NULL/empty/length-mismatched arrays) fail fast instead of
Expand Down
54 changes: 39 additions & 15 deletions tests/streams/utils/setup/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,15 @@ type InsertMultiPrimitiveDataInput struct {
Streams []PrimitiveStreamWithData
}

// insertRecordsMaxBatchSize mirrors the on-chain cap enforced by insert_records
// (migration 003-primitive-insertion.sql). Helpers that build a single engine
// call must keep each call at or below this many records.
const insertRecordsMaxBatchSize = 10

// InsertPrimitiveDataMultiBatch inserts records for multiple streams using the insert_records action.
// It groups records by data provider to minimize calls and ensure correct signing.
// It groups records by data provider to minimize calls and ensure correct signing,
// then chunks each provider's batch to stay within the on-chain insert_records cap
// (insertRecordsMaxBatchSize).
func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimitiveDataInput) error {
if len(input.Streams) == 0 {
return nil
Expand Down Expand Up @@ -333,31 +340,48 @@ func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimiti

txid := input.Platform.Txid()

// Execute one call per provider group with correct signer
// Execute one or more calls per provider group with correct signer,
// chunking at insertRecordsMaxBatchSize to satisfy the on-chain cap.
for provider, g := range byProvider {
if len(g.dataProviders) != len(g.streamIds) || len(g.streamIds) != len(g.eventTimes) || len(g.eventTimes) != len(g.values) {
return errors.Errorf("array length mismatch for provider %s: dp=%d sid=%d ts=%d val=%d",
provider, len(g.dataProviders), len(g.streamIds), len(g.eventTimes), len(g.values))
}
signerAddr := util.Unsafe_NewEthereumAddressFromString(provider)

// InsertPrimitiveDataMultiBatch issues exactly one insert_records call
// per provider group, regardless of how many records it contains. The
// write fee is per-tx, so fund a single 1 TRUF (not len(records) × 1).
if err := fundForInsertCalls(ctx, input.Platform, signerAddr.Address(), 1); err != nil {
total := len(g.dataProviders)
if total == 0 {
continue
}
numCalls := (total + insertRecordsMaxBatchSize - 1) / insertRecordsMaxBatchSize

// Fee is per-tx; fund one 1-TRUF call for each chunk we'll send.
if err := fundForInsertCalls(ctx, input.Platform, signerAddr.Address(), numCalls); err != nil {
return errors.Wrapf(err, "fund %s for insert_records batch", provider)
}

engineContext := newEthEngineContext(ctx, input.Platform, signerAddr, input.Height)
engineContext.TxContext.TxID = txid
for start := 0; start < total; start += insertRecordsMaxBatchSize {
end := start + insertRecordsMaxBatchSize
if end > total {
end = total
}

args := []any{g.dataProviders, g.streamIds, g.eventTimes, g.values}
r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_records", args, func(row *common.Row) error { return nil })
if err != nil {
return errors.Wrapf(err, "insert_records call failed for provider %s", provider)
}
if r.Error != nil {
return errors.Wrapf(r.Error, "insert_records result failed for provider %s", provider)
engineContext := newEthEngineContext(ctx, input.Platform, signerAddr, input.Height)
engineContext.TxContext.TxID = txid

args := []any{
g.dataProviders[start:end],
g.streamIds[start:end],
g.eventTimes[start:end],
g.values[start:end],
}
r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_records", args, func(row *common.Row) error { return nil })
if err != nil {
return errors.Wrapf(err, "insert_records call failed for provider %s", provider)
}
if r.Error != nil {
return errors.Wrapf(r.Error, "insert_records result failed for provider %s", provider)
}
}
}

Expand Down
Loading