From f20dee0fc8323de2dc2461b7923ccb975ff7678f Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 21 Apr 2026 12:19:41 +0100 Subject: [PATCH 1/7] introduce a lightweight tracking mode for confirmation Signed-off-by: Chengxuan Xing --- internal/blocklistener/blocklistener.go | 6 +- internal/blocklistener/blocklistener_test.go | 2 +- internal/confirmations/confirmations.go | 212 +++++-- internal/confirmations/confirmations_test.go | 527 +++++++++++++++++- .../confirmations/confirmed_block_listener.go | 7 +- .../confirmed_block_listener_test.go | 18 +- internal/events/eventstream_test.go | 39 +- internal/tmmsgs/en_error_messages.go | 3 + mocks/apiclientmocks/fftm_client.go | 2 +- mocks/confirmationsmocks/manager.go | 8 +- mocks/eventsmocks/stream.go | 2 +- mocks/ffcapimocks/api.go | 20 +- mocks/metricsmocks/event_metrics_emitter.go | 2 +- .../transaction_handler_metrics.go | 2 +- mocks/persistencemocks/persistence.go | 2 +- mocks/persistencemocks/rich_query.go | 2 +- .../transaction_persistence.go | 2 +- .../managed_tx_event_handler.go | 2 +- mocks/txhandlermocks/transaction_handler.go | 2 +- mocks/wsmocks/web_socket_channels.go | 2 +- mocks/wsmocks/web_socket_server.go | 2 +- pkg/apitypes/api_types.go | 8 +- pkg/ffcapi/api.go | 23 +- pkg/fftm/manager_test.go | 35 +- 24 files changed, 817 insertions(+), 113 deletions(-) diff --git a/internal/blocklistener/blocklistener.go b/internal/blocklistener/blocklistener.go index 98c0c55a..6f97d7d7 100644 --- a/internal/blocklistener/blocklistener.go +++ b/internal/blocklistener/blocklistener.go @@ -24,7 +24,7 @@ import ( ) type NewBlockHashConsumer interface { - NewBlockHashes() chan<- *ffcapi.BlockHashEvent + GetReceiveChannel() chan<- *ffcapi.BlockHashEvent } // BufferChannel ensures it always pulls blocks from the channel passed to the connector @@ -49,7 +49,7 @@ func BufferChannel(ctx context.Context, target NewBlockHashConsumer) (buffered c // Have to discard this blockedUpdate.GapPotential = true // there is a gap for sure at this point log.L(ctx).Debugf("Blocked event stream missed new block event: %v", blockUpdate.BlockHashes) - case target.NewBlockHashes() <- blockedUpdate: + case target.GetReceiveChannel() <- blockedUpdate: // We're not blocked any more log.L(ctx).Infof("Event stream block-listener unblocked") blockedUpdate = nil @@ -64,7 +64,7 @@ func BufferChannel(ctx context.Context, target NewBlockHashConsumer) (buffered c // Nothing to do unless we have confirmations turned on if target != nil { select { - case target.NewBlockHashes() <- update: + case target.GetReceiveChannel() <- update: // all good, we passed it on default: // we can't deliver it immediately, we switch to blocked mode diff --git a/internal/blocklistener/blocklistener_test.go b/internal/blocklistener/blocklistener_test.go index b4d8ed29..13a13a36 100644 --- a/internal/blocklistener/blocklistener_test.go +++ b/internal/blocklistener/blocklistener_test.go @@ -28,7 +28,7 @@ type testBlockConsumer struct { c chan *ffcapi.BlockHashEvent } -func (tbc *testBlockConsumer) NewBlockHashes() chan<- *ffcapi.BlockHashEvent { +func (tbc *testBlockConsumer) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent { return tbc.c } diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 175ff0cf..21593898 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "time" @@ -43,7 +44,7 @@ type Manager interface { Notify(n *Notification) error Start() Stop() - NewBlockHashes() chan<- *ffcapi.BlockHashEvent + GetReceiveChannel() chan<- *ffcapi.BlockHashEvent CheckInFlight(listenerID *fftypes.UUID) bool StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error @@ -86,25 +87,27 @@ type RemovedListenerInfo struct { } type blockConfirmationManager struct { - baseContext context.Context - ctx context.Context - cancelFunc func() - newBlockHashes chan *ffcapi.BlockHashEvent - connector ffcapi.API - blockListenerStale bool - metricsEmitter metrics.ConfirmationMetricsEmitter - requiredConfirmations int - staleReceiptTimeout time.Duration - bcmNotifications chan *Notification - highestBlockSeen uint64 - pending map[string]*pendingItem - pendingMux sync.Mutex - receiptChecker *receiptChecker - retry *retry.Retry - cblLock sync.Mutex - cbls map[fftypes.UUID]*confirmedBlockListener - fetchReceiptUponEntry bool - done chan struct{} + baseContext context.Context + ctx context.Context + cancelFunc func() + newBlockHashEvents chan *ffcapi.BlockHashEvent + connector ffcapi.API + blockListenerStale bool + metricsEmitter metrics.ConfirmationMetricsEmitter + requiredConfirmations uint64 + blockListenerTrackingMode ffcapi.BlockListenerTrackingMode + staleReceiptTimeout time.Duration + bcmNotifications chan *Notification + highestBlockSeen uint64 + headBlockNumber uint64 // used for confirmation when block listener is in headBlockNumber mode + pending map[string]*pendingItem + pendingMux sync.Mutex + receiptChecker *receiptChecker + retry *retry.Retry + cblLock sync.Mutex + cbls map[fftypes.UUID]*confirmedBlockListener + fetchReceiptUponEntry bool + done chan struct{} } func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.API, desc string, @@ -114,12 +117,16 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A connector: connector, cbls: make(map[fftypes.UUID]*confirmedBlockListener), blockListenerStale: true, - requiredConfirmations: config.GetInt(tmconfig.ConfirmationsRequired), - staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout), - bcmNotifications: make(chan *Notification, config.GetInt(tmconfig.ConfirmationsNotificationQueueLength)), - pending: make(map[string]*pendingItem), - newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)), - metricsEmitter: cme, + requiredConfirmations: config.GetUint64(tmconfig.ConfirmationsRequired), + // currently the confirmation result is driven by the block listener mode, + // because when the block listener is in the headBlockNumber mode, the block information is not fetched and cached by the connector + // Therefore, it will be super inefficient to build the in memory partial chain in confirmations manager + blockListenerTrackingMode: connector.GetBlockListenerTrackingMode(baseContext), + staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout), + bcmNotifications: make(chan *Notification, config.GetInt(tmconfig.ConfirmationsNotificationQueueLength)), + pending: make(map[string]*pendingItem), + newBlockHashEvents: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)), + metricsEmitter: cme, retry: &retry.Retry{ InitialDelay: config.GetDuration(tmconfig.ConfirmationsRetryInitDelay), MaximumDelay: config.GetDuration(tmconfig.ConfirmationsRetryMaxDelay), @@ -143,22 +150,23 @@ const ( // pendingItem could be a specific event that has been detected, but not confirmed yet. // Or it could be a transaction type pendingItem struct { - pType pendingType - added time.Time - notifiedConfirmations []*apitypes.Confirmation - confirmations []*apitypes.Confirmation - scheduledAtLeastOnce bool - confirmed bool - queuedStale *list.Element // protected by receiptChecker mux - lastReceiptCheck time.Time // protected by receiptChecker mux - receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) - confirmationsCallback func(ctx context.Context, notification *apitypes.ConfirmationsNotification) - transactionHash string - blockHash string // can be notified of changes to this for receipts - blockNumber uint64 // known at creation time for event logs - transactionIndex uint64 // known at creation time for event logs - logIndex uint64 // events only - listenerID *fftypes.UUID // events only + pType pendingType + added time.Time + notifiedConfirmations []*apitypes.Confirmation + confirmations []*apitypes.Confirmation + scheduledAtLeastOnce bool + confirmed bool + previousConfirmationCount *uint64 // headBlockNumber mode: last dispatched ActualConfirmationCount + queuedStale *list.Element // protected by receiptChecker mux + lastReceiptCheck time.Time // protected by receiptChecker mux + receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) + confirmationsCallback func(ctx context.Context, notification *apitypes.ConfirmationsNotification) + transactionHash string + blockHash string // can be notified of changes to this for receipts + blockNumber uint64 // known at creation time for event logs + transactionIndex uint64 // known at creation time for event logs + logIndex uint64 // events only + listenerID *fftypes.UUID // events only } func pendingKeyForTX(txHash string) string { @@ -243,8 +251,8 @@ func (bcm *blockConfirmationManager) Stop() { } } -func (bcm *blockConfirmationManager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent { - return bcm.newBlockHashes +func (bcm *blockConfirmationManager) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent { + return bcm.newBlockHashEvents } // Notify is used to notify the confirmation manager of detection of a new logEntry addition or removal @@ -365,12 +373,12 @@ func (bcm *blockConfirmationManager) confirmationsListener() { receivedFirstBlock := false for { select { - case bhe := <-bcm.newBlockHashes: + case bhe := <-bcm.newBlockHashEvents: if bhe.GapPotential { bcm.blockListenerStale = true } blockHashes = append(blockHashes, bhe.BlockHashes...) - + bcm.headBlockNumber = bhe.HeadBlockNumber // always update the head block number, NOTE: the number can decrease during a re-org // Need to also pass this event to any confirmed block listeners // (they promise to always be efficient in handling these, having a go-routine // dedicated to spinning fast just processing those separate to dispatching them) @@ -406,9 +414,12 @@ func (bcm *blockConfirmationManager) confirmationsListener() { blocks := bcm.newBlockState() if bcm.blockListenerStale { - if err := bcm.walkChain(blocks); err != nil { - log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err) - continue + if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeInMemoryPartialChain { + // only need to build the in memory partial chain when the block listener supports it + if err := bcm.walkChain(blocks); err != nil { + log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err) + continue + } } bcm.blockListenerStale = false } @@ -548,6 +559,12 @@ func (bcm *blockConfirmationManager) removeItem(pending *pendingItem, stale bool } func (bcm *blockConfirmationManager) processBlockHashes(blockHashes []string) { + if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber { + // for headBlockNumber mode, we don't need to fetch blocks to form a local in memory partial chain + // we just need to do confirmation check and dispatch confirmations for any applicable pending items + bcm.checkAndDispatchConfirmationsUsingBlockHeight() + return + } batchSize := len(blockHashes) if batchSize > 0 { log.L(bcm.ctx).Debugf("New block notifications %v", blockHashes) @@ -599,7 +616,6 @@ func (bcm *blockConfirmationManager) processBlock(block *apitypes.BlockInfo) { var notifications pendingItems for pendingKey, pending := range bcm.pending { if pending.blockHash != "" { - // The block might appear at any point in the confirmation list newConfirmations := false expectedParentHash := pending.blockHash @@ -622,7 +638,7 @@ func (bcm *blockConfirmationManager) processBlock(block *apitypes.BlockInfo) { } expectedBlockNumber++ } - if len(pending.confirmations) >= bcm.requiredConfirmations { + if uint64(len(pending.confirmations)) >= bcm.requiredConfirmations { pending.confirmed = true } if bcm.requiredConfirmations > 0 && (pending.confirmed || newConfirmations) { @@ -768,6 +784,10 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc return nil } + if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber { + return bcm.confirmationCheckUsingHighestBlock(pending) + } + pendingKey := pending.getKey() blockNumber := pending.blockNumber + 1 @@ -796,7 +816,7 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc break } pending.confirmations = append(pending.confirmations, apitypes.ConfirmationFromBlock(block)) - if len(pending.confirmations) >= bcm.requiredConfirmations { + if uint64(len(pending.confirmations)) >= bcm.requiredConfirmations { pending.confirmed = true break } @@ -811,3 +831,91 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc return nil } + +func (bcm *blockConfirmationManager) checkAndDispatchConfirmationsUsingBlockHeight() { + bcm.pendingMux.Lock() + items := make([]*pendingItem, 0, len(bcm.pending)) + for _, p := range bcm.pending { + items = append(items, p) + } + bcm.pendingMux.Unlock() + for _, p := range items { + if err := bcm.confirmationCheckUsingHighestBlock(p); err != nil { + log.L(bcm.ctx).Errorf("Block height confirmation refresh failed for %s: %s", p.getKey(), err) + } + } +} + +func (bcm *blockConfirmationManager) confirmationCheckUsingHighestBlock(pending *pendingItem) error { + if pending.blockHash == "" { + // no receipt yet, so no confirmation check + return nil + } + + actualConfirmationCount := uint64(0) + if bcm.headBlockNumber > pending.blockNumber { + actualConfirmationCount = bcm.headBlockNumber - pending.blockNumber + } + + return bcm.dispatchBlockHeightConfirmations(pending, actualConfirmationCount) + +} + +func (bcm *blockConfirmationManager) dispatchBlockHeightConfirmations(pending *pendingItem, count uint64) error { + if pending.previousConfirmationCount != nil && count == *pending.previousConfirmationCount { + // no ops as there isn't any changes detected + return nil + } + + confirmationCount := count + if confirmationCount > bcm.requiredConfirmations { + confirmationCount = bcm.requiredConfirmations + } + + confirmed := confirmationCount == bcm.requiredConfirmations + if confirmed { + // do confirmation check here to ensure the transaction receipt is still valid + res, reason, err := bcm.connector.TransactionReceipt(bcm.ctx, &ffcapi.TransactionReceiptRequest{ + TransactionHash: pending.transactionHash, + }) + if err != nil { + if reason == ffcapi.ErrorReasonNotFound { + // need to schedule the receipt check again as the receipt is no longer valid + pending.blockHash = "" + pending.blockNumber = 0 + pending.previousConfirmationCount = nil + bcm.receiptChecker.schedule(pending, true) + } + return err + } + if res == nil || res.BlockHash == "" { + return i18n.NewError(bcm.ctx, tmmsgs.MsgTransactionReceiptMissingBlockHash) + } + if !strings.EqualFold(strings.ToLower(res.BlockHash), strings.ToLower(pending.blockHash)) { + // set the new block hash and number and requeue for confirmation check + pending.blockHash = res.BlockHash + pending.blockNumber = res.BlockNumber.Uint64() + pending.previousConfirmationCount = nil + return i18n.NewError(bcm.ctx, tmmsgs.MsgTransactionReceiptBlockHashMismatch, pending.blockHash, res.BlockHash) + } + } + // when not confirmed, we assume the receipt is still valid and return the actual confirmation count + + notification := &apitypes.ConfirmationsNotification{ + Confirmed: confirmed, + NewFork: pending.previousConfirmationCount != nil && count < *pending.previousConfirmationCount, // the only fork situation we can detect is a reduction in confirmation count + ActualConfirmationCount: confirmationCount, // NOTE: we returns the actual confirmation count, which can be higher than the required confirmation count + TargetConfirmationCount: bcm.requiredConfirmations, + } + pending.previousConfirmationCount = &count + + log.L(bcm.ctx).Infof("Block height confirmation notification item=%s confirmed=%t count=%d/%d newFork=%t", + pending.getKey(), notification.Confirmed, notification.ActualConfirmationCount, notification.TargetConfirmationCount, notification.NewFork) + pending.confirmationsCallback(bcm.ctx, notification) + bcm.metricsEmitter.RecordConfirmationMetrics(bcm.ctx, time.Since(pending.added).Seconds()) + + if confirmed { + bcm.removeItem(pending, false) + } + return nil +} diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 2c4b5a43..9b3e6496 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -16,6 +16,7 @@ package confirmations import ( "context" + "errors" "fmt" "sort" "testing" @@ -43,6 +44,7 @@ func newTestBlockConfirmationManager() (*blockConfirmationManager, *ffcapimocks. func newTestBlockConfirmationManagerCustomConfig() (*blockConfirmationManager, *ffcapimocks.API) { logrus.SetLevel(logrus.DebugLevel) mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() emm := &metricsmocks.EventMetricsEmitter{} emm.On("RecordNotificationQueueingMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashProcessMetrics", mock.Anything, mock.Anything).Maybe() @@ -52,11 +54,35 @@ func newTestBlockConfirmationManagerCustomConfig() (*blockConfirmationManager, * emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashQueueingMetrics", mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashBatchSizeMetric", mock.Anything, mock.Anything).Maybe() + bcm := NewBlockConfirmationManager(context.Background(), mca, "ut", emm).(*blockConfirmationManager) bcm.receiptChecker = newReceiptChecker(bcm, 0, emm) // no workers, but non-nil return bcm, mca } +func newTestBlockConfirmationManagerHeadBlockNumber() (*blockConfirmationManager, *ffcapimocks.API) { + tmconfig.Reset() + config.Set(tmconfig.ConfirmationsRequired, 3) + config.Set(tmconfig.ConfirmationsNotificationQueueLength, 10) + config.Set(tmconfig.ConfirmationsReceiptWorkers, 0) + config.Set(tmconfig.ConfirmationsFetchReceiptUponEntry, false) + logrus.SetLevel(logrus.DebugLevel) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeHeadBlockNumber).Maybe() + emm := &metricsmocks.EventMetricsEmitter{} + emm.On("RecordNotificationQueueingMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordBlockHashProcessMetrics", mock.Anything, mock.Anything).Maybe() + emm.On("RecordNotificationProcessMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordReceiptCheckMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordReceiptMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe() + emm.On("RecordBlockHashQueueingMetrics", mock.Anything, mock.Anything).Maybe() + emm.On("RecordBlockHashBatchSizeMetric", mock.Anything, mock.Anything).Maybe() + + bcm := NewBlockConfirmationManager(context.Background(), mca, "ut", emm).(*blockConfirmationManager) + return bcm, mca +} + func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { bcm, mca := newTestBlockConfirmationManager() @@ -76,7 +102,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { } // First poll for changes gives nothing, but we load up the event at this point for the next round - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() // Next time round gives a block that is in the confirmation chain, but one block ahead block1003 := &apitypes.BlockInfo{ @@ -131,6 +157,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { // Then we should walk the chain by number to fill in 1003, because our HWM is 1003. mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool { + fmt.Println("BlockInfoByNumber", r.BlockNumber.Uint64()) return r.BlockNumber.Uint64() == 1003 })).Run(func(args mock.Arguments) { blockHashes <- &ffcapi.BlockHashEvent{ @@ -209,7 +236,7 @@ func TestBlockConfirmationManagerE2EFork(t *testing.T) { ParentHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df", } - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() blockHashes <- &ffcapi.BlockHashEvent{ BlockHashes: []string{ block1002.BlockHash, @@ -357,7 +384,7 @@ func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) { ParentHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df", } - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() // Have the event notification in flight from the beginning err := bcm.Notify(&Notification{ @@ -500,7 +527,7 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) { } // The next filter gives us 1002a, which will later be removed - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() // First check while walking the chain does not yield a block mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool { @@ -921,7 +948,7 @@ func TestConfirmationsFailWalkChainAfterBlockGap(t *testing.T) { assert.NoError(t, err) mca.On("BlockInfoByNumber", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found")).Run(func(args mock.Arguments) { - bcm.NewBlockHashes() <- &ffcapi.BlockHashEvent{ + bcm.GetReceiveChannel() <- &ffcapi.BlockHashEvent{ GapPotential: true, } }).Once() @@ -1305,3 +1332,493 @@ func TestBlockState(t *testing.T) { mca.AssertExpectations(t) } + +func TestBlockConfirmationManagerHeadBlockNumberConfirmsEvent(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1001), + BlockHash: blockHash, + }, + }, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + blockEvents := bcm.GetReceiveChannel() + + blockEvents <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + + err := bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + }) + assert.NoError(t, err) + + n0 := <-confirmed + assert.False(t, n0.Confirmed) + assert.False(t, n0.NewFork) + assert.Equal(t, uint64(0), n0.ActualConfirmationCount) + assert.Equal(t, uint64(3), n0.TargetConfirmationCount) + assert.Empty(t, n0.Confirmations) + + blockEvents <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + + n1 := <-confirmed + assert.True(t, n1.Confirmed) + assert.False(t, n1.NewFork) + assert.Equal(t, uint64(3), n1.ActualConfirmationCount) + assert.Equal(t, uint64(3), n1.TargetConfirmationCount) + assert.Empty(t, n1.Confirmations) + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberNewForkOnHeadDrop(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + bcm.Start() + ch := bcm.GetReceiveChannel() + + // Prime head so this iteration completes before Notify is queued; otherwise the select + // may handle Notify first while head is still zero (same pattern as TestBlockConfirmationManagerHeadBlockNumberConfirmsEvent). + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + + err := bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + }) + assert.NoError(t, err) + + n0 := <-confirmed + assert.False(t, n0.Confirmed) + assert.False(t, n0.NewFork) + assert.Equal(t, uint64(0), n0.ActualConfirmationCount) + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1003} + + n1 := <-confirmed + assert.False(t, n1.Confirmed) + assert.False(t, n1.NewFork) + assert.Equal(t, uint64(2), n1.ActualConfirmationCount) + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1002} + + n2 := <-confirmed + assert.False(t, n2.Confirmed) + assert.True(t, n2.NewFork) + assert.Equal(t, uint64(1), n2.ActualConfirmationCount) + + // validate confirmation for the new head block + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1001), + BlockHash: blockHash, + }, + }, ffcapi.ErrorReason(""), nil).Once() + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 2002} + n3 := <-confirmed + assert.True(t, n3.Confirmed) + assert.False(t, n3.NewFork) + assert.Equal(t, uint64(3), n3.ActualConfirmationCount) + assert.Equal(t, uint64(3), n3.TargetConfirmationCount) + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptNotFoundReschedules(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(nil, ffcapi.ErrorReasonNotFound, errors.New("not found")).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + bcm.pendingMux.Lock() + var cleared *pendingItem + for _, pi := range bcm.pending { + cleared = pi + break + } + bcm.pendingMux.Unlock() + assert.NotNil(t, cleared) + assert.Equal(t, "", cleared.blockHash) + assert.Equal(t, uint64(0), cleared.blockNumber) + assert.Nil(t, cleared.previousConfirmationCount) + assert.Equal(t, 1, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptMissingBlockHash(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1001), + BlockHash: "", + }, + }, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, blockHash, p.blockHash) + assert.Equal(t, uint64(1001), p.blockNumber) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptNilResponse(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(nil, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, blockHash, p.blockHash) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptBlockHashMismatch(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + receiptHash := "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1005), + BlockHash: receiptHash, + }, + }, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, receiptHash, p.blockHash) + assert.Equal(t, uint64(1005), p.blockNumber) + assert.Nil(t, p.previousConfirmationCount) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptOtherErrorNoReschedule(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(nil, ffcapi.ErrorReason(""), errors.New("rpc unavailable")).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, blockHash, p.blockHash) + assert.Equal(t, uint64(1001), p.blockNumber) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +// TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt covers confirmationCheckUsingHighestBlock +// when a pending transaction has no block hash yet (no receipt): head updates must not call dispatch or TransactionReceipt. +func TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewTransaction, + Transaction: &TransactionInfo{ + TransactionHash: txHash, + }, + })) + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1010} + + pendingKey := pendingKeyForTX(txHash) + assert.Eventually(t, func() bool { + bcm.pendingMux.Lock() + defer bcm.pendingMux.Unlock() + p := bcm.pending[pendingKey] + return p != nil && p.blockHash == "" && p.blockNumber == 0 + }, time.Second, 5*time.Millisecond) + + bcm.Stop() + mca.AssertExpectations(t) +} diff --git a/internal/confirmations/confirmed_block_listener.go b/internal/confirmations/confirmed_block_listener.go index 4d54408c..9e058200 100644 --- a/internal/confirmations/confirmed_block_listener.go +++ b/internal/confirmations/confirmed_block_listener.go @@ -57,7 +57,7 @@ type confirmedBlockListener struct { dispatcherTap chan struct{} eventStream chan<- *ffcapi.ListenerEvent connector ffcapi.API - requiredConfirmations int + requiredConfirmations uint64 retry *retry.Retry processorDone chan struct{} dispatcherDone chan struct{} @@ -69,6 +69,9 @@ func (bcm *blockConfirmationManager) StartConfirmedBlockListener(ctx context.Con } func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) (cbl *confirmedBlockListener, err error) { + if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber { + return nil, i18n.NewError(fgCtx, tmmsgs.MsgConfirmedBlockListenerUnsupportedMode, bcm.blockListenerTrackingMode) + } cbl = &confirmedBlockListener{ bcm: bcm, // We need our own listener for each confirmed block stream, and the bcm has to fan out @@ -346,7 +349,7 @@ func (cbl *confirmedBlockListener) dispatchAllConfirmed() { for { var toDispatch *ffcapi.ListenerEvent cbl.stateLock.Lock() - if len(cbl.blocksSinceCheckpoint) > cbl.requiredConfirmations { + if uint64(len(cbl.blocksSinceCheckpoint)) > cbl.requiredConfirmations { block := cbl.blocksSinceCheckpoint[0] // don't want memory to grow indefinitely by shifting right, so we create a new slice here cbl.blocksSinceCheckpoint = append([]*apitypes.BlockInfo{}, cbl.blocksSinceCheckpoint[1:]...) diff --git a/internal/confirmations/confirmed_block_listener_test.go b/internal/confirmations/confirmed_block_listener_test.go index 65164e87..2c0b698f 100644 --- a/internal/confirmations/confirmed_block_listener_test.go +++ b/internal/confirmations/confirmed_block_listener_test.go @@ -45,7 +45,7 @@ func TestCBLCatchUpToHeadFromZeroNoConfirmations(t *testing.T) { cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch) assert.NoError(t, err) - for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ { + for i := 0; i < len(blocks)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } @@ -73,13 +73,13 @@ func TestCBLCatchUpToHeadFromZeroWithConfirmations(t *testing.T) { cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch) assert.NoError(t, err) - for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ { + for i := 0; i < len(blocks)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } time.Sleep(1 * time.Millisecond) - assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations) + assert.Len(t, cbl.blocksSinceCheckpoint, int(bcm.requiredConfirmations)) select { case <-esDispatch: assert.Fail(t, "should not have received block in confirmation window") @@ -120,14 +120,14 @@ func TestCBLListenFromCurrentBlock(t *testing.T) { BlockHashes: []string{blocks[1].BlockHash}, }) - for i := 5; i < len(blocks)-bcm.requiredConfirmations; i++ { + for i := 5; i < len(blocks)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockNumber, blocks[i].BlockNumber) assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } time.Sleep(1 * time.Millisecond) - assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations) + assert.Len(t, cbl.blocksSinceCheckpoint, int(bcm.requiredConfirmations)) select { case <-esDispatch: assert.Fail(t, "should not have received block in confirmation window") @@ -248,11 +248,11 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything) mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocksAfterReorg) }) - bcm.requiredConfirmations = reqConf + bcm.requiredConfirmations = uint64(reqConf) cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch) assert.NoError(t, err) - for i := 0; i < len(blocksAfterReorg)-bcm.requiredConfirmations; i++ { + for i := 0; i < len(blocksAfterReorg)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch dangerArea := len(blocksAfterReorg) - overlap if i >= overlap && i < (dangerArea-reqConf) { @@ -265,7 +265,7 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o } time.Sleep(1 * time.Millisecond) - assert.LessOrEqual(t, len(cbl.blocksSinceCheckpoint), bcm.requiredConfirmations) + assert.LessOrEqual(t, len(cbl.blocksSinceCheckpoint), int(bcm.requiredConfirmations)) select { case b := <-esDispatch: assert.Fail(t, fmt.Sprintf("should not have received block in confirmation window: %d/%s", b.BlockEvent.BlockNumber.Int64(), b.BlockEvent.BlockHash)) @@ -318,7 +318,7 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) { assert.NoError(t, err) cbl.requiredConfirmations = 5 - for i := 0; i < len(blocks)-cbl.requiredConfirmations; i++ { + for i := 0; i < len(blocks)-int(cbl.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 2ea735e7..33e99400 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -70,7 +70,8 @@ func testESConf(t *testing.T, j string) (spec *apitypes.EventStream) { func newTestEventStream(t *testing.T, conf string) (es *eventStream) { tmconfig.Reset() - es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf) + mca := &ffcapimocks.API{} + es, err := newTestEventStreamWithListener(t, mca, conf) assert.NoError(t, err) return es } @@ -90,7 +91,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str tmconfig.Reset() config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us") InitDefaults() - + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() ees, err := NewEventStream(context.Background(), testESConf(t, conf), mfc, &persistencemocks.Persistence{}, @@ -131,8 +132,10 @@ func TestNewTestEventStreamMissingID(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() _, err := NewEventStream(context.Background(), &apitypes.EventStream{}, - &ffcapimocks.API{}, + mca, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, @@ -145,8 +148,10 @@ func TestNewTestEventStreamBadConfig(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() _, err := NewEventStream(context.Background(), testESConf(t, `{}`), - &ffcapimocks.API{}, + mca, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, @@ -544,6 +549,7 @@ func TestAPIManagedEventStreamMissingListenerIDs(t *testing.T) { InitDefaults() mfc := &ffcapimocks.API{} + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() // Checkpoints are commonly tied to individual listeners, so it is critical that the caller manages // deterministically the IDs of the listeners passed in. They cannot be empty (an error will be returned) _, err := NewAPIManagedEventStream(context.Background(), @@ -571,6 +577,7 @@ func TestAPIManagedEventStreamE2E(t *testing.T) { } mfc := &ffcapimocks.API{} + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, @@ -750,7 +757,7 @@ func TestWebhookEventStreamsE2EAddAfterStart(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.MatchedBy(func(req *ffcapi.EventListenerVerifyOptionsRequest) bool { return req.FromBlock == "12345" && req.Options.JSONObject().GetString("option1") == "value1" })).Return(&ffcapi.EventListenerVerifyOptionsResponse{ @@ -858,7 +865,6 @@ func TestStartWithExistingStreamOk(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) _, err := newTestEventStreamWithListener(t, mfc, `{ @@ -878,7 +884,6 @@ func TestStartWithExistingStreamFail(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), fmt.Errorf("pop")) _, err := newTestEventStreamWithListener(t, mfc, `{ @@ -903,6 +908,7 @@ func TestUpdateStreamStarted(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -967,6 +973,7 @@ func TestAddRemoveListener(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1024,6 +1031,7 @@ func TestUpdateListenerAndDeleteStarted(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1095,6 +1103,7 @@ func TestUpdateListenerFail(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1175,6 +1184,7 @@ func TestUpdateStreamRestartFail(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1234,6 +1244,7 @@ func TestUpdateAttemptChangeSignature(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{ ResolvedSignature: "sig1", }, ffcapi.ErrorReason(""), nil).Once() @@ -1399,7 +1410,7 @@ func TestAttemptResetNonExistentListener(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) l := &apitypes.Listener{ @@ -1428,7 +1439,7 @@ func TestUpdateStreamStopFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1486,6 +1497,7 @@ func TestResetListenerRestartFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1538,6 +1550,7 @@ func TestResetListenerWriteCheckpointFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) msp := es.checkpointsDB.(*persistencemocks.Persistence) @@ -1586,7 +1599,7 @@ func TestWebSocketBroadcastActionCloseDuringCheckpoint(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1661,6 +1674,7 @@ func TestActionRetryOk(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1712,6 +1726,7 @@ func TestActionRetrySkip(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1754,6 +1769,7 @@ func TestActionRetryBlock(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1805,6 +1821,7 @@ func TestDeleteFail(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -2223,6 +2240,7 @@ func TestStartAPIEventStreamStartFail(t *testing.T) { } mfc := &ffcapimocks.API{} + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, @@ -2255,6 +2273,7 @@ func TestStartAPIEventStreamPollContextCancelled(t *testing.T) { } mfc := &ffcapimocks.API{} + mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index 9fa2146e..5ceea0a9 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -109,4 +109,7 @@ var ( MsgStreamAPIManagedNameNoIDOrType = ffe("FF21092", "API managed streams must have a name, but no ID or type", http.StatusBadRequest) MsgUpdatePayloadEmpty = ffe("FF21093", "Update transaction must have a non-empty payload", http.StatusBadRequest) MsgTxHandlerUnsupportedFieldForUpdate = ffe("FF21094", "Update '%s' in the transaction is not supported by the transaction handler", http.StatusBadRequest) + MsgConfirmedBlockListenerUnsupportedMode = ffe("FF21095", "Confirmed block listeners is not supported when block listener tracking mode is '%s'", http.StatusBadRequest) + MsgTransactionReceiptMissingBlockHash = ffe("FF21096", "Transaction receipt missing block hash") + MsgTransactionReceiptBlockHashMismatch = ffe("FF21097", "Transaction receipt block hash mismatch. Expected %s, got %s") ) diff --git a/mocks/apiclientmocks/fftm_client.go b/mocks/apiclientmocks/fftm_client.go index 56b05cd4..e4db93b7 100644 --- a/mocks/apiclientmocks/fftm_client.go +++ b/mocks/apiclientmocks/fftm_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package apiclientmocks diff --git a/mocks/confirmationsmocks/manager.go b/mocks/confirmationsmocks/manager.go index 249dc4bc..f71b3a71 100644 --- a/mocks/confirmationsmocks/manager.go +++ b/mocks/confirmationsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package confirmationsmocks @@ -37,12 +37,12 @@ func (_m *Manager) CheckInFlight(listenerID *fftypes.UUID) bool { return r0 } -// NewBlockHashes provides a mock function with no fields -func (_m *Manager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent { +// GetReceiveChannel provides a mock function with no fields +func (_m *Manager) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent { ret := _m.Called() if len(ret) == 0 { - panic("no return value specified for NewBlockHashes") + panic("no return value specified for GetReceiveChannel") } var r0 chan<- *ffcapi.BlockHashEvent diff --git a/mocks/eventsmocks/stream.go b/mocks/eventsmocks/stream.go index e41536bc..db0c72f1 100644 --- a/mocks/eventsmocks/stream.go +++ b/mocks/eventsmocks/stream.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package eventsmocks diff --git a/mocks/ffcapimocks/api.go b/mocks/ffcapimocks/api.go index aa767de6..d283dcc2 100644 --- a/mocks/ffcapimocks/api.go +++ b/mocks/ffcapimocks/api.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package ffcapimocks @@ -478,6 +478,24 @@ func (_m *API) GasPriceEstimate(ctx context.Context, req *ffcapi.GasPriceEstimat return r0, r1, r2 } +// GetBlockListenerTrackingMode provides a mock function with given fields: ctx +func (_m *API) GetBlockListenerTrackingMode(ctx context.Context) ffcapi.BlockListenerTrackingMode { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetBlockListenerTrackingMode") + } + + var r0 ffcapi.BlockListenerTrackingMode + if rf, ok := ret.Get(0).(func(context.Context) ffcapi.BlockListenerTrackingMode); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(ffcapi.BlockListenerTrackingMode) + } + + return r0 +} + // IsLive provides a mock function with given fields: ctx func (_m *API) IsLive(ctx context.Context) (*ffcapi.LiveResponse, ffcapi.ErrorReason, error) { ret := _m.Called(ctx) diff --git a/mocks/metricsmocks/event_metrics_emitter.go b/mocks/metricsmocks/event_metrics_emitter.go index 94c0cf19..eb92f91e 100644 --- a/mocks/metricsmocks/event_metrics_emitter.go +++ b/mocks/metricsmocks/event_metrics_emitter.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package metricsmocks diff --git a/mocks/metricsmocks/transaction_handler_metrics.go b/mocks/metricsmocks/transaction_handler_metrics.go index a4102a7c..f6033653 100644 --- a/mocks/metricsmocks/transaction_handler_metrics.go +++ b/mocks/metricsmocks/transaction_handler_metrics.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package metricsmocks diff --git a/mocks/persistencemocks/persistence.go b/mocks/persistencemocks/persistence.go index 64fbb461..78e07958 100644 --- a/mocks/persistencemocks/persistence.go +++ b/mocks/persistencemocks/persistence.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package persistencemocks diff --git a/mocks/persistencemocks/rich_query.go b/mocks/persistencemocks/rich_query.go index 9253d685..5cd5f614 100644 --- a/mocks/persistencemocks/rich_query.go +++ b/mocks/persistencemocks/rich_query.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package persistencemocks diff --git a/mocks/persistencemocks/transaction_persistence.go b/mocks/persistencemocks/transaction_persistence.go index bb630875..8e277c23 100644 --- a/mocks/persistencemocks/transaction_persistence.go +++ b/mocks/persistencemocks/transaction_persistence.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package persistencemocks diff --git a/mocks/txhandlermocks/managed_tx_event_handler.go b/mocks/txhandlermocks/managed_tx_event_handler.go index 9206d4c3..0f1f2183 100644 --- a/mocks/txhandlermocks/managed_tx_event_handler.go +++ b/mocks/txhandlermocks/managed_tx_event_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package txhandlermocks diff --git a/mocks/txhandlermocks/transaction_handler.go b/mocks/txhandlermocks/transaction_handler.go index d61bef85..f743cac5 100644 --- a/mocks/txhandlermocks/transaction_handler.go +++ b/mocks/txhandlermocks/transaction_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package txhandlermocks diff --git a/mocks/wsmocks/web_socket_channels.go b/mocks/wsmocks/web_socket_channels.go index c16d00f4..825c5799 100644 --- a/mocks/wsmocks/web_socket_channels.go +++ b/mocks/wsmocks/web_socket_channels.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package wsmocks diff --git a/mocks/wsmocks/web_socket_server.go b/mocks/wsmocks/web_socket_server.go index fdaf1d68..7bd01348 100644 --- a/mocks/wsmocks/web_socket_server.go +++ b/mocks/wsmocks/web_socket_server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package wsmocks diff --git a/pkg/apitypes/api_types.go b/pkg/apitypes/api_types.go index 2d46e54c..a2944721 100644 --- a/pkg/apitypes/api_types.go +++ b/pkg/apitypes/api_types.go @@ -437,8 +437,14 @@ type ConfirmationsNotification struct { // NewFork is true when NewConfirmations is a complete list of confirmations. // Otherwise, Confirmations is an additive delta on top of a previous list of confirmations. NewFork bool - // Confirmations is the list of confirmations being notified - assured to be non-nil, but might be empty. + // Confirmations is the list of confirmations being notified - assured to be non-nil. + // The list will only be populated if confirmation mode is set to validatedBlocks. Confirmations []*Confirmation + // ActualConfirmationCount is the actual number of confirmations for the transaction + ActualConfirmationCount uint64 `json:"actualConfirmationCount"` + + // TargetConfirmationCount is the target number of confirmations to confirm the transaction + TargetConfirmationCount uint64 `json:"targetConfirmationCount"` } type Confirmation struct { diff --git a/pkg/ffcapi/api.go b/pkg/ffcapi/api.go index 88c93953..f6c3a70a 100644 --- a/pkg/ffcapi/api.go +++ b/pkg/ffcapi/api.go @@ -61,6 +61,9 @@ type API interface { // DeployContractPrepare DeployContractPrepare(ctx context.Context, req *ContractDeployPrepareRequest) (*TransactionPrepareResponse, ErrorReason, error) + // GetBlockListenerTrackingMode gets the tracking mode of the block listener + GetBlockListenerTrackingMode(ctx context.Context) BlockListenerTrackingMode + // EventStreamStart starts an event stream with an initial set of listeners (which might be empty), a channel to deliver events, and a context that will close to stop the stream EventStreamStart(ctx context.Context, req *EventStreamStartRequest) (*EventStreamStartResponse, ErrorReason, error) @@ -95,6 +98,15 @@ type API interface { IsReady(ctx context.Context) (*ReadyResponse, ErrorReason, error) } +type BlockListenerTrackingMode string + +const ( + // BlockListenerTrackingModeHeadBlockNumber in this mode, the block listener tracks the head block number of the blockchain only without fetching any block details + BlockListenerTrackingModeHeadBlockNumber BlockListenerTrackingMode = "headBlockNumber" + // BlockListenerTrackingModeInMemoryPartialChain in this mode, the block listener tracks the in-memory partial chain of blocks, fetching block details as needed + BlockListenerTrackingModeInMemoryPartialChain BlockListenerTrackingMode = "inMemoryPartialChain" +) + type ConfirmationUpdateResult struct { // a linked list of accumulated confirmations for a transaction // the list is sorted by block number @@ -104,7 +116,9 @@ type ConfirmationUpdateResult struct { // in the in-memory partial chain // WARNING: mutation to this list is not expected, invalid modifications will cause inefficiencies in the reconciliation process // `rebuilt` will be true if an invalid confirmation list is detected by the reconciliation process - Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` + Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` // the current list of confirmations for this reconcile request, only returned when confirmation mode is set to validatedBlocks + ActualConfirmationCount uint64 `json:"actualConfirmationCount"` // the current number of confirmations for this reconcile request + Receipt *TransactionReceiptResponse `json:"receipt,omitempty"` // receipt for the transaction Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch NewFork bool `json:"newFork,omitempty"` // when true, it means a new fork was detected based on the existing confirmations @@ -126,9 +140,10 @@ func (c *MinimalBlockInfo) IsParentOf(other *MinimalBlockInfo) bool { } type BlockHashEvent struct { - BlockHashes []string `json:"blockHash"` // zero or more hashes (can be nil) - GapPotential bool `json:"gapPotential,omitempty"` // when true, the caller cannot be sure if blocks have been missed (use on reconnect of a websocket for example) - Created *fftypes.FFTime `json:"created,omitempty"` // timestamp when the blockhash event is created + BlockHashes []string `json:"blockHash"` // zero or more hashes (can be nil) + GapPotential bool `json:"gapPotential,omitempty"` // when true, the caller cannot be sure if blocks have been missed (use on reconnect of a websocket for example) + Created *fftypes.FFTime `json:"created,omitempty"` // timestamp when the blockhash event is created + HeadBlockNumber uint64 `json:"headBlockNumber"` // the highest block seen by the connector } // EventID are the set of required fields an FFCAPI compatible connector needs to map to the underlying blockchain constructs, to uniquely identify an event diff --git a/pkg/fftm/manager_test.go b/pkg/fftm/manager_test.go index 7e91ff66..0fa4657d 100644 --- a/pkg/fftm/manager_test.go +++ b/pkg/fftm/manager_test.go @@ -82,6 +82,7 @@ func newTestManager(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mm, err := NewManager(context.Background(), mca) assert.NoError(t, err) @@ -105,7 +106,7 @@ func newTestManagerMockNoRichDB(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() m := newManager(context.Background(), mca) mpm := &persistencemocks.Persistence{} @@ -133,7 +134,7 @@ func newTestManagerMockRichDB(t *testing.T) (string, *manager, *persistencemocks url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() m := newManager(context.Background(), mca) mpm := &persistencemocks.Persistence{} @@ -179,6 +180,7 @@ func newTestManagerWithMetrics(t *testing.T, deprecated bool) (string, *manager, mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() mm, err := NewManager(context.Background(), mca) assert.NoError(t, err) @@ -198,7 +200,9 @@ func newTestManagerMockPersistence(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t) - m := newManager(context.Background(), &ffcapimocks.API{}) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + m := newManager(context.Background(), mca) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -241,8 +245,9 @@ func TestNewManagerWithLegacyConfiguration(t *testing.T) { tmconfig.APIConfig.Set(httpserver.HTTPConfAddress, "127.0.0.1") tmconfig.DeprecatedPolicyEngineBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - - m := newManager(context.Background(), &ffcapimocks.API{}) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + m := newManager(context.Background(), mca) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -262,7 +267,9 @@ func TestNewManagerBadHttpConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Error(t, err) assert.Regexp(t, "FF00151", err) @@ -281,7 +288,9 @@ func TestNewManagerBadLevelDBConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err = NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + _, err = NewManager(context.Background(), mca) assert.Regexp(t, "FF21049", err) } @@ -295,7 +304,9 @@ func TestNewManagerBadPersistenceConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Regexp(t, "FF21043", err) } @@ -307,7 +318,9 @@ func TestNewManagerInvalidTransactionHandlerName(t *testing.T) { config.Set(tmconfig.PersistenceLevelDBPath, dir) config.Set(tmconfig.TransactionsHandlerName, "wrong") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Regexp(t, "FF21070", err) } @@ -351,7 +364,9 @@ func TestNewManagerWithMetricsBadConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Error(t, err) assert.Regexp(t, "FF00151", err) } From 3c7e067ab4754297e301dc32d0003f31f10a247d Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 22 Apr 2026 11:31:31 +0100 Subject: [PATCH 2/7] pass over confirmation counters in both mode Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 13 ++++++++++--- internal/confirmations/confirmations_test.go | 4 ++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 21593898..f63bd4f4 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -682,14 +682,21 @@ func (bcm *blockConfirmationManager) dispatchConfirmations(item *pendingItem) { } } + confirmationCount := uint64(len(item.confirmations)) + if confirmationCount > bcm.requiredConfirmations { + confirmationCount = bcm.requiredConfirmations + } + // Possible for us to re-dispatch the same confirmations, if we are notified about a block later after // after we previously did a crawl for blocks. // So we protect here against dispatching an empty array if len(notificationConfirmations) > 0 || item.confirmed { notification := &apitypes.ConfirmationsNotification{ - Confirmed: item.confirmed, - NewFork: newFork, - Confirmations: notificationConfirmations, + Confirmed: item.confirmed, + ActualConfirmationCount: confirmationCount, + TargetConfirmationCount: bcm.requiredConfirmations, + NewFork: newFork, + Confirmations: notificationConfirmations, } // Take a copy of the notification confirmations so we know what we have previously notified next time round // (not safe to keep a reference, in case it's modified by the callback). diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 9b3e6496..7d7a029c 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -190,6 +190,8 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { apitypes.ConfirmationFromBlock(block1002), apitypes.ConfirmationFromBlock(block1003), }, dispatched.Confirmations) + assert.Equal(t, uint64(2), dispatched.ActualConfirmationCount) + assert.Equal(t, uint64(3), dispatched.TargetConfirmationCount) assert.True(t, dispatched.NewFork) assert.False(t, dispatched.Confirmed) @@ -198,6 +200,8 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { assert.Equal(t, []*apitypes.Confirmation{ apitypes.ConfirmationFromBlock(block1004), }, dispatched.Confirmations) + assert.Equal(t, uint64(3), dispatched.ActualConfirmationCount) + assert.Equal(t, uint64(3), dispatched.TargetConfirmationCount) assert.False(t, dispatched.NewFork) assert.True(t, dispatched.Confirmed) From d08c82a8967b8b057788008d156cf48406eed3c0 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 22 Apr 2026 12:52:57 +0100 Subject: [PATCH 3/7] renaming Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 28 ++++++++++---------- internal/confirmations/confirmations_test.go | 16 +++++------ pkg/apitypes/api_types.go | 4 +-- pkg/ffcapi/api.go | 4 +-- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index f63bd4f4..4115a65e 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -156,7 +156,7 @@ type pendingItem struct { confirmations []*apitypes.Confirmation scheduledAtLeastOnce bool confirmed bool - previousConfirmationCount *uint64 // headBlockNumber mode: last dispatched ActualConfirmationCount + previousConfirmationCount *uint64 // headBlockNumber mode: last dispatched CurrentConfirmationCount queuedStale *list.Element // protected by receiptChecker mux lastReceiptCheck time.Time // protected by receiptChecker mux receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) @@ -692,11 +692,11 @@ func (bcm *blockConfirmationManager) dispatchConfirmations(item *pendingItem) { // So we protect here against dispatching an empty array if len(notificationConfirmations) > 0 || item.confirmed { notification := &apitypes.ConfirmationsNotification{ - Confirmed: item.confirmed, - ActualConfirmationCount: confirmationCount, - TargetConfirmationCount: bcm.requiredConfirmations, - NewFork: newFork, - Confirmations: notificationConfirmations, + Confirmed: item.confirmed, + CurrentConfirmationCount: confirmationCount, + TargetConfirmationCount: bcm.requiredConfirmations, + NewFork: newFork, + Confirmations: notificationConfirmations, } // Take a copy of the notification confirmations so we know what we have previously notified next time round // (not safe to keep a reference, in case it's modified by the callback). @@ -859,12 +859,12 @@ func (bcm *blockConfirmationManager) confirmationCheckUsingHighestBlock(pending return nil } - actualConfirmationCount := uint64(0) + currentConfirmationCount := uint64(0) if bcm.headBlockNumber > pending.blockNumber { - actualConfirmationCount = bcm.headBlockNumber - pending.blockNumber + currentConfirmationCount = bcm.headBlockNumber - pending.blockNumber } - return bcm.dispatchBlockHeightConfirmations(pending, actualConfirmationCount) + return bcm.dispatchBlockHeightConfirmations(pending, currentConfirmationCount) } @@ -909,15 +909,15 @@ func (bcm *blockConfirmationManager) dispatchBlockHeightConfirmations(pending *p // when not confirmed, we assume the receipt is still valid and return the actual confirmation count notification := &apitypes.ConfirmationsNotification{ - Confirmed: confirmed, - NewFork: pending.previousConfirmationCount != nil && count < *pending.previousConfirmationCount, // the only fork situation we can detect is a reduction in confirmation count - ActualConfirmationCount: confirmationCount, // NOTE: we returns the actual confirmation count, which can be higher than the required confirmation count - TargetConfirmationCount: bcm.requiredConfirmations, + Confirmed: confirmed, + NewFork: pending.previousConfirmationCount != nil && count < *pending.previousConfirmationCount, // the only fork situation we can detect is a reduction in confirmation count + CurrentConfirmationCount: confirmationCount, // NOTE: we returns the actual confirmation count, which can be higher than the required confirmation count + TargetConfirmationCount: bcm.requiredConfirmations, } pending.previousConfirmationCount = &count log.L(bcm.ctx).Infof("Block height confirmation notification item=%s confirmed=%t count=%d/%d newFork=%t", - pending.getKey(), notification.Confirmed, notification.ActualConfirmationCount, notification.TargetConfirmationCount, notification.NewFork) + pending.getKey(), notification.Confirmed, notification.CurrentConfirmationCount, notification.TargetConfirmationCount, notification.NewFork) pending.confirmationsCallback(bcm.ctx, notification) bcm.metricsEmitter.RecordConfirmationMetrics(bcm.ctx, time.Since(pending.added).Seconds()) diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 7d7a029c..f56b8ca8 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -190,7 +190,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { apitypes.ConfirmationFromBlock(block1002), apitypes.ConfirmationFromBlock(block1003), }, dispatched.Confirmations) - assert.Equal(t, uint64(2), dispatched.ActualConfirmationCount) + assert.Equal(t, uint64(2), dispatched.CurrentConfirmationCount) assert.Equal(t, uint64(3), dispatched.TargetConfirmationCount) assert.True(t, dispatched.NewFork) assert.False(t, dispatched.Confirmed) @@ -200,7 +200,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { assert.Equal(t, []*apitypes.Confirmation{ apitypes.ConfirmationFromBlock(block1004), }, dispatched.Confirmations) - assert.Equal(t, uint64(3), dispatched.ActualConfirmationCount) + assert.Equal(t, uint64(3), dispatched.CurrentConfirmationCount) assert.Equal(t, uint64(3), dispatched.TargetConfirmationCount) assert.False(t, dispatched.NewFork) assert.True(t, dispatched.Confirmed) @@ -1380,7 +1380,7 @@ func TestBlockConfirmationManagerHeadBlockNumberConfirmsEvent(t *testing.T) { n0 := <-confirmed assert.False(t, n0.Confirmed) assert.False(t, n0.NewFork) - assert.Equal(t, uint64(0), n0.ActualConfirmationCount) + assert.Equal(t, uint64(0), n0.CurrentConfirmationCount) assert.Equal(t, uint64(3), n0.TargetConfirmationCount) assert.Empty(t, n0.Confirmations) @@ -1389,7 +1389,7 @@ func TestBlockConfirmationManagerHeadBlockNumberConfirmsEvent(t *testing.T) { n1 := <-confirmed assert.True(t, n1.Confirmed) assert.False(t, n1.NewFork) - assert.Equal(t, uint64(3), n1.ActualConfirmationCount) + assert.Equal(t, uint64(3), n1.CurrentConfirmationCount) assert.Equal(t, uint64(3), n1.TargetConfirmationCount) assert.Empty(t, n1.Confirmations) @@ -1433,21 +1433,21 @@ func TestBlockConfirmationManagerHeadBlockNumberNewForkOnHeadDrop(t *testing.T) n0 := <-confirmed assert.False(t, n0.Confirmed) assert.False(t, n0.NewFork) - assert.Equal(t, uint64(0), n0.ActualConfirmationCount) + assert.Equal(t, uint64(0), n0.CurrentConfirmationCount) ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1003} n1 := <-confirmed assert.False(t, n1.Confirmed) assert.False(t, n1.NewFork) - assert.Equal(t, uint64(2), n1.ActualConfirmationCount) + assert.Equal(t, uint64(2), n1.CurrentConfirmationCount) ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1002} n2 := <-confirmed assert.False(t, n2.Confirmed) assert.True(t, n2.NewFork) - assert.Equal(t, uint64(1), n2.ActualConfirmationCount) + assert.Equal(t, uint64(1), n2.CurrentConfirmationCount) // validate confirmation for the new head block mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { @@ -1463,7 +1463,7 @@ func TestBlockConfirmationManagerHeadBlockNumberNewForkOnHeadDrop(t *testing.T) n3 := <-confirmed assert.True(t, n3.Confirmed) assert.False(t, n3.NewFork) - assert.Equal(t, uint64(3), n3.ActualConfirmationCount) + assert.Equal(t, uint64(3), n3.CurrentConfirmationCount) assert.Equal(t, uint64(3), n3.TargetConfirmationCount) bcm.Stop() diff --git a/pkg/apitypes/api_types.go b/pkg/apitypes/api_types.go index a2944721..0f1e0ab5 100644 --- a/pkg/apitypes/api_types.go +++ b/pkg/apitypes/api_types.go @@ -440,8 +440,8 @@ type ConfirmationsNotification struct { // Confirmations is the list of confirmations being notified - assured to be non-nil. // The list will only be populated if confirmation mode is set to validatedBlocks. Confirmations []*Confirmation - // ActualConfirmationCount is the actual number of confirmations for the transaction - ActualConfirmationCount uint64 `json:"actualConfirmationCount"` + // CurrentConfirmationCount is the actual number of confirmations for the transaction + CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` // TargetConfirmationCount is the target number of confirmations to confirm the transaction TargetConfirmationCount uint64 `json:"targetConfirmationCount"` diff --git a/pkg/ffcapi/api.go b/pkg/ffcapi/api.go index f6c3a70a..b315aebf 100644 --- a/pkg/ffcapi/api.go +++ b/pkg/ffcapi/api.go @@ -116,8 +116,8 @@ type ConfirmationUpdateResult struct { // in the in-memory partial chain // WARNING: mutation to this list is not expected, invalid modifications will cause inefficiencies in the reconciliation process // `rebuilt` will be true if an invalid confirmation list is detected by the reconciliation process - Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` // the current list of confirmations for this reconcile request, only returned when confirmation mode is set to validatedBlocks - ActualConfirmationCount uint64 `json:"actualConfirmationCount"` // the current number of confirmations for this reconcile request + Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` // the current list of confirmations for this reconcile request, only returned when confirmation mode is set to validatedBlocks + CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` // the current number of confirmations for this reconcile request Receipt *TransactionReceiptResponse `json:"receipt,omitempty"` // receipt for the transaction Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch From 05942546925412e18b5f19bce33fcae1afd453d8 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 27 Apr 2026 08:42:07 +0100 Subject: [PATCH 4/7] renaming Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 63 +++++++++---------- internal/confirmations/confirmations_test.go | 36 +++++------ .../confirmations/confirmed_block_listener.go | 4 +- internal/events/eventstream_test.go | 46 +++++++------- internal/tmmsgs/en_error_messages.go | 2 +- mocks/ffcapimocks/api.go | 12 ++-- pkg/apitypes/api_types.go | 2 +- pkg/ffcapi/api.go | 19 +++--- pkg/fftm/manager_test.go | 22 +++---- 9 files changed, 102 insertions(+), 104 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 4115a65e..389f64ac 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -87,27 +87,27 @@ type RemovedListenerInfo struct { } type blockConfirmationManager struct { - baseContext context.Context - ctx context.Context - cancelFunc func() - newBlockHashEvents chan *ffcapi.BlockHashEvent - connector ffcapi.API - blockListenerStale bool - metricsEmitter metrics.ConfirmationMetricsEmitter - requiredConfirmations uint64 - blockListenerTrackingMode ffcapi.BlockListenerTrackingMode - staleReceiptTimeout time.Duration - bcmNotifications chan *Notification - highestBlockSeen uint64 - headBlockNumber uint64 // used for confirmation when block listener is in headBlockNumber mode - pending map[string]*pendingItem - pendingMux sync.Mutex - receiptChecker *receiptChecker - retry *retry.Retry - cblLock sync.Mutex - cbls map[fftypes.UUID]*confirmedBlockListener - fetchReceiptUponEntry bool - done chan struct{} + baseContext context.Context + ctx context.Context + cancelFunc func() + newBlockHashEvents chan *ffcapi.BlockHashEvent + connector ffcapi.API + blockListenerStale bool + metricsEmitter metrics.ConfirmationMetricsEmitter + requiredConfirmations uint64 + chainTrackingMode ffcapi.ChainTrackingMode + staleReceiptTimeout time.Duration + bcmNotifications chan *Notification + highestBlockSeen uint64 + headBlockNumber uint64 // used for confirmation when block listener is in headBlockNumber mode + pending map[string]*pendingItem + pendingMux sync.Mutex + receiptChecker *receiptChecker + retry *retry.Retry + cblLock sync.Mutex + cbls map[fftypes.UUID]*confirmedBlockListener + fetchReceiptUponEntry bool + done chan struct{} } func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.API, desc string, @@ -118,15 +118,12 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A cbls: make(map[fftypes.UUID]*confirmedBlockListener), blockListenerStale: true, requiredConfirmations: config.GetUint64(tmconfig.ConfirmationsRequired), - // currently the confirmation result is driven by the block listener mode, - // because when the block listener is in the headBlockNumber mode, the block information is not fetched and cached by the connector - // Therefore, it will be super inefficient to build the in memory partial chain in confirmations manager - blockListenerTrackingMode: connector.GetBlockListenerTrackingMode(baseContext), - staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout), - bcmNotifications: make(chan *Notification, config.GetInt(tmconfig.ConfirmationsNotificationQueueLength)), - pending: make(map[string]*pendingItem), - newBlockHashEvents: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)), - metricsEmitter: cme, + chainTrackingMode: connector.GetChainTrackingMode(baseContext), + staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout), + bcmNotifications: make(chan *Notification, config.GetInt(tmconfig.ConfirmationsNotificationQueueLength)), + pending: make(map[string]*pendingItem), + newBlockHashEvents: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)), + metricsEmitter: cme, retry: &retry.Retry{ InitialDelay: config.GetDuration(tmconfig.ConfirmationsRetryInitDelay), MaximumDelay: config.GetDuration(tmconfig.ConfirmationsRetryMaxDelay), @@ -414,7 +411,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() { blocks := bcm.newBlockState() if bcm.blockListenerStale { - if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeInMemoryPartialChain { + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeFull { // only need to build the in memory partial chain when the block listener supports it if err := bcm.walkChain(blocks); err != nil { log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err) @@ -559,7 +556,7 @@ func (bcm *blockConfirmationManager) removeItem(pending *pendingItem, stale bool } func (bcm *blockConfirmationManager) processBlockHashes(blockHashes []string) { - if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber { + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { // for headBlockNumber mode, we don't need to fetch blocks to form a local in memory partial chain // we just need to do confirmation check and dispatch confirmations for any applicable pending items bcm.checkAndDispatchConfirmationsUsingBlockHeight() @@ -791,7 +788,7 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc return nil } - if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber { + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { return bcm.confirmationCheckUsingHighestBlock(pending) } diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index f56b8ca8..40581747 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -44,7 +44,7 @@ func newTestBlockConfirmationManager() (*blockConfirmationManager, *ffcapimocks. func newTestBlockConfirmationManagerCustomConfig() (*blockConfirmationManager, *ffcapimocks.API) { logrus.SetLevel(logrus.DebugLevel) mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() emm := &metricsmocks.EventMetricsEmitter{} emm.On("RecordNotificationQueueingMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashProcessMetrics", mock.Anything, mock.Anything).Maybe() @@ -68,7 +68,7 @@ func newTestBlockConfirmationManagerHeadBlockNumber() (*blockConfirmationManager config.Set(tmconfig.ConfirmationsFetchReceiptUponEntry, false) logrus.SetLevel(logrus.DebugLevel) mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeHeadBlockNumber).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeLight).Maybe() emm := &metricsmocks.EventMetricsEmitter{} emm.On("RecordNotificationQueueingMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashProcessMetrics", mock.Anything, mock.Anything).Maybe() @@ -1490,12 +1490,9 @@ func TestBlockConfirmationManagerHeadBlockNumberReceiptNotFoundReschedules(t *te }, } - receiptChecked := make(chan struct{}, 1) mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { return r.TransactionHash == txHash - })).Run(func(mock.Arguments) { - receiptChecked <- struct{}{} - }).Return(nil, ffcapi.ErrorReasonNotFound, errors.New("not found")).Once() + })).Return(nil, ffcapi.ErrorReasonNotFound, errors.New("not found")).Once() bcm.Start() ch := bcm.GetReceiveChannel() @@ -1507,12 +1504,24 @@ func TestBlockConfirmationManagerHeadBlockNumberReceiptNotFoundReschedules(t *te <-confirmed ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + // Wait until the not-found receipt path has re-queued a check (safe to read under + // receiptChecker.cond). pendingItem fields are updated by the listener without + // pendingMux, so assert those only after Stop freezes the listener. + assert.Eventually(t, func() bool { + bcm.receiptChecker.cond.L.Lock() + l := bcm.receiptChecker.entries.Len() + bcm.receiptChecker.cond.L.Unlock() + return l == 1 + }, time.Second, 5*time.Millisecond) + select { - case <-receiptChecked: - case <-time.After(time.Second): - t.Fatal("timeout waiting for TransactionReceipt") + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): } + bcm.Stop() + bcm.pendingMux.Lock() var cleared *pendingItem for _, pi := range bcm.pending { @@ -1524,15 +1533,6 @@ func TestBlockConfirmationManagerHeadBlockNumberReceiptNotFoundReschedules(t *te assert.Equal(t, "", cleared.blockHash) assert.Equal(t, uint64(0), cleared.blockNumber) assert.Nil(t, cleared.previousConfirmationCount) - assert.Equal(t, 1, bcm.receiptChecker.entries.Len()) - - select { - case n := <-confirmed: - t.Fatalf("unexpected confirmation notification: %+v", n) - case <-time.After(50 * time.Millisecond): - } - - bcm.Stop() mca.AssertExpectations(t) } diff --git a/internal/confirmations/confirmed_block_listener.go b/internal/confirmations/confirmed_block_listener.go index 9e058200..8b976d8f 100644 --- a/internal/confirmations/confirmed_block_listener.go +++ b/internal/confirmations/confirmed_block_listener.go @@ -69,8 +69,8 @@ func (bcm *blockConfirmationManager) StartConfirmedBlockListener(ctx context.Con } func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) (cbl *confirmedBlockListener, err error) { - if bcm.blockListenerTrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber { - return nil, i18n.NewError(fgCtx, tmmsgs.MsgConfirmedBlockListenerUnsupportedMode, bcm.blockListenerTrackingMode) + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { + return nil, i18n.NewError(fgCtx, tmmsgs.MsgConfirmedBlockListenerUnsupportedMode, bcm.chainTrackingMode) } cbl = &confirmedBlockListener{ bcm: bcm, diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 33e99400..6a702f54 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -91,7 +91,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str tmconfig.Reset() config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us") InitDefaults() - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewEventStream(context.Background(), testESConf(t, conf), mfc, &persistencemocks.Persistence{}, @@ -133,7 +133,7 @@ func TestNewTestEventStreamMissingID(t *testing.T) { InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewEventStream(context.Background(), &apitypes.EventStream{}, mca, &persistencemocks.Persistence{}, @@ -149,7 +149,7 @@ func TestNewTestEventStreamBadConfig(t *testing.T) { InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewEventStream(context.Background(), testESConf(t, `{}`), mca, &persistencemocks.Persistence{}, @@ -549,7 +549,7 @@ func TestAPIManagedEventStreamMissingListenerIDs(t *testing.T) { InitDefaults() mfc := &ffcapimocks.API{} - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() // Checkpoints are commonly tied to individual listeners, so it is critical that the caller manages // deterministically the IDs of the listeners passed in. They cannot be empty (an error will be returned) _, err := NewAPIManagedEventStream(context.Background(), @@ -577,7 +577,7 @@ func TestAPIManagedEventStreamE2E(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, @@ -757,7 +757,7 @@ func TestWebhookEventStreamsE2EAddAfterStart(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.MatchedBy(func(req *ffcapi.EventListenerVerifyOptionsRequest) bool { return req.FromBlock == "12345" && req.Options.JSONObject().GetString("option1") == "value1" })).Return(&ffcapi.EventListenerVerifyOptionsResponse{ @@ -908,7 +908,7 @@ func TestUpdateStreamStarted(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -973,7 +973,7 @@ func TestAddRemoveListener(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1031,7 +1031,7 @@ func TestUpdateListenerAndDeleteStarted(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1103,7 +1103,7 @@ func TestUpdateListenerFail(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1184,7 +1184,7 @@ func TestUpdateStreamRestartFail(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1244,7 +1244,7 @@ func TestUpdateAttemptChangeSignature(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{ ResolvedSignature: "sig1", }, ffcapi.ErrorReason(""), nil).Once() @@ -1410,7 +1410,7 @@ func TestAttemptResetNonExistentListener(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) l := &apitypes.Listener{ @@ -1439,7 +1439,7 @@ func TestUpdateStreamStopFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1497,7 +1497,7 @@ func TestResetListenerRestartFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1550,7 +1550,7 @@ func TestResetListenerWriteCheckpointFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) msp := es.checkpointsDB.(*persistencemocks.Persistence) @@ -1599,7 +1599,7 @@ func TestWebSocketBroadcastActionCloseDuringCheckpoint(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1674,7 +1674,7 @@ func TestActionRetryOk(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1726,7 +1726,7 @@ func TestActionRetrySkip(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1769,7 +1769,7 @@ func TestActionRetryBlock(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1821,7 +1821,7 @@ func TestDeleteFail(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -2240,7 +2240,7 @@ func TestStartAPIEventStreamStartFail(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, @@ -2273,7 +2273,7 @@ func TestStartAPIEventStreamPollContextCancelled(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index 5ceea0a9..d61b4ca4 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -109,7 +109,7 @@ var ( MsgStreamAPIManagedNameNoIDOrType = ffe("FF21092", "API managed streams must have a name, but no ID or type", http.StatusBadRequest) MsgUpdatePayloadEmpty = ffe("FF21093", "Update transaction must have a non-empty payload", http.StatusBadRequest) MsgTxHandlerUnsupportedFieldForUpdate = ffe("FF21094", "Update '%s' in the transaction is not supported by the transaction handler", http.StatusBadRequest) - MsgConfirmedBlockListenerUnsupportedMode = ffe("FF21095", "Confirmed block listeners is not supported when block listener tracking mode is '%s'", http.StatusBadRequest) + MsgConfirmedBlockListenerUnsupportedMode = ffe("FF21095", "Block listener is not supported when chain tracking mode is '%s'", http.StatusBadRequest) MsgTransactionReceiptMissingBlockHash = ffe("FF21096", "Transaction receipt missing block hash") MsgTransactionReceiptBlockHashMismatch = ffe("FF21097", "Transaction receipt block hash mismatch. Expected %s, got %s") ) diff --git a/mocks/ffcapimocks/api.go b/mocks/ffcapimocks/api.go index d283dcc2..c4b1264f 100644 --- a/mocks/ffcapimocks/api.go +++ b/mocks/ffcapimocks/api.go @@ -478,19 +478,19 @@ func (_m *API) GasPriceEstimate(ctx context.Context, req *ffcapi.GasPriceEstimat return r0, r1, r2 } -// GetBlockListenerTrackingMode provides a mock function with given fields: ctx -func (_m *API) GetBlockListenerTrackingMode(ctx context.Context) ffcapi.BlockListenerTrackingMode { +// GetChainTrackingMode provides a mock function with given fields: ctx +func (_m *API) GetChainTrackingMode(ctx context.Context) ffcapi.ChainTrackingMode { ret := _m.Called(ctx) if len(ret) == 0 { - panic("no return value specified for GetBlockListenerTrackingMode") + panic("no return value specified for GetChainTrackingMode") } - var r0 ffcapi.BlockListenerTrackingMode - if rf, ok := ret.Get(0).(func(context.Context) ffcapi.BlockListenerTrackingMode); ok { + var r0 ffcapi.ChainTrackingMode + if rf, ok := ret.Get(0).(func(context.Context) ffcapi.ChainTrackingMode); ok { r0 = rf(ctx) } else { - r0 = ret.Get(0).(ffcapi.BlockListenerTrackingMode) + r0 = ret.Get(0).(ffcapi.ChainTrackingMode) } return r0 diff --git a/pkg/apitypes/api_types.go b/pkg/apitypes/api_types.go index 0f1e0ab5..569eb6f3 100644 --- a/pkg/apitypes/api_types.go +++ b/pkg/apitypes/api_types.go @@ -438,7 +438,7 @@ type ConfirmationsNotification struct { // Otherwise, Confirmations is an additive delta on top of a previous list of confirmations. NewFork bool // Confirmations is the list of confirmations being notified - assured to be non-nil. - // The list will only be populated if confirmation mode is set to validatedBlocks. + // The list will only be populated if chainTrackingMode is set to full. Confirmations []*Confirmation // CurrentConfirmationCount is the actual number of confirmations for the transaction CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` diff --git a/pkg/ffcapi/api.go b/pkg/ffcapi/api.go index b315aebf..82417405 100644 --- a/pkg/ffcapi/api.go +++ b/pkg/ffcapi/api.go @@ -61,8 +61,8 @@ type API interface { // DeployContractPrepare DeployContractPrepare(ctx context.Context, req *ContractDeployPrepareRequest) (*TransactionPrepareResponse, ErrorReason, error) - // GetBlockListenerTrackingMode gets the tracking mode of the block listener - GetBlockListenerTrackingMode(ctx context.Context) BlockListenerTrackingMode + // GetChainTrackingMode gets the tracking mode of the chain progress, which determines the level of detail of the confirmation result and the support of block listener + GetChainTrackingMode(ctx context.Context) ChainTrackingMode // EventStreamStart starts an event stream with an initial set of listeners (which might be empty), a channel to deliver events, and a context that will close to stop the stream EventStreamStart(ctx context.Context, req *EventStreamStartRequest) (*EventStreamStartResponse, ErrorReason, error) @@ -98,13 +98,13 @@ type API interface { IsReady(ctx context.Context) (*ReadyResponse, ErrorReason, error) } -type BlockListenerTrackingMode string +type ChainTrackingMode string const ( - // BlockListenerTrackingModeHeadBlockNumber in this mode, the block listener tracks the head block number of the blockchain only without fetching any block details - BlockListenerTrackingModeHeadBlockNumber BlockListenerTrackingMode = "headBlockNumber" - // BlockListenerTrackingModeInMemoryPartialChain in this mode, the block listener tracks the in-memory partial chain of blocks, fetching block details as needed - BlockListenerTrackingModeInMemoryPartialChain BlockListenerTrackingMode = "inMemoryPartialChain" + // ChainTrackingModeLight - in this mode, the connector fetches the head block number only, no block details are fetched. Therefore, block listener is not supported, confirmation result contains only the number of the confirmation. + ChainTrackingModeLight ChainTrackingMode = "light" + // ChainTrackingModeFull - (default) in this mode, the connector fetches the head block number and downloads block details and maintain a consistent in-memory partial chain. Block listener is supported, and confirmation result contains extra block details as well as the number of the confirmation. + ChainTrackingModeFull ChainTrackingMode = "full" ) type ConfirmationUpdateResult struct { @@ -116,8 +116,9 @@ type ConfirmationUpdateResult struct { // in the in-memory partial chain // WARNING: mutation to this list is not expected, invalid modifications will cause inefficiencies in the reconciliation process // `rebuilt` will be true if an invalid confirmation list is detected by the reconciliation process - Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` // the current list of confirmations for this reconcile request, only returned when confirmation mode is set to validatedBlocks - CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` // the current number of confirmations for this reconcile request + Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` // the current list of confirmations for this reconcile request, only returned when chainTrackingMode is set to full + + CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` // the current number of confirmations for this reconcile request Receipt *TransactionReceiptResponse `json:"receipt,omitempty"` // receipt for the transaction Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch diff --git a/pkg/fftm/manager_test.go b/pkg/fftm/manager_test.go index 0fa4657d..0348095c 100644 --- a/pkg/fftm/manager_test.go +++ b/pkg/fftm/manager_test.go @@ -82,7 +82,7 @@ func newTestManager(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mm, err := NewManager(context.Background(), mca) assert.NoError(t, err) @@ -106,7 +106,7 @@ func newTestManagerMockNoRichDB(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() m := newManager(context.Background(), mca) mpm := &persistencemocks.Persistence{} @@ -134,7 +134,7 @@ func newTestManagerMockRichDB(t *testing.T) (string, *manager, *persistencemocks url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() m := newManager(context.Background(), mca) mpm := &persistencemocks.Persistence{} @@ -180,7 +180,7 @@ func newTestManagerWithMetrics(t *testing.T, deprecated bool) (string, *manager, mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mm, err := NewManager(context.Background(), mca) assert.NoError(t, err) @@ -201,7 +201,7 @@ func newTestManagerMockPersistence(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() m := newManager(context.Background(), mca) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() @@ -246,7 +246,7 @@ func TestNewManagerWithLegacyConfiguration(t *testing.T) { tmconfig.DeprecatedPolicyEngineBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() m := newManager(context.Background(), mca) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() @@ -268,7 +268,7 @@ func TestNewManagerBadHttpConfig(t *testing.T) { tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewManager(context.Background(), mca) assert.Error(t, err) assert.Regexp(t, "FF00151", err) @@ -289,7 +289,7 @@ func TestNewManagerBadLevelDBConfig(t *testing.T) { tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err = NewManager(context.Background(), mca) assert.Regexp(t, "FF21049", err) @@ -305,7 +305,7 @@ func TestNewManagerBadPersistenceConfig(t *testing.T) { tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewManager(context.Background(), mca) assert.Regexp(t, "FF21043", err) @@ -319,7 +319,7 @@ func TestNewManagerInvalidTransactionHandlerName(t *testing.T) { config.Set(tmconfig.TransactionsHandlerName, "wrong") mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewManager(context.Background(), mca) assert.Regexp(t, "FF21070", err) @@ -365,7 +365,7 @@ func TestNewManagerWithMetricsBadConfig(t *testing.T) { tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") mca := &ffcapimocks.API{} - mca.On("GetBlockListenerTrackingMode", mock.Anything).Return(ffcapi.BlockListenerTrackingModeInMemoryPartialChain, nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewManager(context.Background(), mca) assert.Error(t, err) assert.Regexp(t, "FF00151", err) From 337e9160b1413b6ecdb98045634707a1efa1bb6b Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 27 Apr 2026 08:46:54 +0100 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Chengxuan Xing Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 389f64ac..63eed71c 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -412,7 +412,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() { if bcm.blockListenerStale { if bcm.chainTrackingMode == ffcapi.ChainTrackingModeFull { - // only need to build the in memory partial chain when the block listener supports it + // only need to build the in memory partial chain when the block details are available if err := bcm.walkChain(blocks); err != nil { log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err) continue @@ -557,8 +557,7 @@ func (bcm *blockConfirmationManager) removeItem(pending *pendingItem, stale bool func (bcm *blockConfirmationManager) processBlockHashes(blockHashes []string) { if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { - // for headBlockNumber mode, we don't need to fetch blocks to form a local in memory partial chain - // we just need to do confirmation check and dispatch confirmations for any applicable pending items + // for light chain tracking mode, no block details are available, only need to calculate the number of confirmations using head block number bcm.checkAndDispatchConfirmationsUsingBlockHeight() return } From d2308e95aac4d2564e69c2eaec2f302d12ff681d Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 27 Apr 2026 08:47:13 +0100 Subject: [PATCH 6/7] tidy up function name Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 6 +++--- internal/confirmations/confirmations_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 63eed71c..03066c93 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -788,7 +788,7 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc } if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { - return bcm.confirmationCheckUsingHighestBlock(pending) + return bcm.confirmationCheckUsingHeadBlockNumber(pending) } pendingKey := pending.getKey() @@ -843,13 +843,13 @@ func (bcm *blockConfirmationManager) checkAndDispatchConfirmationsUsingBlockHeig } bcm.pendingMux.Unlock() for _, p := range items { - if err := bcm.confirmationCheckUsingHighestBlock(p); err != nil { + if err := bcm.confirmationCheckUsingHeadBlockNumber(p); err != nil { log.L(bcm.ctx).Errorf("Block height confirmation refresh failed for %s: %s", p.getKey(), err) } } } -func (bcm *blockConfirmationManager) confirmationCheckUsingHighestBlock(pending *pendingItem) error { +func (bcm *blockConfirmationManager) confirmationCheckUsingHeadBlockNumber(pending *pendingItem) error { if pending.blockHash == "" { // no receipt yet, so no confirmation check return nil diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 40581747..8dbbb75b 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -1795,7 +1795,7 @@ func TestBlockConfirmationManagerHeadBlockNumberReceiptOtherErrorNoReschedule(t mca.AssertExpectations(t) } -// TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt covers confirmationCheckUsingHighestBlock +// TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt tests confirmationCheckUsingHeadBlockNumber // when a pending transaction has no block hash yet (no receipt): head updates must not call dispatch or TransactionReceipt. func TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt(t *testing.T) { bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() From 852459b5a29d758290410fb27617a66d6fd28cf6 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 27 Apr 2026 08:48:17 +0100 Subject: [PATCH 7/7] Apply suggestions from code review Co-authored-by: Chengxuan Xing Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 03066c93..803841dc 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -99,7 +99,7 @@ type blockConfirmationManager struct { staleReceiptTimeout time.Duration bcmNotifications chan *Notification highestBlockSeen uint64 - headBlockNumber uint64 // used for confirmation when block listener is in headBlockNumber mode + headBlockNumber uint64 // used for confirmation when the connector is running in light mode pending map[string]*pendingItem pendingMux sync.Mutex receiptChecker *receiptChecker