Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/ethereum/event_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testLogsFilterID1
}).Maybe()
Expand Down
49 changes: 49 additions & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = testLogsFilterID1
Expand Down Expand Up @@ -521,6 +528,13 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(10)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
assert.Equal(t, int64(0), args[3].(*ethrpc.LogFilterJSONRPC).FromBlock.BigInt().Int64())
Expand Down Expand Up @@ -562,6 +576,13 @@ func TestLeadGroupCatchupRetry(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "pop"}).
Run(func(args mock.Arguments) {
close(retried)
Expand Down Expand Up @@ -620,6 +641,13 @@ func TestStreamLoopNewFilterFail(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(&rpcbackend.RPCError{Message: "pop"}).
Run(func(args mock.Arguments) {
close(retried)
Expand Down Expand Up @@ -701,6 +729,13 @@ func TestStreamLoopChangeFilter(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
es := <-esChl
Expand Down Expand Up @@ -750,6 +785,13 @@ func TestStreamLoopFilterReset(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = testLogsFilterID1
Expand Down Expand Up @@ -797,6 +839,13 @@ func TestStreamLoopEnrichFail(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*string) = testBlockFilterID1
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]ethtypes.HexBytes0xPrefix) = nil
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = testLogsFilterID1
Expand Down
1 change: 1 addition & 0 deletions internal/ethereum/new_block_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestNewBlockListenerOK(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(1000)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = testBlockFilterID1
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ var (
MsgInvalidChainTrackingMode = ffe("FF23069", "Invalid chain tracking mode '%s': must be 'light' or 'full'")
MsgTransactionNotIncludedInChainHead = ffe("FF23070", "Transaction '%s' cannot be reconciled because chain head %d is before receipt block %s")
MsgTransactionEstimateTooLargeForBlock = ffe("FF23071", "Gas estimate %s (scaled at %.2f from estimate %s) too large for the current block gas limit %s")
MsgMonitoredHeadLengthInvalid = ffe("FF23072", "Monitored head length must be greater than or equal to 1 value=%d")
)
71 changes: 60 additions & 11 deletions pkg/ethblocklistener/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type blockListener struct {
BlockListenerConfig

// canonical chain
monitoredHeadLength uint64
canonicalChainLock sync.RWMutex // covers highestBlock and canonicalChain
canonicalChain *list.List
highestBlockSet bool
Expand Down Expand Up @@ -168,6 +169,10 @@ func NewBlockListenerSupplyBackend(ctx context.Context, retry *retry.Retry, conf
blockFetchConcurrencyThrottle: make(chan *blockReceiptRequest, conf.MaxAsyncBlockFetchConcurrency),
BlockListenerConfig: *conf,
}
if conf.MonitoredHeadLength <= 0 {
return nil, i18n.WrapError(ctx, err, msgs.MsgMonitoredHeadLengthInvalid, conf.MonitoredHeadLength)
}
bl.monitoredHeadLength = uint64(conf.MonitoredHeadLength)
bl.blockCache, err = lru.New(conf.BlockCacheSize)
if err != nil {
return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "block")
Expand All @@ -187,6 +192,35 @@ func (bl *blockListener) GetMonitoredHeadLength() int {
return bl.BlockListenerConfig.MonitoredHeadLength
}

// seedMonitoredHead fetches the single anchor block at highestBlock-MonitoredHeadLength+1.
// The returned block is used by the listen loop to seed the canonical chain on the first
// iteration via reconcileCanonicalChain, so that the chain is populated before the first
// filter poll and confirmations can be delivered as soon as they arrive.
func (bl *blockListener) seedMonitoredHead() *ethrpc.BlockInfoJSONRPC {
bl.canonicalChainLock.RLock()
highestBlockSet := bl.highestBlockSet
startBlock := uint64(0)
if bl.highestBlock >= bl.monitoredHeadLength {
startBlock = bl.highestBlock - bl.monitoredHeadLength + 1
}
bl.canonicalChainLock.RUnlock()

if !highestBlockSet {
return nil
}

var bi *ethrpc.BlockInfoJSONRPC
if err := bl.retry.Do(bl.ctx, "seed monitored head", func(_ int) (retry bool, err error) {
bi, err = bl.GetBlockInfoByNumber(bl.ctx, startBlock, false, "", "")
return err != nil, err
}); err != nil || bi == nil {
log.L(bl.ctx).Warnf("Failed to seed monitored head at block %d: %v", startBlock, err)
return nil
}
log.L(bl.ctx).Infof("Seeded monitored head at block %d", startBlock)
return bi
}

// setting block filter status updates that new block filter has been created
func (bl *blockListener) markStarted() {
if !bl.isStarted {
Expand Down Expand Up @@ -289,6 +323,14 @@ func (bl *blockListener) listenLoop() {
log.L(bl.ctx).Warnf("Block listener exiting before establishing initial block height: %s", err)
}

// Seed the canonical chain before starting the filter loop (not applicable in light mode).
// The seed block is reconciled on the first loop iteration instead of polling the filter,
// so the in-memory chain is pre-populated and confirmations can be delivered immediately.
var seedBi *ethrpc.BlockInfoJSONRPC
if bl.ChainTrackingMode != ffcapi.ChainTrackingModeLight {
seedBi = bl.seedMonitoredHead()
}

var filter string
failCount := 0
gapPotential := true
Expand Down Expand Up @@ -321,19 +363,27 @@ func (bl *blockListener) listenLoop() {
bl.markStarted()
}

// On the first iteration use the seed block (leaves blockHashes nil).
// On subsequent iterations poll the filter for new block hashes.
var blockHashes []ethtypes.HexBytes0xPrefix
rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter)
if rpcErr != nil {
if etherrors.MapError(etherrors.FilterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message)
filter = ""
gapPotential = true
var notifyPos *list.Element
if seedBi != nil {
notifyPos = bl.reconcileCanonicalChain(seedBi)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Given the rebuild/catch-up of the in-memory chain relies on the new block not matching the existing tail block in the in-memory chain. So this first iteration here will only initiate the in-memory chain. The actual rebuild happens in the next iteration if at least 1 block hash has been discovered.

seedBi = nil
} else {
rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter)
if rpcErr != nil {
if etherrors.MapError(etherrors.FilterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message)
filter = ""
gapPotential = true
}
log.L(bl.ctx).Errorf("Failed to query block filter changes: %s", rpcErr.Message)
failCount++
continue
}
log.L(bl.ctx).Errorf("Failed to query block filter changes: %s", rpcErr.Message)
failCount++
continue
log.L(bl.ctx).Debugf("Block filter received new block hashes: %+v", blockHashes)
Comment thread
Chengxuan marked this conversation as resolved.
}
log.L(bl.ctx).Debugf("Block filter received new block hashes: %+v", blockHashes)

if bl.ChainTrackingMode == ffcapi.ChainTrackingModeLight {
head, err := bl.refreshHighestBlockFromRPC()
Expand All @@ -360,7 +410,6 @@ func (bl *blockListener) listenLoop() {
}

update := &ffcapi.BlockHashEvent{GapPotential: gapPotential, Created: fftypes.Now()}
var notifyPos *list.Element
for _, h := range blockHashes {
if len(h) != 32 {
if !bl.HederaCompatibilityMode {
Expand Down
Loading
Loading