diff --git a/internal/ethereum/event_actions_test.go b/internal/ethereum/event_actions_test.go index 914cb03..f5eb120 100644 --- a/internal/ethereum/event_actions_test.go +++ b/internal/ethereum/event_actions_test.go @@ -83,6 +83,13 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(testHighBlock) }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*string) = testLogsFilterID1 }).Maybe() diff --git a/internal/ethereum/event_stream_test.go b/internal/ethereum/event_stream_test.go index 4292cb5..bb0ea7d 100644 --- a/internal/ethereum/event_stream_test.go +++ b/internal/ethereum/event_stream_test.go @@ -447,6 +447,13 @@ func TestLeadGroupDeliverEvents(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { *args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(testHighBlock) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { *args[1].(*string) = testLogsFilterID1 @@ -521,6 +528,13 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { *args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(10) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { assert.Equal(t, int64(0), args[3].(*ethrpc.LogFilterJSONRPC).FromBlock.BigInt().Int64()) @@ -562,6 +576,13 @@ func TestLeadGroupCatchupRetry(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(testHighBlock) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "pop"}). Run(func(args mock.Arguments) { close(retried) @@ -620,6 +641,13 @@ func TestStreamLoopNewFilterFail(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(testHighBlock) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(&rpcbackend.RPCError{Message: "pop"}). Run(func(args mock.Arguments) { close(retried) @@ -701,6 +729,13 @@ func TestStreamLoopChangeFilter(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(testHighBlock) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { es := <-esChl @@ -750,6 +785,13 @@ func TestStreamLoopFilterReset(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(testHighBlock) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { *args[1].(*string) = testLogsFilterID1 @@ -797,6 +839,13 @@ func TestStreamLoopEnrichFail(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(testHighBlock) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { *args[1].(*string) = testLogsFilterID1 diff --git a/internal/ethereum/new_block_listener_test.go b/internal/ethereum/new_block_listener_test.go index 50d1b43..3155a44 100644 --- a/internal/ethereum/new_block_listener_test.go +++ b/internal/ethereum/new_block_listener_test.go @@ -39,6 +39,7 @@ func TestNewBlockListenerOK(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(1000) }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).Return(nil).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) *hbh = testBlockFilterID1 diff --git a/internal/msgs/en_error_messages.go b/internal/msgs/en_error_messages.go index a495ff5..1b99587 100644 --- a/internal/msgs/en_error_messages.go +++ b/internal/msgs/en_error_messages.go @@ -89,4 +89,5 @@ var ( MsgInvalidChainTrackingMode = ffe("FF23069", "Invalid chain tracking mode '%s': must be 'light' or 'full'") MsgTransactionNotIncludedInChainHead = ffe("FF23070", "Transaction '%s' cannot be reconciled because chain head %d is before receipt block %s") MsgTransactionEstimateTooLargeForBlock = ffe("FF23071", "Gas estimate %s (scaled at %.2f from estimate %s) too large for the current block gas limit %s") + MsgMonitoredHeadLengthInvalid = ffe("FF23072", "Monitored head length must be greater than or equal to 1 value=%d") ) diff --git a/pkg/ethblocklistener/blocklistener.go b/pkg/ethblocklistener/blocklistener.go index f001e33..41c99e9 100644 --- a/pkg/ethblocklistener/blocklistener.go +++ b/pkg/ethblocklistener/blocklistener.go @@ -125,6 +125,7 @@ type blockListener struct { BlockListenerConfig // canonical chain + monitoredHeadLength uint64 canonicalChainLock sync.RWMutex // covers highestBlock and canonicalChain canonicalChain *list.List highestBlockSet bool @@ -168,6 +169,10 @@ func NewBlockListenerSupplyBackend(ctx context.Context, retry *retry.Retry, conf blockFetchConcurrencyThrottle: make(chan *blockReceiptRequest, conf.MaxAsyncBlockFetchConcurrency), BlockListenerConfig: *conf, } + if conf.MonitoredHeadLength <= 0 { + return nil, i18n.WrapError(ctx, err, msgs.MsgMonitoredHeadLengthInvalid, conf.MonitoredHeadLength) + } + bl.monitoredHeadLength = uint64(conf.MonitoredHeadLength) bl.blockCache, err = lru.New(conf.BlockCacheSize) if err != nil { return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "block") @@ -187,6 +192,35 @@ func (bl *blockListener) GetMonitoredHeadLength() int { return bl.BlockListenerConfig.MonitoredHeadLength } +// seedMonitoredHead fetches the single anchor block at highestBlock-MonitoredHeadLength+1. +// The returned block is used by the listen loop to seed the canonical chain on the first +// iteration via reconcileCanonicalChain, so that the chain is populated before the first +// filter poll and confirmations can be delivered as soon as they arrive. +func (bl *blockListener) seedMonitoredHead() *ethrpc.BlockInfoJSONRPC { + bl.canonicalChainLock.RLock() + highestBlockSet := bl.highestBlockSet + startBlock := uint64(0) + if bl.highestBlock >= bl.monitoredHeadLength { + startBlock = bl.highestBlock - bl.monitoredHeadLength + 1 + } + bl.canonicalChainLock.RUnlock() + + if !highestBlockSet { + return nil + } + + var bi *ethrpc.BlockInfoJSONRPC + if err := bl.retry.Do(bl.ctx, "seed monitored head", func(_ int) (retry bool, err error) { + bi, err = bl.GetBlockInfoByNumber(bl.ctx, startBlock, false, "", "") + return err != nil, err + }); err != nil || bi == nil { + log.L(bl.ctx).Warnf("Failed to seed monitored head at block %d: %v", startBlock, err) + return nil + } + log.L(bl.ctx).Infof("Seeded monitored head at block %d", startBlock) + return bi +} + // setting block filter status updates that new block filter has been created func (bl *blockListener) markStarted() { if !bl.isStarted { @@ -289,6 +323,14 @@ func (bl *blockListener) listenLoop() { log.L(bl.ctx).Warnf("Block listener exiting before establishing initial block height: %s", err) } + // Seed the canonical chain before starting the filter loop (not applicable in light mode). + // The seed block is reconciled on the first loop iteration instead of polling the filter, + // so the in-memory chain is pre-populated and confirmations can be delivered immediately. + var seedBi *ethrpc.BlockInfoJSONRPC + if bl.ChainTrackingMode != ffcapi.ChainTrackingModeLight { + seedBi = bl.seedMonitoredHead() + } + var filter string failCount := 0 gapPotential := true @@ -321,19 +363,27 @@ func (bl *blockListener) listenLoop() { bl.markStarted() } + // On the first iteration use the seed block (leaves blockHashes nil). + // On subsequent iterations poll the filter for new block hashes. var blockHashes []ethtypes.HexBytes0xPrefix - rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter) - if rpcErr != nil { - if etherrors.MapError(etherrors.FilterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { - log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message) - filter = "" - gapPotential = true + var notifyPos *list.Element + if seedBi != nil { + notifyPos = bl.reconcileCanonicalChain(seedBi) + seedBi = nil + } else { + rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter) + if rpcErr != nil { + if etherrors.MapError(etherrors.FilterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { + log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message) + filter = "" + gapPotential = true + } + log.L(bl.ctx).Errorf("Failed to query block filter changes: %s", rpcErr.Message) + failCount++ + continue } - log.L(bl.ctx).Errorf("Failed to query block filter changes: %s", rpcErr.Message) - failCount++ - continue + log.L(bl.ctx).Debugf("Block filter received new block hashes: %+v", blockHashes) } - log.L(bl.ctx).Debugf("Block filter received new block hashes: %+v", blockHashes) if bl.ChainTrackingMode == ffcapi.ChainTrackingModeLight { head, err := bl.refreshHighestBlockFromRPC() @@ -360,7 +410,6 @@ func (bl *blockListener) listenLoop() { } update := &ffcapi.BlockHashEvent{GapPotential: gapPotential, Created: fftypes.Now()} - var notifyPos *list.Element for _, h := range blockHashes { if len(h) != 32 { if !bl.HederaCompatibilityMode { diff --git a/pkg/ethblocklistener/blocklistener_test.go b/pkg/ethblocklistener/blocklistener_test.go index 536106c..05abb94 100644 --- a/pkg/ethblocklistener/blocklistener_test.go +++ b/pkg/ethblocklistener/blocklistener_test.go @@ -104,9 +104,131 @@ func (tl *testLatch) waitComplete() { <-tl.c } +// testBlockHashFor generates a deterministic 32-byte block hash for a given height. +// Optional mods create alternate-fork hashes at the same height. +func testBlockHashFor(height uint64, mods ...uint64) ethtypes.HexBytes0xPrefix { + seed := height + for _, mod := range mods { + seed = seed*31337 + mod + } + h := fmt.Sprintf("0x%016x%016x%016x%016x", 0xfeed000000000000+height, seed, ^seed, ^height) + return ethtypes.MustNewHexBytes0xPrefix(h) +} + +// mockInitialBlockHeight mocks one eth_blockNumber call returning height. +func mockInitialBlockHeight(mRPC *rpcbackendmocks.Backend, height uint64) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexIntegerU64(height) + }).Once() +} + +// mockSeedBlockNotFound mocks eth_getBlockByNumber at seedHeight returning nil (block not found). +func mockSeedBlockNotFound(mRPC *rpcbackendmocks.Backend, seedHeight uint64) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", hexNumber(seedHeight), false).Return(nil).Once() +} + +// mockSeedBlock mocks eth_getBlockByNumber at height returning a block with the given hash. +func mockSeedBlock(mRPC *rpcbackendmocks.Backend, height uint64, hash ethtypes.HexBytes0xPrefix) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", hexNumber(height), false).Return(nil).Run(func(args mock.Arguments) { + *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{ + BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ + Number: ethtypes.HexUint64(height), + Hash: hash, + }, + } + }) +} + +// mockNewBlockFilter mocks eth_newBlockFilter returning filterID. +func mockNewBlockFilter(mRPC *rpcbackendmocks.Backend, filterID string) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + *args[1].(*string) = filterID + }) +} + +// mockFilterChanges mocks eth_getFilterChanges for filterID, optionally waiting on latch, returning hashes. +func mockFilterChanges(mRPC *rpcbackendmocks.Backend, filterID string, latch *testLatch, hashes ...ethtypes.HexBytes0xPrefix) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", filterID).Return(nil).Run(func(args mock.Arguments) { + if latch != nil { + latch.waitComplete() + } + *args[1].(*[]ethtypes.HexBytes0xPrefix) = hashes + }) +} + +// mockFilterChangesEmpty mocks any eth_getFilterChanges returning empty, optionally running fns. +func mockFilterChangesEmpty(mRPC *rpcbackendmocks.Backend, fns ...func()) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + for _, fn := range fns { + fn() + } + }) +} + +// mockBlockByHash mocks eth_getBlockByHash for hash returning a block at height with parentHash. +func mockBlockByHash(mRPC *rpcbackendmocks.Backend, height uint64, hash, parentHash ethtypes.HexBytes0xPrefix) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", hash.String(), false).Return(nil).Run(func(args mock.Arguments) { + *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{ + BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ + Number: ethtypes.HexUint64(height), + Hash: hash, + ParentHash: parentHash, + }, + } + }) +} + +// mockBlockByHashNotFound mocks eth_getBlockByHash for hash returning nil (block not found). +func mockBlockByHashNotFound(mRPC *rpcbackendmocks.Backend, hash ethtypes.HexBytes0xPrefix) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", hash.String(), false).Return(nil) +} + +// mockBlockByHashFail mocks eth_getBlockByHash for hash returning an RPC error. +func mockBlockByHashFail(mRPC *rpcbackendmocks.Backend, hash ethtypes.HexBytes0xPrefix) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", hash.String(), false). + Return(&rpcbackend.RPCError{Message: "pop"}) +} + +// mockBlockByNumber mocks eth_getBlockByNumber at height. Pass nil hash to return nil (not found). +// When hash is non-nil, parentHash is derived as testBlockHashFor(height-1). +func mockBlockByNumber(mRPC *rpcbackendmocks.Backend, height uint64, hash *ethtypes.HexBytes0xPrefix) *mock.Call { + return mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", hexNumber(height), false).Return(nil).Run(func(args mock.Arguments) { + if hash == nil { + return + } + *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{ + BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ + Number: ethtypes.HexUint64(height), + Hash: *hash, + ParentHash: testBlockHashFor(height - 1), + }, + } + }) +} + +// mockBlockRangeWithHash mocks sequential eth_getBlockByNumber and eth_getBlockByHash for heights start..end +// using deterministic hashes from testBlockHashFor. +func mockBlockRangeWithHash(mRPC *rpcbackendmocks.Backend, start, end uint64, mods ...uint64) { + for height := start; height <= end; height++ { + hash := testBlockHashFor(height, mods...) + mockBlockByNumber(mRPC, height, &hash).Maybe() + mockBlockByHash(mRPC, height, hash, testBlockHashFor(height-1, mods...)).Maybe() + } +} + +func TestBlockListenerConstructorFailMonitoredHeadLength(t *testing.T) { + _, err := NewBlockListener(context.Background(), &retry.Retry{}, &BlockListenerConfig{ + BlockCacheSize: -1, + MonitoredHeadLength: -1, + }, &ffresty.Config{}, &wsclient.WSConfig{}) + require.Regexp(t, "FF23072", err) +} + func TestBlockListenerConstructorFailCacheConfig(t *testing.T) { _, err := NewBlockListener(context.Background(), &retry.Retry{}, &BlockListenerConfig{ - BlockCacheSize: -1, + BlockCacheSize: -1, + MonitoredHeadLength: 1, }, &ffresty.Config{}, &wsclient.WSConfig{}) require.Regexp(t, "FF23040", err) } @@ -117,32 +239,26 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). Return(&rpcbackend.RPCError{Message: "pop"}).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(12345) - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe() + mockInitialBlockHeight(mRPC, 12345) + mockSeedBlockNotFound(mRPC, 12345-(50-1)).Maybe() + mockNewBlockFilter(mRPC, testBlockFilterID1).Maybe() + mockFilterChangesEmpty(mRPC).Maybe() h, ok := bl.GetHighestBlock(bl.ctx) assert.Equal(t, uint64(12345), h) assert.True(t, ok) - done() // Stop immediately in this case, while we're in the polling interval + done() <-bl.listenLoopDone mRPC.AssertExpectations(t) - } func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { _, bl, mRPC, done := newTestBlockListener(t) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - filterID := args[1].(*string) - *filterID = testBlockFilterID1 - }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Maybe() + mockNewBlockFilter(mRPC, testBlockFilterID1).Maybe() + mockFilterChangesEmpty(mRPC).Maybe() done() // Stop before we start mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). @@ -155,66 +271,85 @@ func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { <-bl.listenLoopDone mRPC.AssertExpectations(t) - } -func TestBlockListenerOKSequential(t *testing.T) { +func TestBlockListenerSeedMonitoredHead_BlockFound(t *testing.T) { + block951Hash := testBlockHashFor(951) + + _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, _ context.CancelFunc) { + conf.MonitoredHeadLength = 50 // seed at block 1000-50+1 = 951 + mockSeedBlock(mRPC, 951, block951Hash).Once() + }) + defer done() - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + bl.canonicalChainLock.Lock() + bl.highestBlock = 1000 + bl.highestBlockSet = true + bl.canonicalChainLock.Unlock() + + bi := bl.seedMonitoredHead() + + require.NotNil(t, bi) + assert.Equal(t, uint64(951), bi.Number.Uint64()) + assert.Equal(t, block951Hash, bi.Hash) + mRPC.AssertExpectations(t) +} + +func TestBlockListenerSeedMonitoredHead_ReconcileAndDispatch(t *testing.T) { + block951Hash := testBlockHashFor(951) - startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - conf.MonitoredHeadLength = 2 // check wrapping + conf.MonitoredHeadLength = 50 // seed at block 1000-50+1 = 951 - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - } - }).Once() + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlock(mRPC, 951, block951Hash).Once() + mockNewBlockFilter(mRPC, testBlockFilterID1).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil + cancelCtx() + }).Maybe() + }) + defer done() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { + // Register consumer directly to avoid AddConsumer's waitUntilStarted race. + consumerID := fftypes.NewUUID() + updates := make(chan *ffcapi.BlockHashEvent, 10) + bl.consumerMux.Lock() + bl.consumers[*consumerID] = &BlockUpdateConsumer{ID: consumerID, Ctx: context.Background(), Updates: updates} + bl.consumerMux.Unlock() + + bl.checkAndStartListenerLoop() + + ev := <-updates + assert.Equal(t, []string{block951Hash.String()}, ev.BlockHashes) + + bl.WaitClosed() + mRPC.AssertExpectations(t) +} + +func TestBlockListenerOKSequential(t *testing.T) { + + block1001Hash := testBlockHashFor(1001) + block1002Hash := testBlockHashFor(1002) + block1003Hash := testBlockHashFor(1003) + + startLatch := newTestLatch() + _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + conf.MonitoredHeadLength = 2 // seed at 1000-2+1 = 999 + + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 999) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash, block1002Hash).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC) + + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002Hash, block1001Hash) + // block1003 has GasLimit set — inline to capture the extra field + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", block1003Hash.String(), false).Return(nil).Run(func(args mock.Arguments) { *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ Number: 1003, Hash: block1003Hash, @@ -234,14 +369,9 @@ func TestBlockListenerOKSequential(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001Hash.String(), - block1002Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001Hash.String(), block1002Hash.String()}, bu.BlockHashes) bu = <-updates - assert.Equal(t, []string{ - block1003Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1003Hash.String()}, bu.BlockHashes) assert.False(t, bu.GapPotential) bl.RemoveConsumer(context.Background(), consumerID) @@ -249,13 +379,9 @@ func TestBlockListenerOKSequential(t *testing.T) { <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) - assert.Len(t, bl.SnapshotMonitoredHeadChain(), bl.MonitoredHeadLength) - require.Equal(t, int64(10000), bl.GetBlockGasLimit().Int64()) - } func TestBlockListenerWSShoulderTap(t *testing.T) { @@ -303,7 +429,6 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { } } else { rpcRes.Result = fftypes.JSONAnyPtr(fmt.Sprintf(`"%s"`, fftypes.NewUUID())) - // Spam with notifications go func() { defer close(pingerDone) for !complete { @@ -317,10 +442,11 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { } }() } + case "eth_getBlockByNumber": + rpcRes.Result = fftypes.JSONAnyPtr("null") // seed block not found case "eth_newBlockFilter": rpcRes.Result = fftypes.JSONAnyPtr(fmt.Sprintf(`"%s"`, fftypes.NewUUID())) case "eth_getFilterChanges": - // ok we can close - the shoulder tap worked complete = true <-pingerDone go done() @@ -338,7 +464,6 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { bl.checkAndStartListenerLoop() - // Wait until we close because it worked <-bl.listenLoopDone assert.True(t, failedConnectOnce) assert.True(t, failedSubOnce) @@ -349,74 +474,29 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { func TestBlockListenerOKDuplicates(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001Hash := testBlockHashFor(1001) + block1002Hash := testBlockHashFor(1002) + block1003Hash := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash, block1002Hash).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + // Third call returns duplicates and signals shutdown mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1002Hash, - block1003Hash, - } - cancelCtx() // once we've detected these duplicates, we can close + *args[1].(*[]ethtypes.HexBytes0xPrefix) = []ethtypes.HexBytes0xPrefix{block1002Hash, block1003Hash} + cancelCtx() }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mockFilterChangesEmpty(mRPC) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003Hash, - ParentHash: block1002Hash, - }} - }) + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002Hash, block1001Hash) + mockBlockByHash(mRPC, 1003, block1003Hash, block1002Hash) }) defer done() @@ -429,94 +509,38 @@ func TestBlockListenerOKDuplicates(t *testing.T) { startLatch.complete() bu := <-updates - require.Equal(t, []string{ - block1001Hash.String(), - block1002Hash.String(), - }, bu.BlockHashes) + require.Equal(t, []string{block1001Hash.String(), block1002Hash.String()}, bu.BlockHashes) bu = <-updates - require.Equal(t, []string{ - block1003Hash.String(), - }, bu.BlockHashes) + require.Equal(t, []string{block1003Hash.String()}, bu.BlockHashes) require.False(t, bu.GapPotential) <-bl.listenLoopDone require.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) - } func TestBlockListenerReorgKeepLatestHeadInSameBatch(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) // parent - block1001HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001HashA := testBlockHashFor(1001, 1111) + block1001HashB := testBlockHashFor(1001) + block1002Hash := testBlockHashFor(1002) + block1003Hash := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001HashA, - block1001HashB, - block1002Hash, - block1003Hash, - } - }).Once() - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001HashA, block1001HashB, block1002Hash, block1003Hash).Once() + mockFilterChangesEmpty(mRPC) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001HashA, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001HashB.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001HashB, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001HashB, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003Hash, - ParentHash: block1002Hash, - }} - }) + mockBlockByHash(mRPC, 1001, block1001HashA, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1001, block1001HashB, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002Hash, block1001HashB) + mockBlockByHash(mRPC, 1003, block1003Hash, block1002Hash) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -528,114 +552,43 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatch(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001HashB.String(), - block1002Hash.String(), - block1003Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001HashB.String(), block1002Hash.String(), block1003Hash.String()}, bu.BlockHashes) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) } func TestBlockListenerReorgKeepLatestHeadInSameBatchValidHashFirst(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) // parent - block1001HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + // "Valid" (canonical) hash arrives first in the filter batch; the stale hash arrives after, + // forcing a rebuild to confirm the canonical chain. + block1001HashB := testBlockHashFor(1001) // canonical — arrives first + block1001HashA := testBlockHashFor(1001, 1111) // stale fork — arrives second + block1002Hash := testBlockHashFor(1002) + block1003Hash := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001HashB, // valid hash is in the front of the array, so will need to re-build the chain - block1001HashA, - block1002Hash, - block1003Hash, - } - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1001) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001HashB, - ParentHash: block1000Hash, - }} - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1002) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001HashB, - }} - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1003) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003Hash, - ParentHash: block1002Hash, - }} - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1004) // not found - }), false).Return(nil) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001HashA, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001HashB.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001HashB, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001HashB, - }} - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001HashB, block1001HashA, block1002Hash, block1003Hash) + mockFilterChangesEmpty(mRPC) + + mockBlockByHash(mRPC, 1001, block1001HashB, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1001, block1001HashA, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002Hash, block1001HashB) + + // Rebuild fetches 1001–1003 by number then hits nil at 1004 + mockBlockByNumber(mRPC, 1001, &block1001HashB) + mockBlockByNumber(mRPC, 1002, &block1002Hash) + mockBlockByNumber(mRPC, 1003, &block1003Hash) + mockBlockByNumber(mRPC, 1004, nil) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -647,90 +600,36 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatchValidHashFirst(t *testing.T) startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001HashB.String(), - block1002Hash.String(), - block1003Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001HashB.String(), block1002Hash.String(), block1003Hash.String()}, bu.BlockHashes) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) } func TestBlockListenerReorgKeepLatestMiddleInSameBatch(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) // parent - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001Hash := testBlockHashFor(1001) + block1002HashA := testBlockHashFor(1002, 1111) + block1002HashB := testBlockHashFor(1002) + block1003Hash := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002HashA, - block1002HashB, - block1003Hash, - } - }).Once() - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash, block1002HashA, block1002HashB, block1003Hash).Once() + mockFilterChangesEmpty(mRPC) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashA, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002HashB.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashB, - ParentHash: block1001Hash, - }} - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003Hash, - ParentHash: block1002HashB, - }} - }) + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002HashA, block1001Hash) + mockBlockByHash(mRPC, 1002, block1002HashB, block1001Hash) + mockBlockByHash(mRPC, 1003, block1003Hash, block1002HashB) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -742,92 +641,36 @@ func TestBlockListenerReorgKeepLatestMiddleInSameBatch(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001Hash.String(), - block1002HashB.String(), - block1003Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001Hash.String(), block1002HashB.String(), block1003Hash.String()}, bu.BlockHashes) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) } func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) // parent - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001Hash := testBlockHashFor(1001) + block1002Hash := testBlockHashFor(1002) + block1003HashA := testBlockHashFor(1003, 1111) + block1003HashB := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { - conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - block1003HashA, - block1003HashB, - } - }).Once() - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashA, - ParentHash: block1002Hash, - }} - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashB.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashB, - ParentHash: block1002Hash, - }} - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash, block1002Hash, block1003HashA, block1003HashB).Once() + mockFilterChangesEmpty(mRPC) + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002Hash, block1001Hash) + mockBlockByHash(mRPC, 1003, block1003HashA, block1002Hash) + mockBlockByHash(mRPC, 1003, block1003HashB, block1002Hash) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -839,17 +682,12 @@ func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001Hash.String(), - block1002Hash.String(), - block1003HashB.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001Hash.String(), block1002Hash.String(), block1003HashB.String()}, bu.BlockHashes) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) } @@ -857,38 +695,20 @@ func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { // diverges from the node but an older prefix still matches. The suffix must be removed without // dropping the last matching block (regression for incorrect removal variable in the trim loop). func TestTrimToLastValidBlockRemovesInvalidTail(t *testing.T) { - h98 := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - h99 := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - h100 := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - h101Stale := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - h102Stale := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - h101 := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - h102 := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + h98 := testBlockHashFor(98) + h99 := testBlockHashFor(99) + h100 := testBlockHashFor(100) + h101Stale := testBlockHashFor(101, 999) + h102Stale := testBlockHashFor(102, 999) + h101 := testBlockHashFor(101) + h102 := testBlockHashFor(102) _, bl, mRPC, done := newTestBlockListener(t) defer done() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", hexNumber(102), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 102, - Hash: h102, - ParentHash: h101, - }} - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", hexNumber(101), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 101, - Hash: h101, - ParentHash: h100, - }} - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", hexNumber(100), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 100, - Hash: h100, - ParentHash: h99, - }} - }).Once() + mockBlockByNumber(mRPC, 102, &h102).Once() + mockBlockByNumber(mRPC, 101, &h101).Once() + mockBlockByNumber(mRPC, 100, &h100).Once() b99 := ðrpc.BlockInfoJSONRPC{Number: ethtypes.HexUint64(99), Hash: h99, ParentHash: h98} b100 := ðrpc.BlockInfoJSONRPC{Number: ethtypes.HexUint64(100), Hash: h100, ParentHash: h99} @@ -912,90 +732,37 @@ func TestTrimToLastValidBlockRemovesInvalidTail(t *testing.T) { front := bl.canonicalChain.Front().Value.(*ethrpc.BlockInfoJSONRPC) require.Equal(t, uint64(99), front.Number.Uint64()) require.True(t, front.Hash.Equals(h99)) + tail := bl.canonicalChain.Back().Value.(*ethrpc.BlockInfoJSONRPC) require.Equal(t, uint64(100), tail.Number.Uint64()) require.True(t, tail.Hash.Equals(h100)) mRPC.AssertExpectations(t) } + func TestBlockListenerReorgReplaceTail(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001Hash := testBlockHashFor(1001) + block1002Hash := testBlockHashFor(1002) + block1003HashA := testBlockHashFor(1003, 1111) + block1003HashB := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003HashA, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003HashB, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002Hash, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashA, - ParentHash: block1002Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashB.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashB, - ParentHash: block1002Hash, - }} - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash, block1002Hash).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003HashA).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003HashB).Once() + mockFilterChangesEmpty(mRPC) + + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002Hash, block1001Hash) + mockBlockByHash(mRPC, 1003, block1003HashA, block1002Hash) + mockBlockByHash(mRPC, 1003, block1003HashB, block1002Hash) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -1007,28 +774,19 @@ func TestBlockListenerReorgReplaceTail(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001Hash.String(), - block1002Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001Hash.String(), block1002Hash.String()}, bu.BlockHashes) bu = <-updates - assert.Equal(t, []string{ - block1003HashA.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1003HashA.String()}, bu.BlockHashes) assert.False(t, bu.GapPotential) bu = <-updates - assert.Equal(t, []string{ - block1003HashB.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1003HashB.String()}, bu.BlockHashes) assert.False(t, bu.GapPotential) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) - } func TestBlockListenerGap(t *testing.T) { @@ -1038,117 +796,28 @@ func TestBlockListenerGap(t *testing.T) { // needs to cope with this. This means winding back when we find a gap and re-building our canonical // view of the chain. - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1004Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1005Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001Hash := testBlockHashFor(1001) + block1002HashA := testBlockHashFor(1002, 1111) + block1004Hash := testBlockHashFor(1004) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002HashA, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1004Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash, block1002HashA).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1004Hash).Once() + mockFilterChangesEmpty(mRPC) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashA, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1004Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1004, - Hash: block1004Hash, - ParentHash: block1003Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1001) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1002) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashB, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1003) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003Hash, - ParentHash: block1002HashB, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1004) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1004, - Hash: block1004Hash, - ParentHash: block1003Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1005) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1005, // this one pops in while we're rebuilding - Hash: block1005Hash, - ParentHash: block1004Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1006) - }), false).Return(nil) + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1002, block1002HashA, block1001Hash) + mockBlockByHash(mRPC, 1004, block1004Hash, testBlockHashFor(1003)) + + // Rebuild: 1001 same hash, 1002 canonical (different from A), 1003–1005 fill in, 1006 not found + mockBlockRangeWithHash(mRPC, 1001, 1005) + mockBlockByNumber(mRPC, 1006, nil) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -1160,16 +829,14 @@ func TestBlockListenerGap(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001Hash.String(), - block1002HashA.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001Hash.String(), block1002HashA.String()}, bu.BlockHashes) bu = <-updates + // Rebuild yields canonical 1002, fills 1003, 1004, 1005 assert.Equal(t, []string{ - block1002HashB.String(), - block1003Hash.String(), // The gap we filled in - block1004Hash.String(), - block1005Hash.String(), // Appeared while we were rebuilding our chain + testBlockHashFor(1002).String(), + testBlockHashFor(1003).String(), + testBlockHashFor(1004).String(), + testBlockHashFor(1005).String(), }, bu.BlockHashes) assert.False(t, bu.GapPotential) @@ -1177,92 +844,35 @@ func TestBlockListenerGap(t *testing.T) { <-bl.listenLoopDone assert.Equal(t, uint64(1005), bl.highestBlock) - mRPC.AssertExpectations(t) - } func TestBlockListenerReorgWhileRebuilding(t *testing.T) { - block1000Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1001Hash := testBlockHashFor(1001) + block1002HashA := testBlockHashFor(1002, 1111) + block1003HashA := testBlockHashFor(1003, 1111) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003HashA, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1001Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashA, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1001) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1001, - Hash: block1001Hash, - ParentHash: block1000Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1002) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashA, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1003) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashB, // this is a re-org'd block, so we stop here as if we've found the end of the chain - ParentHash: block1002HashB, - }} - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1001Hash).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003HashA).Once() + mockFilterChangesEmpty(mRPC) + + mockBlockByHash(mRPC, 1001, block1001Hash, testBlockHashFor(1000)) + mockBlockByHash(mRPC, 1003, block1003HashA, block1001Hash) + + // Rebuild: 1001 confirmed, 1002 fork A, 1003 canonical (different parent stops rebuild at 1002A) + mockBlockByNumber(mRPC, 1001, &block1001Hash) + mockBlockByNumber(mRPC, 1002, &block1002HashA) + // Block 1003 from node has canonical parent (testBlockHashFor(1002)), mismatching 1002HashA + block1003Canon := testBlockHashFor(1003) + mockBlockByNumber(mRPC, 1003, &block1003Canon) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -1274,107 +884,43 @@ func TestBlockListenerReorgWhileRebuilding(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1001Hash.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1001Hash.String()}, bu.BlockHashes) bu = <-updates - assert.Equal(t, []string{ - block1002HashA.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1002HashA.String()}, bu.BlockHashes) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) - } func TestBlockListenerReorgReplaceWholeCanonicalChain(t *testing.T) { - block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashA := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1002HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003HashB := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1002HashA := testBlockHashFor(1002, 1111) + block1003HashA := testBlockHashFor(1003, 1111) + block1002HashB := testBlockHashFor(1002) + block1003HashB := testBlockHashFor(1003) startLatch := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - startLatch.waitComplete() - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1002HashA, - block1003HashA, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003HashB, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1002HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashA, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashA.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashA, - ParentHash: block1002HashA, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003HashB.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashB, - ParentHash: block1002HashB, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1002) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1002, - Hash: block1002HashB, - ParentHash: block1001Hash, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1003) - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003HashB, - ParentHash: block1002HashB, - }} - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.MatchedBy(func(bn string) bool { - return bn == hexNumber(1004) // not found - }), false).Return(nil) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, startLatch, block1002HashA, block1003HashA).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003HashB).Once() + mockFilterChangesEmpty(mRPC) + + mockBlockByHash(mRPC, 1002, block1002HashA, testBlockHashFor(1001)) + mockBlockByHash(mRPC, 1003, block1003HashA, block1002HashA) + mockBlockByHash(mRPC, 1003, block1003HashB, block1002HashB) + + // Rebuild replaces entire chain: 1002B, 1003B, then nil at 1004 + mockBlockByNumber(mRPC, 1002, &block1002HashB) + mockBlockByNumber(mRPC, 1003, &block1003HashB) + mockBlockByNumber(mRPC, 1004, nil) }) updates := make(chan *ffcapi.BlockHashEvent) @@ -1386,62 +932,33 @@ func TestBlockListenerReorgReplaceWholeCanonicalChain(t *testing.T) { startLatch.complete() bu := <-updates - assert.Equal(t, []string{ - block1002HashA.String(), - block1003HashA.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1002HashA.String(), block1003HashA.String()}, bu.BlockHashes) bu = <-updates - assert.Equal(t, []string{ - block1002HashB.String(), - block1003HashB.String(), - }, bu.BlockHashes) + assert.Equal(t, []string{block1002HashB.String(), block1003HashB.String()}, bu.BlockHashes) assert.False(t, bu.GapPotential) done() <-bl.listenLoopDone assert.Equal(t, uint64(1003), bl.highestBlock) - mRPC.AssertExpectations(t) - } func TestBlockListenerClosed(t *testing.T) { - block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1003Hash := testBlockHashFor(1003) waitCalled := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - waitCalled.complete() // Close after we've processed the log - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC, waitCalled.complete) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(nil).Run(func(args mock.Arguments) { - *args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = ðrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{ - Number: 1003, - Hash: block1003Hash, - ParentHash: block1002Hash, - }} - }) + mockBlockByHash(mRPC, 1003, block1003Hash, testBlockHashFor(1002)) }) go func() { waitCalled.waitComplete() @@ -1459,37 +976,23 @@ func TestBlockListenerClosed(t *testing.T) { bl.WaitClosed() mRPC.AssertExpectations(t) - } func TestBlockListenerBlockNotFound(t *testing.T) { + block1003Hash := testBlockHashFor(1003) + waitCalled := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - waitCalled.complete() // Close after we've processed the log - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC, waitCalled.complete) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(nil) + mockBlockByHashNotFound(mRPC, block1003Hash) }) go func() { waitCalled.waitComplete() @@ -1497,41 +1000,25 @@ func TestBlockListenerBlockNotFound(t *testing.T) { }() bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerBlockHashFailed(t *testing.T) { + block1003Hash := testBlockHashFor(1003) + waitCalled := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay - block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - waitCalled.complete() // Close after we've processed the log - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC, waitCalled.complete) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == block1003Hash.String() - }), false).Return(&rpcbackend.RPCError{Message: "pop"}) + mockBlockByHashFail(mRPC, block1003Hash) }) go func() { waitCalled.waitComplete() @@ -1539,39 +1026,24 @@ func TestBlockListenerBlockHashFailed(t *testing.T) { }() bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibilityMode(t *testing.T) { + block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef177df3b87beed681b1557e8ba7c3ecbd7e4db83d87b66c1e86aa484937ab93f1fae0eb6d4b24ca30aee13f29c83cc9") + waitCalled := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay conf.HederaCompatibilityMode = false - block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef177df3b87beed681b1557e8ba7c3ecbd7e4db83d87b66c1e86aa484937ab93f1fae0eb6d4b24ca30aee13f29c83cc9") - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - waitCalled.complete() // Close after we've processed the log - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC, waitCalled.complete) }) go func() { waitCalled.waitComplete() @@ -1579,39 +1051,24 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibility }() bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *testing.T) { + block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef") + waitCalled := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay conf.HederaCompatibilityMode = true - block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef") - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - waitCalled.complete() // Close after we've processed the log - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC, waitCalled.complete) }) go func() { waitCalled.waitComplete() @@ -1619,44 +1076,27 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *te }() bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode(t *testing.T) { + block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef177df3b87beed681b1557e8ba7c3ecbd7e4db83d87b66c1e86aa484937ab93f1fae0eb6d4b24ca30aee13f29c83cc9") + truncatedBlock1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef177df3b87beed681b1557e8ba7c3ecbd7e4db83d87b66c1e86aa484937ab93") + waitCalled := newTestLatch() _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { conf.BlockPollingInterval = shortDelay conf.HederaCompatibilityMode = true - block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef177df3b87beed681b1557e8ba7c3ecbd7e4db83d87b66c1e86aa484937ab93f1fae0eb6d4b24ca30aee13f29c83cc9") - truncatedBlock1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef177df3b87beed681b1557e8ba7c3ecbd7e4db83d87b66c1e86aa484937ab93") + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChanges(mRPC, testBlockFilterID1, nil, block1003Hash).Once() + mockFilterChangesEmpty(mRPC, waitCalled.complete) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - waitCalled.complete() // Close after we've processed the log - }) - - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { - return bh == truncatedBlock1003Hash.String() - }), false).Return(&rpcbackend.RPCError{Message: "pop"}) + mockBlockByHashFail(mRPC, truncatedBlock1003Hash) }) go func() { waitCalled.waitComplete() @@ -1664,11 +1104,8 @@ func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode }() bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerReestablishBlockFilter(t *testing.T) { @@ -1676,64 +1113,43 @@ func TestBlockListenerReestablishBlockFilter(t *testing.T) { _, bl, mRPC, done := newTestBlockListener(t) bl.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID2 - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(&rpcbackend.RPCError{Message: "filter not found"}).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - go done() // Close after we've processed the log - }) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1).Once() + mockNewBlockFilter(mRPC, testBlockFilterID2).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1). + Return(&rpcbackend.RPCError{Message: "filter not found"}).Once() + mockFilterChangesEmpty(mRPC, func() { go done() }) bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerReestablishBlockFilterFail(t *testing.T) { _, bl, mRPC, done := newTestBlockListener(t) bl.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(&rpcbackend.RPCError{Message: "pop"}).Run(func(args mock.Arguments) { + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter"). + Return(&rpcbackend.RPCError{Message: "pop"}).Run(func(args mock.Arguments) { go done() }) bl.checkAndStartListenerLoop() - bl.WaitClosed() - mRPC.AssertExpectations(t) - } func TestBlockListenerWaitUntilStartedOnlyReturnsAfterEstablishingBlockFilter(t *testing.T) { _, bl, mRPC, done := newTestBlockListener(t) bl.BlockPollingInterval = shortDelay - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*ethtypes.HexInteger) - *hbh = *ethtypes.NewHexIntegerU64(1000) - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*string) - *hbh = testBlockFilterID1 - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mockInitialBlockHeight(mRPC, 1000) + mockSeedBlockNotFound(mRPC, 951) + mockNewBlockFilter(mRPC, testBlockFilterID1) + mockFilterChangesEmpty(mRPC) assert.False(t, bl.isStarted) bl.checkAndStartListenerLoop() @@ -1746,7 +1162,6 @@ func TestBlockListenerWaitUntilStartedOnlyReturnsAfterEstablishingBlockFilter(t done() bl.WaitClosed() - mRPC.AssertExpectations(t) } @@ -1754,9 +1169,6 @@ func TestBlockListenerWaitUntilStartedOnlyReturnsAfterEstablishingBlockFilter(t // eth_blockNumber refresh updates currentChainHead and dispatches BlockHashEvent with HeadBlockNumber; // when the RPC head is unchanged, no event is sent. func TestBlockListenerHeadBlockNumber_DispatchesAndSkipsDuplicateHead(t *testing.T) { - // Block the first getFilterChanges until after AddConsumer registers; otherwise markStarted() - // unblocks waitUntilStarted before the first head refresh+dispatch, so the first event can be - // sent with zero consumers. startLatch := newTestLatch() var bnCall int _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, _ context.CancelFunc) { @@ -1765,7 +1177,6 @@ func TestBlockListenerHeadBlockNumber_DispatchesAndSkipsDuplicateHead(t *testing mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { bnCall++ - hbh := args[1].(*ethtypes.HexInteger) var v uint64 switch bnCall { case 1: @@ -1777,14 +1188,12 @@ func TestBlockListenerHeadBlockNumber_DispatchesAndSkipsDuplicateHead(t *testing case 4: v = 1001 // head advanced → dispatch default: - v = 1001 // stable after cancel + v = 1001 } - *hbh = *ethtypes.NewHexIntegerU64(v) + *args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexIntegerU64(v) }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { - *args[1].(*string) = testBlockFilterID1 - }).Once() + mockNewBlockFilter(mRPC, testBlockFilterID1).Once() var getFilterCalls int mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { @@ -1792,8 +1201,7 @@ func TestBlockListenerHeadBlockNumber_DispatchesAndSkipsDuplicateHead(t *testing if getFilterCalls == 1 { startLatch.waitComplete() } - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = nil + *args[1].(*[]ethtypes.HexBytes0xPrefix) = nil }).Maybe() }) defer done() @@ -1823,6 +1231,25 @@ func TestBlockListenerHeadBlockNumber_DispatchesAndSkipsDuplicateHead(t *testing mRPC.AssertExpectations(t) } +func TestBlockListenerLightModeRefreshChainHeadFailure(t *testing.T) { + _, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + conf.ChainTrackingMode = ffcapi.ChainTrackingModeLight + + mockInitialBlockHeight(mRPC, 1000) + mockNewBlockFilter(mRPC, testBlockFilterID1).Once() + mockFilterChanges(mRPC, testBlockFilterID1, nil).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(&rpcbackend.RPCError{Message: "pop"}).Run(func(args mock.Arguments) { + cancelCtx() + }).Once() + }) + defer done() + + bl.checkAndStartListenerLoop() + bl.WaitClosed() + mRPC.AssertExpectations(t) +} + func TestBlockListenerDispatchStopped(t *testing.T) { _, bl, _, done := newTestBlockListener(t) done() @@ -1841,7 +1268,6 @@ func TestBlockListenerRebuildCanonicalChainEmpty(t *testing.T) { res := bl.rebuildCanonicalChain() assert.Nil(t, res) - } func TestBlockListenerRebuildCanonicalFailTerminate(t *testing.T) { @@ -1849,8 +1275,8 @@ func TestBlockListenerRebuildCanonicalFailTerminate(t *testing.T) { _, bl, mRPC, done := newTestBlockListener(t) bl.canonicalChain.PushBack(ðrpc.BlockInfoJSONRPC{ Number: ethtypes.HexUint64(1000), - Hash: ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()), - ParentHash: ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()), + Hash: testBlockHashFor(1000), + ParentHash: testBlockHashFor(999), }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByNumber", mock.Anything, false).