diff --git a/internal/blocklistener/blocklistener.go b/internal/blocklistener/blocklistener.go index 98c0c55a..6f97d7d7 100644 --- a/internal/blocklistener/blocklistener.go +++ b/internal/blocklistener/blocklistener.go @@ -24,7 +24,7 @@ import ( ) type NewBlockHashConsumer interface { - NewBlockHashes() chan<- *ffcapi.BlockHashEvent + GetReceiveChannel() chan<- *ffcapi.BlockHashEvent } // BufferChannel ensures it always pulls blocks from the channel passed to the connector @@ -49,7 +49,7 @@ func BufferChannel(ctx context.Context, target NewBlockHashConsumer) (buffered c // Have to discard this blockedUpdate.GapPotential = true // there is a gap for sure at this point log.L(ctx).Debugf("Blocked event stream missed new block event: %v", blockUpdate.BlockHashes) - case target.NewBlockHashes() <- blockedUpdate: + case target.GetReceiveChannel() <- blockedUpdate: // We're not blocked any more log.L(ctx).Infof("Event stream block-listener unblocked") blockedUpdate = nil @@ -64,7 +64,7 @@ func BufferChannel(ctx context.Context, target NewBlockHashConsumer) (buffered c // Nothing to do unless we have confirmations turned on if target != nil { select { - case target.NewBlockHashes() <- update: + case target.GetReceiveChannel() <- update: // all good, we passed it on default: // we can't deliver it immediately, we switch to blocked mode diff --git a/internal/blocklistener/blocklistener_test.go b/internal/blocklistener/blocklistener_test.go index b4d8ed29..13a13a36 100644 --- a/internal/blocklistener/blocklistener_test.go +++ b/internal/blocklistener/blocklistener_test.go @@ -28,7 +28,7 @@ type testBlockConsumer struct { c chan *ffcapi.BlockHashEvent } -func (tbc *testBlockConsumer) NewBlockHashes() chan<- *ffcapi.BlockHashEvent { +func (tbc *testBlockConsumer) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent { return tbc.c } diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index 175ff0cf..803841dc 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "time" @@ -43,7 +44,7 @@ type Manager interface { Notify(n *Notification) error Start() Stop() - NewBlockHashes() chan<- *ffcapi.BlockHashEvent + GetReceiveChannel() chan<- *ffcapi.BlockHashEvent CheckInFlight(listenerID *fftypes.UUID) bool StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error @@ -89,14 +90,16 @@ type blockConfirmationManager struct { baseContext context.Context ctx context.Context cancelFunc func() - newBlockHashes chan *ffcapi.BlockHashEvent + newBlockHashEvents chan *ffcapi.BlockHashEvent connector ffcapi.API blockListenerStale bool metricsEmitter metrics.ConfirmationMetricsEmitter - requiredConfirmations int + requiredConfirmations uint64 + chainTrackingMode ffcapi.ChainTrackingMode staleReceiptTimeout time.Duration bcmNotifications chan *Notification highestBlockSeen uint64 + headBlockNumber uint64 // used for confirmation when the connector is running in light mode pending map[string]*pendingItem pendingMux sync.Mutex receiptChecker *receiptChecker @@ -114,11 +117,12 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A connector: connector, cbls: make(map[fftypes.UUID]*confirmedBlockListener), blockListenerStale: true, - requiredConfirmations: config.GetInt(tmconfig.ConfirmationsRequired), + requiredConfirmations: config.GetUint64(tmconfig.ConfirmationsRequired), + chainTrackingMode: connector.GetChainTrackingMode(baseContext), staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout), bcmNotifications: make(chan *Notification, config.GetInt(tmconfig.ConfirmationsNotificationQueueLength)), pending: make(map[string]*pendingItem), - newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)), + newBlockHashEvents: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)), metricsEmitter: cme, retry: &retry.Retry{ InitialDelay: config.GetDuration(tmconfig.ConfirmationsRetryInitDelay), @@ -143,22 +147,23 @@ const ( // pendingItem could be a specific event that has been detected, but not confirmed yet. // Or it could be a transaction type pendingItem struct { - pType pendingType - added time.Time - notifiedConfirmations []*apitypes.Confirmation - confirmations []*apitypes.Confirmation - scheduledAtLeastOnce bool - confirmed bool - queuedStale *list.Element // protected by receiptChecker mux - lastReceiptCheck time.Time // protected by receiptChecker mux - receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) - confirmationsCallback func(ctx context.Context, notification *apitypes.ConfirmationsNotification) - transactionHash string - blockHash string // can be notified of changes to this for receipts - blockNumber uint64 // known at creation time for event logs - transactionIndex uint64 // known at creation time for event logs - logIndex uint64 // events only - listenerID *fftypes.UUID // events only + pType pendingType + added time.Time + notifiedConfirmations []*apitypes.Confirmation + confirmations []*apitypes.Confirmation + scheduledAtLeastOnce bool + confirmed bool + previousConfirmationCount *uint64 // headBlockNumber mode: last dispatched CurrentConfirmationCount + queuedStale *list.Element // protected by receiptChecker mux + lastReceiptCheck time.Time // protected by receiptChecker mux + receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) + confirmationsCallback func(ctx context.Context, notification *apitypes.ConfirmationsNotification) + transactionHash string + blockHash string // can be notified of changes to this for receipts + blockNumber uint64 // known at creation time for event logs + transactionIndex uint64 // known at creation time for event logs + logIndex uint64 // events only + listenerID *fftypes.UUID // events only } func pendingKeyForTX(txHash string) string { @@ -243,8 +248,8 @@ func (bcm *blockConfirmationManager) Stop() { } } -func (bcm *blockConfirmationManager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent { - return bcm.newBlockHashes +func (bcm *blockConfirmationManager) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent { + return bcm.newBlockHashEvents } // Notify is used to notify the confirmation manager of detection of a new logEntry addition or removal @@ -365,12 +370,12 @@ func (bcm *blockConfirmationManager) confirmationsListener() { receivedFirstBlock := false for { select { - case bhe := <-bcm.newBlockHashes: + case bhe := <-bcm.newBlockHashEvents: if bhe.GapPotential { bcm.blockListenerStale = true } blockHashes = append(blockHashes, bhe.BlockHashes...) - + bcm.headBlockNumber = bhe.HeadBlockNumber // always update the head block number, NOTE: the number can decrease during a re-org // Need to also pass this event to any confirmed block listeners // (they promise to always be efficient in handling these, having a go-routine // dedicated to spinning fast just processing those separate to dispatching them) @@ -406,9 +411,12 @@ func (bcm *blockConfirmationManager) confirmationsListener() { blocks := bcm.newBlockState() if bcm.blockListenerStale { - if err := bcm.walkChain(blocks); err != nil { - log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err) - continue + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeFull { + // only need to build the in memory partial chain when the block details are available + if err := bcm.walkChain(blocks); err != nil { + log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err) + continue + } } bcm.blockListenerStale = false } @@ -548,6 +556,11 @@ func (bcm *blockConfirmationManager) removeItem(pending *pendingItem, stale bool } func (bcm *blockConfirmationManager) processBlockHashes(blockHashes []string) { + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { + // for light chain tracking mode, no block details are available, only need to calculate the number of confirmations using head block number + bcm.checkAndDispatchConfirmationsUsingBlockHeight() + return + } batchSize := len(blockHashes) if batchSize > 0 { log.L(bcm.ctx).Debugf("New block notifications %v", blockHashes) @@ -599,7 +612,6 @@ func (bcm *blockConfirmationManager) processBlock(block *apitypes.BlockInfo) { var notifications pendingItems for pendingKey, pending := range bcm.pending { if pending.blockHash != "" { - // The block might appear at any point in the confirmation list newConfirmations := false expectedParentHash := pending.blockHash @@ -622,7 +634,7 @@ func (bcm *blockConfirmationManager) processBlock(block *apitypes.BlockInfo) { } expectedBlockNumber++ } - if len(pending.confirmations) >= bcm.requiredConfirmations { + if uint64(len(pending.confirmations)) >= bcm.requiredConfirmations { pending.confirmed = true } if bcm.requiredConfirmations > 0 && (pending.confirmed || newConfirmations) { @@ -666,14 +678,21 @@ func (bcm *blockConfirmationManager) dispatchConfirmations(item *pendingItem) { } } + confirmationCount := uint64(len(item.confirmations)) + if confirmationCount > bcm.requiredConfirmations { + confirmationCount = bcm.requiredConfirmations + } + // Possible for us to re-dispatch the same confirmations, if we are notified about a block later after // after we previously did a crawl for blocks. // So we protect here against dispatching an empty array if len(notificationConfirmations) > 0 || item.confirmed { notification := &apitypes.ConfirmationsNotification{ - Confirmed: item.confirmed, - NewFork: newFork, - Confirmations: notificationConfirmations, + Confirmed: item.confirmed, + CurrentConfirmationCount: confirmationCount, + TargetConfirmationCount: bcm.requiredConfirmations, + NewFork: newFork, + Confirmations: notificationConfirmations, } // Take a copy of the notification confirmations so we know what we have previously notified next time round // (not safe to keep a reference, in case it's modified by the callback). @@ -768,6 +787,10 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc return nil } + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { + return bcm.confirmationCheckUsingHeadBlockNumber(pending) + } + pendingKey := pending.getKey() blockNumber := pending.blockNumber + 1 @@ -796,7 +819,7 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc break } pending.confirmations = append(pending.confirmations, apitypes.ConfirmationFromBlock(block)) - if len(pending.confirmations) >= bcm.requiredConfirmations { + if uint64(len(pending.confirmations)) >= bcm.requiredConfirmations { pending.confirmed = true break } @@ -811,3 +834,91 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc return nil } + +func (bcm *blockConfirmationManager) checkAndDispatchConfirmationsUsingBlockHeight() { + bcm.pendingMux.Lock() + items := make([]*pendingItem, 0, len(bcm.pending)) + for _, p := range bcm.pending { + items = append(items, p) + } + bcm.pendingMux.Unlock() + for _, p := range items { + if err := bcm.confirmationCheckUsingHeadBlockNumber(p); err != nil { + log.L(bcm.ctx).Errorf("Block height confirmation refresh failed for %s: %s", p.getKey(), err) + } + } +} + +func (bcm *blockConfirmationManager) confirmationCheckUsingHeadBlockNumber(pending *pendingItem) error { + if pending.blockHash == "" { + // no receipt yet, so no confirmation check + return nil + } + + currentConfirmationCount := uint64(0) + if bcm.headBlockNumber > pending.blockNumber { + currentConfirmationCount = bcm.headBlockNumber - pending.blockNumber + } + + return bcm.dispatchBlockHeightConfirmations(pending, currentConfirmationCount) + +} + +func (bcm *blockConfirmationManager) dispatchBlockHeightConfirmations(pending *pendingItem, count uint64) error { + if pending.previousConfirmationCount != nil && count == *pending.previousConfirmationCount { + // no ops as there isn't any changes detected + return nil + } + + confirmationCount := count + if confirmationCount > bcm.requiredConfirmations { + confirmationCount = bcm.requiredConfirmations + } + + confirmed := confirmationCount == bcm.requiredConfirmations + if confirmed { + // do confirmation check here to ensure the transaction receipt is still valid + res, reason, err := bcm.connector.TransactionReceipt(bcm.ctx, &ffcapi.TransactionReceiptRequest{ + TransactionHash: pending.transactionHash, + }) + if err != nil { + if reason == ffcapi.ErrorReasonNotFound { + // need to schedule the receipt check again as the receipt is no longer valid + pending.blockHash = "" + pending.blockNumber = 0 + pending.previousConfirmationCount = nil + bcm.receiptChecker.schedule(pending, true) + } + return err + } + if res == nil || res.BlockHash == "" { + return i18n.NewError(bcm.ctx, tmmsgs.MsgTransactionReceiptMissingBlockHash) + } + if !strings.EqualFold(strings.ToLower(res.BlockHash), strings.ToLower(pending.blockHash)) { + // set the new block hash and number and requeue for confirmation check + pending.blockHash = res.BlockHash + pending.blockNumber = res.BlockNumber.Uint64() + pending.previousConfirmationCount = nil + return i18n.NewError(bcm.ctx, tmmsgs.MsgTransactionReceiptBlockHashMismatch, pending.blockHash, res.BlockHash) + } + } + // when not confirmed, we assume the receipt is still valid and return the actual confirmation count + + notification := &apitypes.ConfirmationsNotification{ + Confirmed: confirmed, + NewFork: pending.previousConfirmationCount != nil && count < *pending.previousConfirmationCount, // the only fork situation we can detect is a reduction in confirmation count + CurrentConfirmationCount: confirmationCount, // NOTE: we returns the actual confirmation count, which can be higher than the required confirmation count + TargetConfirmationCount: bcm.requiredConfirmations, + } + pending.previousConfirmationCount = &count + + log.L(bcm.ctx).Infof("Block height confirmation notification item=%s confirmed=%t count=%d/%d newFork=%t", + pending.getKey(), notification.Confirmed, notification.CurrentConfirmationCount, notification.TargetConfirmationCount, notification.NewFork) + pending.confirmationsCallback(bcm.ctx, notification) + bcm.metricsEmitter.RecordConfirmationMetrics(bcm.ctx, time.Since(pending.added).Seconds()) + + if confirmed { + bcm.removeItem(pending, false) + } + return nil +} diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 2c4b5a43..8dbbb75b 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -16,6 +16,7 @@ package confirmations import ( "context" + "errors" "fmt" "sort" "testing" @@ -43,6 +44,7 @@ func newTestBlockConfirmationManager() (*blockConfirmationManager, *ffcapimocks. func newTestBlockConfirmationManagerCustomConfig() (*blockConfirmationManager, *ffcapimocks.API) { logrus.SetLevel(logrus.DebugLevel) mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() emm := &metricsmocks.EventMetricsEmitter{} emm.On("RecordNotificationQueueingMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashProcessMetrics", mock.Anything, mock.Anything).Maybe() @@ -52,11 +54,35 @@ func newTestBlockConfirmationManagerCustomConfig() (*blockConfirmationManager, * emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashQueueingMetrics", mock.Anything, mock.Anything).Maybe() emm.On("RecordBlockHashBatchSizeMetric", mock.Anything, mock.Anything).Maybe() + bcm := NewBlockConfirmationManager(context.Background(), mca, "ut", emm).(*blockConfirmationManager) bcm.receiptChecker = newReceiptChecker(bcm, 0, emm) // no workers, but non-nil return bcm, mca } +func newTestBlockConfirmationManagerHeadBlockNumber() (*blockConfirmationManager, *ffcapimocks.API) { + tmconfig.Reset() + config.Set(tmconfig.ConfirmationsRequired, 3) + config.Set(tmconfig.ConfirmationsNotificationQueueLength, 10) + config.Set(tmconfig.ConfirmationsReceiptWorkers, 0) + config.Set(tmconfig.ConfirmationsFetchReceiptUponEntry, false) + logrus.SetLevel(logrus.DebugLevel) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeLight).Maybe() + emm := &metricsmocks.EventMetricsEmitter{} + emm.On("RecordNotificationQueueingMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordBlockHashProcessMetrics", mock.Anything, mock.Anything).Maybe() + emm.On("RecordNotificationProcessMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordReceiptCheckMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordReceiptMetrics", mock.Anything, mock.Anything, mock.Anything).Maybe() + emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe() + emm.On("RecordBlockHashQueueingMetrics", mock.Anything, mock.Anything).Maybe() + emm.On("RecordBlockHashBatchSizeMetric", mock.Anything, mock.Anything).Maybe() + + bcm := NewBlockConfirmationManager(context.Background(), mca, "ut", emm).(*blockConfirmationManager) + return bcm, mca +} + func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { bcm, mca := newTestBlockConfirmationManager() @@ -76,7 +102,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { } // First poll for changes gives nothing, but we load up the event at this point for the next round - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() // Next time round gives a block that is in the confirmation chain, but one block ahead block1003 := &apitypes.BlockInfo{ @@ -131,6 +157,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { // Then we should walk the chain by number to fill in 1003, because our HWM is 1003. mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool { + fmt.Println("BlockInfoByNumber", r.BlockNumber.Uint64()) return r.BlockNumber.Uint64() == 1003 })).Run(func(args mock.Arguments) { blockHashes <- &ffcapi.BlockHashEvent{ @@ -163,6 +190,8 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { apitypes.ConfirmationFromBlock(block1002), apitypes.ConfirmationFromBlock(block1003), }, dispatched.Confirmations) + assert.Equal(t, uint64(2), dispatched.CurrentConfirmationCount) + assert.Equal(t, uint64(3), dispatched.TargetConfirmationCount) assert.True(t, dispatched.NewFork) assert.False(t, dispatched.Confirmed) @@ -171,6 +200,8 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) { assert.Equal(t, []*apitypes.Confirmation{ apitypes.ConfirmationFromBlock(block1004), }, dispatched.Confirmations) + assert.Equal(t, uint64(3), dispatched.CurrentConfirmationCount) + assert.Equal(t, uint64(3), dispatched.TargetConfirmationCount) assert.False(t, dispatched.NewFork) assert.True(t, dispatched.Confirmed) @@ -209,7 +240,7 @@ func TestBlockConfirmationManagerE2EFork(t *testing.T) { ParentHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df", } - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() blockHashes <- &ffcapi.BlockHashEvent{ BlockHashes: []string{ block1002.BlockHash, @@ -357,7 +388,7 @@ func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) { ParentHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df", } - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() // Have the event notification in flight from the beginning err := bcm.Notify(&Notification{ @@ -500,7 +531,7 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) { } // The next filter gives us 1002a, which will later be removed - blockHashes := bcm.NewBlockHashes() + blockHashes := bcm.GetReceiveChannel() // First check while walking the chain does not yield a block mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool { @@ -921,7 +952,7 @@ func TestConfirmationsFailWalkChainAfterBlockGap(t *testing.T) { assert.NoError(t, err) mca.On("BlockInfoByNumber", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found")).Run(func(args mock.Arguments) { - bcm.NewBlockHashes() <- &ffcapi.BlockHashEvent{ + bcm.GetReceiveChannel() <- &ffcapi.BlockHashEvent{ GapPotential: true, } }).Once() @@ -1305,3 +1336,493 @@ func TestBlockState(t *testing.T) { mca.AssertExpectations(t) } + +func TestBlockConfirmationManagerHeadBlockNumberConfirmsEvent(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1001), + BlockHash: blockHash, + }, + }, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + blockEvents := bcm.GetReceiveChannel() + + blockEvents <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + + err := bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + }) + assert.NoError(t, err) + + n0 := <-confirmed + assert.False(t, n0.Confirmed) + assert.False(t, n0.NewFork) + assert.Equal(t, uint64(0), n0.CurrentConfirmationCount) + assert.Equal(t, uint64(3), n0.TargetConfirmationCount) + assert.Empty(t, n0.Confirmations) + + blockEvents <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + + n1 := <-confirmed + assert.True(t, n1.Confirmed) + assert.False(t, n1.NewFork) + assert.Equal(t, uint64(3), n1.CurrentConfirmationCount) + assert.Equal(t, uint64(3), n1.TargetConfirmationCount) + assert.Empty(t, n1.Confirmations) + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberNewForkOnHeadDrop(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + bcm.Start() + ch := bcm.GetReceiveChannel() + + // Prime head so this iteration completes before Notify is queued; otherwise the select + // may handle Notify first while head is still zero (same pattern as TestBlockConfirmationManagerHeadBlockNumberConfirmsEvent). + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + + err := bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + }) + assert.NoError(t, err) + + n0 := <-confirmed + assert.False(t, n0.Confirmed) + assert.False(t, n0.NewFork) + assert.Equal(t, uint64(0), n0.CurrentConfirmationCount) + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1003} + + n1 := <-confirmed + assert.False(t, n1.Confirmed) + assert.False(t, n1.NewFork) + assert.Equal(t, uint64(2), n1.CurrentConfirmationCount) + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1002} + + n2 := <-confirmed + assert.False(t, n2.Confirmed) + assert.True(t, n2.NewFork) + assert.Equal(t, uint64(1), n2.CurrentConfirmationCount) + + // validate confirmation for the new head block + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1001), + BlockHash: blockHash, + }, + }, ffcapi.ErrorReason(""), nil).Once() + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 2002} + n3 := <-confirmed + assert.True(t, n3.Confirmed) + assert.False(t, n3.NewFork) + assert.Equal(t, uint64(3), n3.CurrentConfirmationCount) + assert.Equal(t, uint64(3), n3.TargetConfirmationCount) + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptNotFoundReschedules(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Return(nil, ffcapi.ErrorReasonNotFound, errors.New("not found")).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + // Wait until the not-found receipt path has re-queued a check (safe to read under + // receiptChecker.cond). pendingItem fields are updated by the listener without + // pendingMux, so assert those only after Stop freezes the listener. + assert.Eventually(t, func() bool { + bcm.receiptChecker.cond.L.Lock() + l := bcm.receiptChecker.entries.Len() + bcm.receiptChecker.cond.L.Unlock() + return l == 1 + }, time.Second, 5*time.Millisecond) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + + bcm.pendingMux.Lock() + var cleared *pendingItem + for _, pi := range bcm.pending { + cleared = pi + break + } + bcm.pendingMux.Unlock() + assert.NotNil(t, cleared) + assert.Equal(t, "", cleared.blockHash) + assert.Equal(t, uint64(0), cleared.blockNumber) + assert.Nil(t, cleared.previousConfirmationCount) + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptMissingBlockHash(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1001), + BlockHash: "", + }, + }, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, blockHash, p.blockHash) + assert.Equal(t, uint64(1001), p.blockNumber) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptNilResponse(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(nil, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, blockHash, p.blockHash) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptBlockHashMismatch(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + receiptHash := "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(&ffcapi.TransactionReceiptResponse{ + TransactionReceiptResponseBase: ffcapi.TransactionReceiptResponseBase{ + BlockNumber: fftypes.NewFFBigInt(1005), + BlockHash: receiptHash, + }, + }, ffcapi.ErrorReason(""), nil).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, receiptHash, p.blockHash) + assert.Equal(t, uint64(1005), p.blockNumber) + assert.Nil(t, p.previousConfirmationCount) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +func TestBlockConfirmationManagerHeadBlockNumberReceiptOtherErrorNoReschedule(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + blockHash := "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542" + confirmed := make(chan *apitypes.ConfirmationsNotification, 5) + eventToConfirm := &EventInfo{ + ID: &ffcapi.EventID{ + ListenerID: fftypes.NewUUID(), + TransactionHash: txHash, + BlockHash: blockHash, + BlockNumber: 1001, + TransactionIndex: 5, + LogIndex: 10, + }, + Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { + confirmed <- notification + }, + } + + receiptChecked := make(chan struct{}, 1) + mca.On("TransactionReceipt", mock.Anything, mock.MatchedBy(func(r *ffcapi.TransactionReceiptRequest) bool { + return r.TransactionHash == txHash + })).Run(func(mock.Arguments) { + receiptChecked <- struct{}{} + }).Return(nil, ffcapi.ErrorReason(""), errors.New("rpc unavailable")).Once() + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewEventLog, + Event: eventToConfirm, + })) + <-confirmed + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1004} + select { + case <-receiptChecked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for TransactionReceipt") + } + + pendingKey := (&Notification{NotificationType: NewEventLog, Event: eventToConfirm}).eventPendingItem().getKey() + bcm.pendingMux.Lock() + p := bcm.pending[pendingKey] + bcm.pendingMux.Unlock() + assert.NotNil(t, p) + assert.Equal(t, blockHash, p.blockHash) + assert.Equal(t, uint64(1001), p.blockNumber) + assert.Equal(t, 0, bcm.receiptChecker.entries.Len()) + + select { + case n := <-confirmed: + t.Fatalf("unexpected confirmation notification: %+v", n) + case <-time.After(50 * time.Millisecond): + } + + bcm.Stop() + mca.AssertExpectations(t) +} + +// TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt tests confirmationCheckUsingHeadBlockNumber +// when a pending transaction has no block hash yet (no receipt): head updates must not call dispatch or TransactionReceipt. +func TestBlockConfirmationManagerHeadBlockNumberNoOpWithoutReceipt(t *testing.T) { + bcm, mca := newTestBlockConfirmationManagerHeadBlockNumber() + + txHash := "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7" + + bcm.Start() + ch := bcm.GetReceiveChannel() + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1001} + + assert.NoError(t, bcm.Notify(&Notification{ + NotificationType: NewTransaction, + Transaction: &TransactionInfo{ + TransactionHash: txHash, + }, + })) + + ch <- &ffcapi.BlockHashEvent{HeadBlockNumber: 1010} + + pendingKey := pendingKeyForTX(txHash) + assert.Eventually(t, func() bool { + bcm.pendingMux.Lock() + defer bcm.pendingMux.Unlock() + p := bcm.pending[pendingKey] + return p != nil && p.blockHash == "" && p.blockNumber == 0 + }, time.Second, 5*time.Millisecond) + + bcm.Stop() + mca.AssertExpectations(t) +} diff --git a/internal/confirmations/confirmed_block_listener.go b/internal/confirmations/confirmed_block_listener.go index 4d54408c..8b976d8f 100644 --- a/internal/confirmations/confirmed_block_listener.go +++ b/internal/confirmations/confirmed_block_listener.go @@ -57,7 +57,7 @@ type confirmedBlockListener struct { dispatcherTap chan struct{} eventStream chan<- *ffcapi.ListenerEvent connector ffcapi.API - requiredConfirmations int + requiredConfirmations uint64 retry *retry.Retry processorDone chan struct{} dispatcherDone chan struct{} @@ -69,6 +69,9 @@ func (bcm *blockConfirmationManager) StartConfirmedBlockListener(ctx context.Con } func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) (cbl *confirmedBlockListener, err error) { + if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight { + return nil, i18n.NewError(fgCtx, tmmsgs.MsgConfirmedBlockListenerUnsupportedMode, bcm.chainTrackingMode) + } cbl = &confirmedBlockListener{ bcm: bcm, // We need our own listener for each confirmed block stream, and the bcm has to fan out @@ -346,7 +349,7 @@ func (cbl *confirmedBlockListener) dispatchAllConfirmed() { for { var toDispatch *ffcapi.ListenerEvent cbl.stateLock.Lock() - if len(cbl.blocksSinceCheckpoint) > cbl.requiredConfirmations { + if uint64(len(cbl.blocksSinceCheckpoint)) > cbl.requiredConfirmations { block := cbl.blocksSinceCheckpoint[0] // don't want memory to grow indefinitely by shifting right, so we create a new slice here cbl.blocksSinceCheckpoint = append([]*apitypes.BlockInfo{}, cbl.blocksSinceCheckpoint[1:]...) diff --git a/internal/confirmations/confirmed_block_listener_test.go b/internal/confirmations/confirmed_block_listener_test.go index 65164e87..2c0b698f 100644 --- a/internal/confirmations/confirmed_block_listener_test.go +++ b/internal/confirmations/confirmed_block_listener_test.go @@ -45,7 +45,7 @@ func TestCBLCatchUpToHeadFromZeroNoConfirmations(t *testing.T) { cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch) assert.NoError(t, err) - for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ { + for i := 0; i < len(blocks)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } @@ -73,13 +73,13 @@ func TestCBLCatchUpToHeadFromZeroWithConfirmations(t *testing.T) { cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch) assert.NoError(t, err) - for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ { + for i := 0; i < len(blocks)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } time.Sleep(1 * time.Millisecond) - assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations) + assert.Len(t, cbl.blocksSinceCheckpoint, int(bcm.requiredConfirmations)) select { case <-esDispatch: assert.Fail(t, "should not have received block in confirmation window") @@ -120,14 +120,14 @@ func TestCBLListenFromCurrentBlock(t *testing.T) { BlockHashes: []string{blocks[1].BlockHash}, }) - for i := 5; i < len(blocks)-bcm.requiredConfirmations; i++ { + for i := 5; i < len(blocks)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockNumber, blocks[i].BlockNumber) assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } time.Sleep(1 * time.Millisecond) - assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations) + assert.Len(t, cbl.blocksSinceCheckpoint, int(bcm.requiredConfirmations)) select { case <-esDispatch: assert.Fail(t, "should not have received block in confirmation window") @@ -248,11 +248,11 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything) mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocksAfterReorg) }) - bcm.requiredConfirmations = reqConf + bcm.requiredConfirmations = uint64(reqConf) cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch) assert.NoError(t, err) - for i := 0; i < len(blocksAfterReorg)-bcm.requiredConfirmations; i++ { + for i := 0; i < len(blocksAfterReorg)-int(bcm.requiredConfirmations); i++ { b := <-esDispatch dangerArea := len(blocksAfterReorg) - overlap if i >= overlap && i < (dangerArea-reqConf) { @@ -265,7 +265,7 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o } time.Sleep(1 * time.Millisecond) - assert.LessOrEqual(t, len(cbl.blocksSinceCheckpoint), bcm.requiredConfirmations) + assert.LessOrEqual(t, len(cbl.blocksSinceCheckpoint), int(bcm.requiredConfirmations)) select { case b := <-esDispatch: assert.Fail(t, fmt.Sprintf("should not have received block in confirmation window: %d/%s", b.BlockEvent.BlockNumber.Int64(), b.BlockEvent.BlockHash)) @@ -318,7 +318,7 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) { assert.NoError(t, err) cbl.requiredConfirmations = 5 - for i := 0; i < len(blocks)-cbl.requiredConfirmations; i++ { + for i := 0; i < len(blocks)-int(cbl.requiredConfirmations); i++ { b := <-esDispatch assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 2ea735e7..6a702f54 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -70,7 +70,8 @@ func testESConf(t *testing.T, j string) (spec *apitypes.EventStream) { func newTestEventStream(t *testing.T, conf string) (es *eventStream) { tmconfig.Reset() - es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf) + mca := &ffcapimocks.API{} + es, err := newTestEventStreamWithListener(t, mca, conf) assert.NoError(t, err) return es } @@ -90,7 +91,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str tmconfig.Reset() config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us") InitDefaults() - + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewEventStream(context.Background(), testESConf(t, conf), mfc, &persistencemocks.Persistence{}, @@ -131,8 +132,10 @@ func TestNewTestEventStreamMissingID(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewEventStream(context.Background(), &apitypes.EventStream{}, - &ffcapimocks.API{}, + mca, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, @@ -145,8 +148,10 @@ func TestNewTestEventStreamBadConfig(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() _, err := NewEventStream(context.Background(), testESConf(t, `{}`), - &ffcapimocks.API{}, + mca, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, @@ -544,6 +549,7 @@ func TestAPIManagedEventStreamMissingListenerIDs(t *testing.T) { InitDefaults() mfc := &ffcapimocks.API{} + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() // Checkpoints are commonly tied to individual listeners, so it is critical that the caller manages // deterministically the IDs of the listeners passed in. They cannot be empty (an error will be returned) _, err := NewAPIManagedEventStream(context.Background(), @@ -571,6 +577,7 @@ func TestAPIManagedEventStreamE2E(t *testing.T) { } mfc := &ffcapimocks.API{} + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, @@ -750,7 +757,7 @@ func TestWebhookEventStreamsE2EAddAfterStart(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.MatchedBy(func(req *ffcapi.EventListenerVerifyOptionsRequest) bool { return req.FromBlock == "12345" && req.Options.JSONObject().GetString("option1") == "value1" })).Return(&ffcapi.EventListenerVerifyOptionsResponse{ @@ -858,7 +865,6 @@ func TestStartWithExistingStreamOk(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) _, err := newTestEventStreamWithListener(t, mfc, `{ @@ -878,7 +884,6 @@ func TestStartWithExistingStreamFail(t *testing.T) { } mfc := &ffcapimocks.API{} - mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), fmt.Errorf("pop")) _, err := newTestEventStreamWithListener(t, mfc, `{ @@ -903,6 +908,7 @@ func TestUpdateStreamStarted(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -967,6 +973,7 @@ func TestAddRemoveListener(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1024,6 +1031,7 @@ func TestUpdateListenerAndDeleteStarted(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1095,6 +1103,7 @@ func TestUpdateListenerFail(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1175,6 +1184,7 @@ func TestUpdateStreamRestartFail(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1234,6 +1244,7 @@ func TestUpdateAttemptChangeSignature(t *testing.T) { mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{ ResolvedSignature: "sig1", }, ffcapi.ErrorReason(""), nil).Once() @@ -1399,7 +1410,7 @@ func TestAttemptResetNonExistentListener(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) l := &apitypes.Listener{ @@ -1428,7 +1439,7 @@ func TestUpdateStreamStopFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1486,6 +1497,7 @@ func TestResetListenerRestartFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1538,6 +1550,7 @@ func TestResetListenerWriteCheckpointFail(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) msp := es.checkpointsDB.(*persistencemocks.Persistence) @@ -1586,7 +1599,7 @@ func TestWebSocketBroadcastActionCloseDuringCheckpoint(t *testing.T) { } mfc := es.connector.(*ffcapimocks.API) - + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) started := make(chan *ffcapi.EventStreamStartRequest, 1) @@ -1661,6 +1674,7 @@ func TestActionRetryOk(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1712,6 +1726,7 @@ func TestActionRetrySkip(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1754,6 +1769,7 @@ func TestActionRetryBlock(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -1805,6 +1821,7 @@ func TestDeleteFail(t *testing.T) { }`) mfc := es.connector.(*ffcapimocks.API) + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { return r.ID.Equals(es.spec.ID) })).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil).Once() @@ -2223,6 +2240,7 @@ func TestStartAPIEventStreamStartFail(t *testing.T) { } mfc := &ffcapimocks.API{} + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, @@ -2255,6 +2273,7 @@ func TestStartAPIEventStreamPollContextCancelled(t *testing.T) { } mfc := &ffcapimocks.API{} + mfc.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() ees, err := NewAPIManagedEventStream(context.Background(), testESConf(t, `{}`), mfc, diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index 9fa2146e..d61b4ca4 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -109,4 +109,7 @@ var ( MsgStreamAPIManagedNameNoIDOrType = ffe("FF21092", "API managed streams must have a name, but no ID or type", http.StatusBadRequest) MsgUpdatePayloadEmpty = ffe("FF21093", "Update transaction must have a non-empty payload", http.StatusBadRequest) MsgTxHandlerUnsupportedFieldForUpdate = ffe("FF21094", "Update '%s' in the transaction is not supported by the transaction handler", http.StatusBadRequest) + MsgConfirmedBlockListenerUnsupportedMode = ffe("FF21095", "Block listener is not supported when chain tracking mode is '%s'", http.StatusBadRequest) + MsgTransactionReceiptMissingBlockHash = ffe("FF21096", "Transaction receipt missing block hash") + MsgTransactionReceiptBlockHashMismatch = ffe("FF21097", "Transaction receipt block hash mismatch. Expected %s, got %s") ) diff --git a/mocks/apiclientmocks/fftm_client.go b/mocks/apiclientmocks/fftm_client.go index 56b05cd4..e4db93b7 100644 --- a/mocks/apiclientmocks/fftm_client.go +++ b/mocks/apiclientmocks/fftm_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package apiclientmocks diff --git a/mocks/confirmationsmocks/manager.go b/mocks/confirmationsmocks/manager.go index 249dc4bc..f71b3a71 100644 --- a/mocks/confirmationsmocks/manager.go +++ b/mocks/confirmationsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package confirmationsmocks @@ -37,12 +37,12 @@ func (_m *Manager) CheckInFlight(listenerID *fftypes.UUID) bool { return r0 } -// NewBlockHashes provides a mock function with no fields -func (_m *Manager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent { +// GetReceiveChannel provides a mock function with no fields +func (_m *Manager) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent { ret := _m.Called() if len(ret) == 0 { - panic("no return value specified for NewBlockHashes") + panic("no return value specified for GetReceiveChannel") } var r0 chan<- *ffcapi.BlockHashEvent diff --git a/mocks/eventsmocks/stream.go b/mocks/eventsmocks/stream.go index e41536bc..db0c72f1 100644 --- a/mocks/eventsmocks/stream.go +++ b/mocks/eventsmocks/stream.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package eventsmocks diff --git a/mocks/ffcapimocks/api.go b/mocks/ffcapimocks/api.go index aa767de6..c4b1264f 100644 --- a/mocks/ffcapimocks/api.go +++ b/mocks/ffcapimocks/api.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package ffcapimocks @@ -478,6 +478,24 @@ func (_m *API) GasPriceEstimate(ctx context.Context, req *ffcapi.GasPriceEstimat return r0, r1, r2 } +// GetChainTrackingMode provides a mock function with given fields: ctx +func (_m *API) GetChainTrackingMode(ctx context.Context) ffcapi.ChainTrackingMode { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetChainTrackingMode") + } + + var r0 ffcapi.ChainTrackingMode + if rf, ok := ret.Get(0).(func(context.Context) ffcapi.ChainTrackingMode); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(ffcapi.ChainTrackingMode) + } + + return r0 +} + // IsLive provides a mock function with given fields: ctx func (_m *API) IsLive(ctx context.Context) (*ffcapi.LiveResponse, ffcapi.ErrorReason, error) { ret := _m.Called(ctx) diff --git a/mocks/metricsmocks/event_metrics_emitter.go b/mocks/metricsmocks/event_metrics_emitter.go index 94c0cf19..eb92f91e 100644 --- a/mocks/metricsmocks/event_metrics_emitter.go +++ b/mocks/metricsmocks/event_metrics_emitter.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package metricsmocks diff --git a/mocks/metricsmocks/transaction_handler_metrics.go b/mocks/metricsmocks/transaction_handler_metrics.go index a4102a7c..f6033653 100644 --- a/mocks/metricsmocks/transaction_handler_metrics.go +++ b/mocks/metricsmocks/transaction_handler_metrics.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package metricsmocks diff --git a/mocks/persistencemocks/persistence.go b/mocks/persistencemocks/persistence.go index 64fbb461..78e07958 100644 --- a/mocks/persistencemocks/persistence.go +++ b/mocks/persistencemocks/persistence.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package persistencemocks diff --git a/mocks/persistencemocks/rich_query.go b/mocks/persistencemocks/rich_query.go index 9253d685..5cd5f614 100644 --- a/mocks/persistencemocks/rich_query.go +++ b/mocks/persistencemocks/rich_query.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package persistencemocks diff --git a/mocks/persistencemocks/transaction_persistence.go b/mocks/persistencemocks/transaction_persistence.go index bb630875..8e277c23 100644 --- a/mocks/persistencemocks/transaction_persistence.go +++ b/mocks/persistencemocks/transaction_persistence.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package persistencemocks diff --git a/mocks/txhandlermocks/managed_tx_event_handler.go b/mocks/txhandlermocks/managed_tx_event_handler.go index 9206d4c3..0f1f2183 100644 --- a/mocks/txhandlermocks/managed_tx_event_handler.go +++ b/mocks/txhandlermocks/managed_tx_event_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package txhandlermocks diff --git a/mocks/txhandlermocks/transaction_handler.go b/mocks/txhandlermocks/transaction_handler.go index d61bef85..f743cac5 100644 --- a/mocks/txhandlermocks/transaction_handler.go +++ b/mocks/txhandlermocks/transaction_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package txhandlermocks diff --git a/mocks/wsmocks/web_socket_channels.go b/mocks/wsmocks/web_socket_channels.go index c16d00f4..825c5799 100644 --- a/mocks/wsmocks/web_socket_channels.go +++ b/mocks/wsmocks/web_socket_channels.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package wsmocks diff --git a/mocks/wsmocks/web_socket_server.go b/mocks/wsmocks/web_socket_server.go index fdaf1d68..7bd01348 100644 --- a/mocks/wsmocks/web_socket_server.go +++ b/mocks/wsmocks/web_socket_server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package wsmocks diff --git a/pkg/apitypes/api_types.go b/pkg/apitypes/api_types.go index 2d46e54c..569eb6f3 100644 --- a/pkg/apitypes/api_types.go +++ b/pkg/apitypes/api_types.go @@ -437,8 +437,14 @@ type ConfirmationsNotification struct { // NewFork is true when NewConfirmations is a complete list of confirmations. // Otherwise, Confirmations is an additive delta on top of a previous list of confirmations. NewFork bool - // Confirmations is the list of confirmations being notified - assured to be non-nil, but might be empty. + // Confirmations is the list of confirmations being notified - assured to be non-nil. + // The list will only be populated if chainTrackingMode is set to full. Confirmations []*Confirmation + // CurrentConfirmationCount is the actual number of confirmations for the transaction + CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` + + // TargetConfirmationCount is the target number of confirmations to confirm the transaction + TargetConfirmationCount uint64 `json:"targetConfirmationCount"` } type Confirmation struct { diff --git a/pkg/ffcapi/api.go b/pkg/ffcapi/api.go index 88c93953..82417405 100644 --- a/pkg/ffcapi/api.go +++ b/pkg/ffcapi/api.go @@ -61,6 +61,9 @@ type API interface { // DeployContractPrepare DeployContractPrepare(ctx context.Context, req *ContractDeployPrepareRequest) (*TransactionPrepareResponse, ErrorReason, error) + // GetChainTrackingMode gets the tracking mode of the chain progress, which determines the level of detail of the confirmation result and the support of block listener + GetChainTrackingMode(ctx context.Context) ChainTrackingMode + // EventStreamStart starts an event stream with an initial set of listeners (which might be empty), a channel to deliver events, and a context that will close to stop the stream EventStreamStart(ctx context.Context, req *EventStreamStartRequest) (*EventStreamStartResponse, ErrorReason, error) @@ -95,6 +98,15 @@ type API interface { IsReady(ctx context.Context) (*ReadyResponse, ErrorReason, error) } +type ChainTrackingMode string + +const ( + // ChainTrackingModeLight - in this mode, the connector fetches the head block number only, no block details are fetched. Therefore, block listener is not supported, confirmation result contains only the number of the confirmation. + ChainTrackingModeLight ChainTrackingMode = "light" + // ChainTrackingModeFull - (default) in this mode, the connector fetches the head block number and downloads block details and maintain a consistent in-memory partial chain. Block listener is supported, and confirmation result contains extra block details as well as the number of the confirmation. + ChainTrackingModeFull ChainTrackingMode = "full" +) + type ConfirmationUpdateResult struct { // a linked list of accumulated confirmations for a transaction // the list is sorted by block number @@ -104,7 +116,10 @@ type ConfirmationUpdateResult struct { // in the in-memory partial chain // WARNING: mutation to this list is not expected, invalid modifications will cause inefficiencies in the reconciliation process // `rebuilt` will be true if an invalid confirmation list is detected by the reconciliation process - Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` + Confirmations []*MinimalBlockInfo `json:"confirmations,omitempty"` // the current list of confirmations for this reconcile request, only returned when chainTrackingMode is set to full + + CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` // the current number of confirmations for this reconcile request + Receipt *TransactionReceiptResponse `json:"receipt,omitempty"` // receipt for the transaction Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch NewFork bool `json:"newFork,omitempty"` // when true, it means a new fork was detected based on the existing confirmations @@ -126,9 +141,10 @@ func (c *MinimalBlockInfo) IsParentOf(other *MinimalBlockInfo) bool { } type BlockHashEvent struct { - BlockHashes []string `json:"blockHash"` // zero or more hashes (can be nil) - GapPotential bool `json:"gapPotential,omitempty"` // when true, the caller cannot be sure if blocks have been missed (use on reconnect of a websocket for example) - Created *fftypes.FFTime `json:"created,omitempty"` // timestamp when the blockhash event is created + BlockHashes []string `json:"blockHash"` // zero or more hashes (can be nil) + GapPotential bool `json:"gapPotential,omitempty"` // when true, the caller cannot be sure if blocks have been missed (use on reconnect of a websocket for example) + Created *fftypes.FFTime `json:"created,omitempty"` // timestamp when the blockhash event is created + HeadBlockNumber uint64 `json:"headBlockNumber"` // the highest block seen by the connector } // EventID are the set of required fields an FFCAPI compatible connector needs to map to the underlying blockchain constructs, to uniquely identify an event diff --git a/pkg/fftm/manager_test.go b/pkg/fftm/manager_test.go index 7e91ff66..0348095c 100644 --- a/pkg/fftm/manager_test.go +++ b/pkg/fftm/manager_test.go @@ -82,6 +82,7 @@ func newTestManager(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mm, err := NewManager(context.Background(), mca) assert.NoError(t, err) @@ -105,7 +106,7 @@ func newTestManagerMockNoRichDB(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() m := newManager(context.Background(), mca) mpm := &persistencemocks.Persistence{} @@ -133,7 +134,7 @@ func newTestManagerMockRichDB(t *testing.T) (string, *manager, *persistencemocks url := testManagerCommonInit(t) mca := &ffcapimocks.API{} - + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() m := newManager(context.Background(), mca) mpm := &persistencemocks.Persistence{} @@ -179,6 +180,7 @@ func newTestManagerWithMetrics(t *testing.T, deprecated bool) (string, *manager, mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() mm, err := NewManager(context.Background(), mca) assert.NoError(t, err) @@ -198,7 +200,9 @@ func newTestManagerMockPersistence(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t) - m := newManager(context.Background(), &ffcapimocks.API{}) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + m := newManager(context.Background(), mca) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -241,8 +245,9 @@ func TestNewManagerWithLegacyConfiguration(t *testing.T) { tmconfig.APIConfig.Set(httpserver.HTTPConfAddress, "127.0.0.1") tmconfig.DeprecatedPolicyEngineBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - - m := newManager(context.Background(), &ffcapimocks.API{}) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + m := newManager(context.Background(), mca) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -262,7 +267,9 @@ func TestNewManagerBadHttpConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Error(t, err) assert.Regexp(t, "FF00151", err) @@ -281,7 +288,9 @@ func TestNewManagerBadLevelDBConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err = NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + _, err = NewManager(context.Background(), mca) assert.Regexp(t, "FF21049", err) } @@ -295,7 +304,9 @@ func TestNewManagerBadPersistenceConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Regexp(t, "FF21043", err) } @@ -307,7 +318,9 @@ func TestNewManagerInvalidTransactionHandlerName(t *testing.T) { config.Set(tmconfig.PersistenceLevelDBPath, dir) config.Set(tmconfig.TransactionsHandlerName, "wrong") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Regexp(t, "FF21070", err) } @@ -351,7 +364,9 @@ func TestNewManagerWithMetricsBadConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + mca := &ffcapimocks.API{} + mca.On("GetChainTrackingMode", mock.Anything).Return(ffcapi.ChainTrackingModeFull, nil).Maybe() + _, err := NewManager(context.Background(), mca) assert.Error(t, err) assert.Regexp(t, "FF00151", err) }