From db324ee90f28c30c3008295aae7242ab80384cf6 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Fri, 15 May 2026 19:21:50 +0700 Subject: [PATCH] chore: limit insert_records to 10 rows per transaction --- .../003-primitive-insertion.prod.sql | 12 +++-- .../migrations/003-primitive-insertion.sql | 12 +++-- tests/streams/utils/setup/primitive.go | 54 +++++++++++++------ 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/internal/migrations/003-primitive-insertion.prod.sql b/internal/migrations/003-primitive-insertion.prod.sql index 368291b2..cbfc01e6 100644 --- a/internal/migrations/003-primitive-insertion.prod.sql +++ b/internal/migrations/003-primitive-insertion.prod.sql @@ -33,10 +33,14 @@ CREATE OR REPLACE ACTION insert_records( -- Cap batch size to prevent superlinear block execution time. -- With per-tx PG isolation, blocks handle thousands of small txns efficiently. - -- TODO: Commented out for now to allow larger batch sizes, MUST be re-enabled once ingestors are updated!!! - -- if $num_records > 10 { - -- ERROR('insert_records: batch size exceeds maximum of 10 records'); - -- } + -- All shipping clients (sdk-go BulkInserter, sdk-py BulkInserter, the + -- truf-data-provider Go cron, and the tsn-adapters Python pipeline) + -- already chunk at 10 rows/tx, so this cap is a defense-in-depth bound + -- against compromised/buggy callers and not a constraint on legitimate + -- batching. See truflation/website#3887. + if $num_records > 10 { + ERROR('insert_records: batch size exceeds maximum of 10 records'); + } -- Cheap input validation runs before any precompile call so malformed -- requests (NULL/empty/length-mismatched arrays) fail fast instead of diff --git a/internal/migrations/003-primitive-insertion.sql b/internal/migrations/003-primitive-insertion.sql index cb41622d..c58b9de7 100644 --- a/internal/migrations/003-primitive-insertion.sql +++ b/internal/migrations/003-primitive-insertion.sql @@ -40,10 +40,14 @@ CREATE OR REPLACE ACTION insert_records( -- Cap batch size to prevent superlinear block execution time. -- With per-tx PG isolation, blocks handle thousands of small txns efficiently. - -- TODO: Commented out for now to allow larger batch sizes, MUST be re-enabled once ingestors are updated!!! - -- if $num_records > 10 { - -- ERROR('insert_records: batch size exceeds maximum of 10 records'); - -- } + -- All shipping clients (sdk-go BulkInserter, sdk-py BulkInserter, the + -- truf-data-provider Go cron, and the tsn-adapters Python pipeline) + -- already chunk at 10 rows/tx, so this cap is a defense-in-depth bound + -- against compromised/buggy callers and not a constraint on legitimate + -- batching. See truflation/website#3887. + if $num_records > 10 { + ERROR('insert_records: batch size exceeds maximum of 10 records'); + } -- Cheap input validation runs before any precompile call so malformed -- requests (NULL/empty/length-mismatched arrays) fail fast instead of diff --git a/tests/streams/utils/setup/primitive.go b/tests/streams/utils/setup/primitive.go index c2a26fc1..0fb52b02 100644 --- a/tests/streams/utils/setup/primitive.go +++ b/tests/streams/utils/setup/primitive.go @@ -295,8 +295,15 @@ type InsertMultiPrimitiveDataInput struct { Streams []PrimitiveStreamWithData } +// insertRecordsMaxBatchSize mirrors the on-chain cap enforced by insert_records +// (migration 003-primitive-insertion.sql). Helpers that build a single engine +// call must keep each call at or below this many records. +const insertRecordsMaxBatchSize = 10 + // InsertPrimitiveDataMultiBatch inserts records for multiple streams using the insert_records action. -// It groups records by data provider to minimize calls and ensure correct signing. +// It groups records by data provider to minimize calls and ensure correct signing, +// then chunks each provider's batch to stay within the on-chain insert_records cap +// (insertRecordsMaxBatchSize). func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimitiveDataInput) error { if len(input.Streams) == 0 { return nil @@ -333,7 +340,8 @@ func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimiti txid := input.Platform.Txid() - // Execute one call per provider group with correct signer + // Execute one or more calls per provider group with correct signer, + // chunking at insertRecordsMaxBatchSize to satisfy the on-chain cap. for provider, g := range byProvider { if len(g.dataProviders) != len(g.streamIds) || len(g.streamIds) != len(g.eventTimes) || len(g.eventTimes) != len(g.values) { return errors.Errorf("array length mismatch for provider %s: dp=%d sid=%d ts=%d val=%d", @@ -341,23 +349,39 @@ func InsertPrimitiveDataMultiBatch(ctx context.Context, input InsertMultiPrimiti } signerAddr := util.Unsafe_NewEthereumAddressFromString(provider) - // InsertPrimitiveDataMultiBatch issues exactly one insert_records call - // per provider group, regardless of how many records it contains. The - // write fee is per-tx, so fund a single 1 TRUF (not len(records) × 1). - if err := fundForInsertCalls(ctx, input.Platform, signerAddr.Address(), 1); err != nil { + total := len(g.dataProviders) + if total == 0 { + continue + } + numCalls := (total + insertRecordsMaxBatchSize - 1) / insertRecordsMaxBatchSize + + // Fee is per-tx; fund one 1-TRUF call for each chunk we'll send. + if err := fundForInsertCalls(ctx, input.Platform, signerAddr.Address(), numCalls); err != nil { return errors.Wrapf(err, "fund %s for insert_records batch", provider) } - engineContext := newEthEngineContext(ctx, input.Platform, signerAddr, input.Height) - engineContext.TxContext.TxID = txid + for start := 0; start < total; start += insertRecordsMaxBatchSize { + end := start + insertRecordsMaxBatchSize + if end > total { + end = total + } - args := []any{g.dataProviders, g.streamIds, g.eventTimes, g.values} - r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_records", args, func(row *common.Row) error { return nil }) - if err != nil { - return errors.Wrapf(err, "insert_records call failed for provider %s", provider) - } - if r.Error != nil { - return errors.Wrapf(r.Error, "insert_records result failed for provider %s", provider) + engineContext := newEthEngineContext(ctx, input.Platform, signerAddr, input.Height) + engineContext.TxContext.TxID = txid + + args := []any{ + g.dataProviders[start:end], + g.streamIds[start:end], + g.eventTimes[start:end], + g.values[start:end], + } + r, err := input.Platform.Engine.Call(engineContext, input.Platform.DB, "", "insert_records", args, func(row *common.Row) error { return nil }) + if err != nil { + return errors.Wrapf(err, "insert_records call failed for provider %s", provider) + } + if r.Error != nil { + return errors.Wrapf(r.Error, "insert_records result failed for provider %s", provider) + } } }