Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/blocklistener/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type NewBlockHashConsumer interface {
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
GetReceiveChannel() chan<- *ffcapi.BlockHashEvent
}

// BufferChannel ensures it always pulls blocks from the channel passed to the connector
Expand All @@ -49,7 +49,7 @@ func BufferChannel(ctx context.Context, target NewBlockHashConsumer) (buffered c
// Have to discard this
blockedUpdate.GapPotential = true // there is a gap for sure at this point
log.L(ctx).Debugf("Blocked event stream missed new block event: %v", blockUpdate.BlockHashes)
case target.NewBlockHashes() <- blockedUpdate:
case target.GetReceiveChannel() <- blockedUpdate:
// We're not blocked any more
log.L(ctx).Infof("Event stream block-listener unblocked")
blockedUpdate = nil
Expand All @@ -64,7 +64,7 @@ func BufferChannel(ctx context.Context, target NewBlockHashConsumer) (buffered c
// Nothing to do unless we have confirmations turned on
if target != nil {
select {
case target.NewBlockHashes() <- update:
case target.GetReceiveChannel() <- update:
// all good, we passed it on
default:
// we can't deliver it immediately, we switch to blocked mode
Expand Down
2 changes: 1 addition & 1 deletion internal/blocklistener/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type testBlockConsumer struct {
c chan *ffcapi.BlockHashEvent
}

func (tbc *testBlockConsumer) NewBlockHashes() chan<- *ffcapi.BlockHashEvent {
func (tbc *testBlockConsumer) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent {
return tbc.c
}

Expand Down
179 changes: 145 additions & 34 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

Expand All @@ -43,7 +44,7 @@ type Manager interface {
Notify(n *Notification) error
Start()
Stop()
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
GetReceiveChannel() chan<- *ffcapi.BlockHashEvent
CheckInFlight(listenerID *fftypes.UUID) bool
StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error
StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error
Expand Down Expand Up @@ -89,14 +90,16 @@ type blockConfirmationManager struct {
baseContext context.Context
ctx context.Context
cancelFunc func()
newBlockHashes chan *ffcapi.BlockHashEvent
newBlockHashEvents chan *ffcapi.BlockHashEvent
connector ffcapi.API
blockListenerStale bool
metricsEmitter metrics.ConfirmationMetricsEmitter
requiredConfirmations int
requiredConfirmations uint64
chainTrackingMode ffcapi.ChainTrackingMode
staleReceiptTimeout time.Duration
bcmNotifications chan *Notification
highestBlockSeen uint64
headBlockNumber uint64 // used for confirmation when the connector is running in light mode
pending map[string]*pendingItem
pendingMux sync.Mutex
receiptChecker *receiptChecker
Expand All @@ -114,11 +117,12 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A
connector: connector,
cbls: make(map[fftypes.UUID]*confirmedBlockListener),
blockListenerStale: true,
requiredConfirmations: config.GetInt(tmconfig.ConfirmationsRequired),
requiredConfirmations: config.GetUint64(tmconfig.ConfirmationsRequired),
chainTrackingMode: connector.GetChainTrackingMode(baseContext),
staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout),
bcmNotifications: make(chan *Notification, config.GetInt(tmconfig.ConfirmationsNotificationQueueLength)),
pending: make(map[string]*pendingItem),
newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)),
newBlockHashEvents: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)),
metricsEmitter: cme,
retry: &retry.Retry{
InitialDelay: config.GetDuration(tmconfig.ConfirmationsRetryInitDelay),
Expand All @@ -143,22 +147,23 @@ const (
// pendingItem could be a specific event that has been detected, but not confirmed yet.
// Or it could be a transaction
type pendingItem struct {
pType pendingType
added time.Time
notifiedConfirmations []*apitypes.Confirmation
confirmations []*apitypes.Confirmation
scheduledAtLeastOnce bool
confirmed bool
queuedStale *list.Element // protected by receiptChecker mux
lastReceiptCheck time.Time // protected by receiptChecker mux
receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse)
confirmationsCallback func(ctx context.Context, notification *apitypes.ConfirmationsNotification)
transactionHash string
blockHash string // can be notified of changes to this for receipts
blockNumber uint64 // known at creation time for event logs
transactionIndex uint64 // known at creation time for event logs
logIndex uint64 // events only
listenerID *fftypes.UUID // events only
pType pendingType
added time.Time
notifiedConfirmations []*apitypes.Confirmation
confirmations []*apitypes.Confirmation
scheduledAtLeastOnce bool
confirmed bool
previousConfirmationCount *uint64 // headBlockNumber mode: last dispatched CurrentConfirmationCount
queuedStale *list.Element // protected by receiptChecker mux
lastReceiptCheck time.Time // protected by receiptChecker mux
receiptCallback func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse)
confirmationsCallback func(ctx context.Context, notification *apitypes.ConfirmationsNotification)
transactionHash string
blockHash string // can be notified of changes to this for receipts
blockNumber uint64 // known at creation time for event logs
transactionIndex uint64 // known at creation time for event logs
logIndex uint64 // events only
listenerID *fftypes.UUID // events only
}

func pendingKeyForTX(txHash string) string {
Expand Down Expand Up @@ -243,8 +248,8 @@ func (bcm *blockConfirmationManager) Stop() {
}
}

func (bcm *blockConfirmationManager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent {
return bcm.newBlockHashes
func (bcm *blockConfirmationManager) GetReceiveChannel() chan<- *ffcapi.BlockHashEvent {
return bcm.newBlockHashEvents
}

// Notify is used to notify the confirmation manager of detection of a new logEntry addition or removal
Expand Down Expand Up @@ -365,12 +370,12 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
receivedFirstBlock := false
for {
select {
case bhe := <-bcm.newBlockHashes:
case bhe := <-bcm.newBlockHashEvents:
if bhe.GapPotential {
bcm.blockListenerStale = true
}
blockHashes = append(blockHashes, bhe.BlockHashes...)

bcm.headBlockNumber = bhe.HeadBlockNumber // always update the head block number, NOTE: the number can decrease during a re-org
// Need to also pass this event to any confirmed block listeners
// (they promise to always be efficient in handling these, having a go-routine
// dedicated to spinning fast just processing those separate to dispatching them)
Expand Down Expand Up @@ -406,9 +411,12 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
blocks := bcm.newBlockState()

if bcm.blockListenerStale {
if err := bcm.walkChain(blocks); err != nil {
log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err)
continue
if bcm.chainTrackingMode == ffcapi.ChainTrackingModeFull {
// only need to build the in memory partial chain when the block details are available
if err := bcm.walkChain(blocks); err != nil {
log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err)
continue
}
}
bcm.blockListenerStale = false
}
Expand Down Expand Up @@ -548,6 +556,11 @@ func (bcm *blockConfirmationManager) removeItem(pending *pendingItem, stale bool
}

func (bcm *blockConfirmationManager) processBlockHashes(blockHashes []string) {
if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight {
// for light chain tracking mode, no block details are available, only need to calculate the number of confirmations using head block number
bcm.checkAndDispatchConfirmationsUsingBlockHeight()
return
}
batchSize := len(blockHashes)
if batchSize > 0 {
log.L(bcm.ctx).Debugf("New block notifications %v", blockHashes)
Expand Down Expand Up @@ -599,7 +612,6 @@ func (bcm *blockConfirmationManager) processBlock(block *apitypes.BlockInfo) {
var notifications pendingItems
for pendingKey, pending := range bcm.pending {
if pending.blockHash != "" {

// The block might appear at any point in the confirmation list
newConfirmations := false
expectedParentHash := pending.blockHash
Expand All @@ -622,7 +634,7 @@ func (bcm *blockConfirmationManager) processBlock(block *apitypes.BlockInfo) {
}
expectedBlockNumber++
}
if len(pending.confirmations) >= bcm.requiredConfirmations {
if uint64(len(pending.confirmations)) >= bcm.requiredConfirmations {
pending.confirmed = true
}
if bcm.requiredConfirmations > 0 && (pending.confirmed || newConfirmations) {
Expand Down Expand Up @@ -666,14 +678,21 @@ func (bcm *blockConfirmationManager) dispatchConfirmations(item *pendingItem) {
}
}

confirmationCount := uint64(len(item.confirmations))
if confirmationCount > bcm.requiredConfirmations {
confirmationCount = bcm.requiredConfirmations
}

// Possible for us to re-dispatch the same confirmations, if we are notified about a block later after
// after we previously did a crawl for blocks.
// So we protect here against dispatching an empty array
if len(notificationConfirmations) > 0 || item.confirmed {
notification := &apitypes.ConfirmationsNotification{
Confirmed: item.confirmed,
NewFork: newFork,
Confirmations: notificationConfirmations,
Confirmed: item.confirmed,
CurrentConfirmationCount: confirmationCount,
TargetConfirmationCount: bcm.requiredConfirmations,
NewFork: newFork,
Confirmations: notificationConfirmations,
}
// Take a copy of the notification confirmations so we know what we have previously notified next time round
// (not safe to keep a reference, in case it's modified by the callback).
Expand Down Expand Up @@ -768,6 +787,10 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc
return nil
}

if bcm.chainTrackingMode == ffcapi.ChainTrackingModeLight {
return bcm.confirmationCheckUsingHeadBlockNumber(pending)
}

pendingKey := pending.getKey()

blockNumber := pending.blockNumber + 1
Expand Down Expand Up @@ -796,7 +819,7 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc
break
}
pending.confirmations = append(pending.confirmations, apitypes.ConfirmationFromBlock(block))
if len(pending.confirmations) >= bcm.requiredConfirmations {
if uint64(len(pending.confirmations)) >= bcm.requiredConfirmations {
pending.confirmed = true
break
}
Expand All @@ -811,3 +834,91 @@ func (bcm *blockConfirmationManager) walkChainForItem(pending *pendingItem, bloc
return nil

}

func (bcm *blockConfirmationManager) checkAndDispatchConfirmationsUsingBlockHeight() {
bcm.pendingMux.Lock()
items := make([]*pendingItem, 0, len(bcm.pending))
for _, p := range bcm.pending {
items = append(items, p)
}
bcm.pendingMux.Unlock()
for _, p := range items {
if err := bcm.confirmationCheckUsingHeadBlockNumber(p); err != nil {
log.L(bcm.ctx).Errorf("Block height confirmation refresh failed for %s: %s", p.getKey(), err)
}
}
}

func (bcm *blockConfirmationManager) confirmationCheckUsingHeadBlockNumber(pending *pendingItem) error {
if pending.blockHash == "" {
// no receipt yet, so no confirmation check
return nil
}

currentConfirmationCount := uint64(0)
if bcm.headBlockNumber > pending.blockNumber {
currentConfirmationCount = bcm.headBlockNumber - pending.blockNumber
}

return bcm.dispatchBlockHeightConfirmations(pending, currentConfirmationCount)

}

func (bcm *blockConfirmationManager) dispatchBlockHeightConfirmations(pending *pendingItem, count uint64) error {
if pending.previousConfirmationCount != nil && count == *pending.previousConfirmationCount {
// no ops as there isn't any changes detected
return nil
}

confirmationCount := count
if confirmationCount > bcm.requiredConfirmations {
confirmationCount = bcm.requiredConfirmations
}

confirmed := confirmationCount == bcm.requiredConfirmations
if confirmed {
// do confirmation check here to ensure the transaction receipt is still valid
res, reason, err := bcm.connector.TransactionReceipt(bcm.ctx, &ffcapi.TransactionReceiptRequest{
TransactionHash: pending.transactionHash,
})
if err != nil {
if reason == ffcapi.ErrorReasonNotFound {
// need to schedule the receipt check again as the receipt is no longer valid
pending.blockHash = ""
pending.blockNumber = 0
pending.previousConfirmationCount = nil
bcm.receiptChecker.schedule(pending, true)
}
return err
}
if res == nil || res.BlockHash == "" {
return i18n.NewError(bcm.ctx, tmmsgs.MsgTransactionReceiptMissingBlockHash)
}
if !strings.EqualFold(strings.ToLower(res.BlockHash), strings.ToLower(pending.blockHash)) {
// set the new block hash and number and requeue for confirmation check
pending.blockHash = res.BlockHash
pending.blockNumber = res.BlockNumber.Uint64()
pending.previousConfirmationCount = nil
return i18n.NewError(bcm.ctx, tmmsgs.MsgTransactionReceiptBlockHashMismatch, pending.blockHash, res.BlockHash)
}
}
// when not confirmed, we assume the receipt is still valid and return the actual confirmation count

notification := &apitypes.ConfirmationsNotification{
Confirmed: confirmed,
NewFork: pending.previousConfirmationCount != nil && count < *pending.previousConfirmationCount, // the only fork situation we can detect is a reduction in confirmation count
CurrentConfirmationCount: confirmationCount, // NOTE: we returns the actual confirmation count, which can be higher than the required confirmation count
TargetConfirmationCount: bcm.requiredConfirmations,
}
pending.previousConfirmationCount = &count

log.L(bcm.ctx).Infof("Block height confirmation notification item=%s confirmed=%t count=%d/%d newFork=%t",
pending.getKey(), notification.Confirmed, notification.CurrentConfirmationCount, notification.TargetConfirmationCount, notification.NewFork)
pending.confirmationsCallback(bcm.ctx, notification)
bcm.metricsEmitter.RecordConfirmationMetrics(bcm.ctx, time.Since(pending.added).Seconds())

if confirmed {
bcm.removeItem(pending, false)
}
return nil
}
Loading