Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func testEventStream(t *testing.T, listeners ...*ffcapi.EventListenerAddRequest)
func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done func(), c *ethConnector, mRPC *rpcbackendmocks.Backend, listeners ...*ffcapi.EventListenerAddRequest) (*eventStream, chan *ffcapi.ListenerEvent, *rpcbackendmocks.Backend, func()) {
events := make(chan *ffcapi.ListenerEvent)
esID := fftypes.NewUUID()
c.chainID = "12345" // set chainID before streamLoop starts, so enrich does not call net_version
_, _, err := c.EventStreamStart(ctx, &ffcapi.EventStreamStartRequest{
ID: esID,
StreamContext: ctx,
Expand All @@ -52,7 +53,6 @@ func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done fu
es := c.eventStreams[*esID]
es.c.eventFilterPollingInterval = 1 * time.Millisecond
es.c.retry.MaximumDelay = 1 * time.Microsecond
c.chainID = "12345"
assert.NotNil(t, es)

es.preStartProcessing()
Expand Down
38 changes: 32 additions & 6 deletions mocks/ethblocklistenermocks/block_listener.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions mocks/fftmmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions mocks/rpcbackendmocks/subscription.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 55 additions & 24 deletions pkg/ethblocklistener/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type BlockListener interface {
AddConsumer(ctx context.Context, c *BlockUpdateConsumer)
RemoveConsumer(ctx context.Context, id *fftypes.UUID)
GetHighestBlock(ctx context.Context) (uint64, bool)
GetHighestBlockInfo(ctx context.Context) (*ethrpc.BlockInfoJSONRPC, bool)
GetBlockGasLimit() *ethtypes.HexInteger // nil if unknown
GetBlockInfoByNumber(ctx context.Context, blockNumber uint64, allowCache bool, expectedParentHashStr string, expectedBlockHashStr string) (*ethrpc.BlockInfoJSONRPC, error)
GetBlockInfoByHash(ctx context.Context, hash0xString string) (*ethrpc.BlockInfoJSONRPC, error)
Expand Down Expand Up @@ -125,12 +126,12 @@ type blockListener struct {
BlockListenerConfig

// canonical chain
monitoredHeadLength uint64
canonicalChainLock sync.RWMutex // covers highestBlock and canonicalChain
canonicalChain *list.List
highestBlockSet bool
highestBlock uint64
highestBlockGasLimit *ethtypes.HexInteger
monitoredHeadLength uint64
canonicalChainLock sync.RWMutex // covers highestBlock and canonicalChain
canonicalChain *list.List
highestBlockSet bool
highestBlock uint64
headBlockInfo *ethrpc.BlockInfoJSONRPC // full info for the current head block, when seen

// headBlockNumber mode: last head value sent on the block listener channel (only written from listenLoop)
currentChainHead uint64
Expand Down Expand Up @@ -475,7 +476,7 @@ func (bl *blockListener) reconcileCanonicalChain(bi *ethrpc.BlockInfoJSONRPC) *l
bl.canonicalChainLock.Lock()
defer bl.canonicalChainLock.Unlock()

bl.checkAndSetHighestBlock(bi.Number.Uint64(), bi.GasLimit)
bl.checkAndSetHighestBlock(bi)

// Find the position of this block in the block sequence
pos := bl.canonicalChain.Back()
Expand Down Expand Up @@ -604,7 +605,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
notifyPos = newElem
}

bl.checkAndSetHighestBlock(bi.Number.Uint64(), bi.GasLimit)
bl.checkAndSetHighestBlock(bi)

}
return notifyPos
Expand Down Expand Up @@ -693,21 +694,27 @@ func (bl *blockListener) RemoveConsumer(_ context.Context, id *fftypes.UUID) {
delete(bl.consumers, *id)
}

func (bl *blockListener) waitForBlockHeightInit(ctx context.Context) bool {
bl.canonicalChainLock.RLock()
highestBlockSet := bl.highestBlockSet
bl.canonicalChainLock.RUnlock()
if highestBlockSet {
return true
}
select {
case <-bl.initialBlockHeightObtained:
return true
case <-ctx.Done():
return false
}
}

func (bl *blockListener) GetHighestBlock(ctx context.Context) (uint64, bool) {
bl.checkAndStartListenerLoop()
// block height will be established as the first step of listener startup process
// so we don't need to wait for the entire startup process to finish to return the result
bl.canonicalChainLock.RLock()
highestBlockSet := bl.highestBlockSet
bl.canonicalChainLock.RUnlock()
// if not yet initialized, wait to be initialized
if !highestBlockSet {
select {
case <-bl.initialBlockHeightObtained:
case <-ctx.Done():
// Inform caller we timed out, or were closed
return 0, false
}
if !bl.waitForBlockHeightInit(ctx) {
return 0, false
}
bl.canonicalChainLock.RLock()
highestBlock := bl.highestBlock
Expand All @@ -716,11 +723,30 @@ func (bl *blockListener) GetHighestBlock(ctx context.Context) (uint64, bool) {
return highestBlock, true
}

func (bl *blockListener) GetHighestBlockInfo(ctx context.Context) (*ethrpc.BlockInfoJSONRPC, bool) {
bl.checkAndStartListenerLoop()
if !bl.waitForBlockHeightInit(ctx) {
return nil, false
}
bl.canonicalChainLock.RLock()
defer bl.canonicalChainLock.RUnlock()
if bl.headBlockInfo == nil {
return nil, false
}
return bl.headBlockInfo, true
}

// Gives a non-nil value only if the block listener is tracking the head and has access to the full block
func (bl *blockListener) GetBlockGasLimit() *ethtypes.HexInteger {
bl.canonicalChainLock.RLock()
defer bl.canonicalChainLock.RUnlock()
return bl.highestBlockGasLimit
if bl.headBlockInfo == nil || bl.headBlockInfo.GasLimit == nil {
return nil
}
if bl.headBlockInfo.GasLimit.BigInt().Sign() <= 0 {
return nil
}
return bl.headBlockInfo.GasLimit
}

func (bl *blockListener) GetHeadBlockNumber(_ context.Context) uint64 {
Expand All @@ -734,15 +760,20 @@ func (bl *blockListener) setHighestBlock(block uint64) {
bl.highestBlockSet = true
}

// checkAndSetHighestBlock records the chain head height and caches full block info for the head.
// highestBlock is often set first by eth_blockNumber during startup, before any full block arrives.
// Caller MUST hold the canonicalChain WRITE LOCK
func (bl *blockListener) checkAndSetHighestBlock(block uint64, blockGasLimit *ethtypes.HexInteger) {
func (bl *blockListener) checkAndSetHighestBlock(bi *ethrpc.BlockInfoJSONRPC) {
block := bi.Number.Uint64()
if block > bl.highestBlock {
bl.highestBlock = block
bl.highestBlockSet = true
if blockGasLimit != nil && blockGasLimit.BigInt().Sign() > 0 {
bl.highestBlockGasLimit = blockGasLimit
}
bl.headBlockInfo = bi
} else if block == bl.highestBlock {
// Height already known from eth_blockNumber. Store the first full block at that height.
bl.headBlockInfo = bi
}
// Lower blocks are ignored. reconcileCanonicalChain also processes historical blocks during fork rebuilds.
}

// snapshot the whole view using the read-lock.
Expand Down
115 changes: 111 additions & 4 deletions pkg/ethblocklistener/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,11 @@ func TestBlockListenerOKSequential(t *testing.T) {
// block1003 has GasLimit set — inline to capture the extra field
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", block1003Hash.String(), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**ethrpc.EVMBlockWithTxHashesJSONRPC) = &ethrpc.EVMBlockWithTxHashesJSONRPC{BlockHeaderJSONRPC: ethrpc.BlockHeaderJSONRPC{
Number: 1003,
Hash: block1003Hash,
ParentHash: block1002Hash,
GasLimit: ethtypes.NewHexInteger64(10000),
Number: 1003,
Hash: block1003Hash,
ParentHash: block1002Hash,
GasLimit: ethtypes.NewHexInteger64(10000),
BaseFeePerGas: ethtypes.NewHexInteger64(7),
}}
})
})
Expand Down Expand Up @@ -382,6 +383,10 @@ func TestBlockListenerOKSequential(t *testing.T) {
mRPC.AssertExpectations(t)
assert.Len(t, bl.SnapshotMonitoredHeadChain(), bl.MonitoredHeadLength)
require.Equal(t, int64(10000), bl.GetBlockGasLimit().Int64())
headBlockInfo, ok := bl.GetHighestBlockInfo(context.Background())
require.True(t, ok)
require.True(t, headBlockInfo.SupportsEIP1559())
require.Equal(t, int64(10000), headBlockInfo.GasLimit.Int64())
}

func TestBlockListenerWSShoulderTap(t *testing.T) {
Expand Down Expand Up @@ -1336,3 +1341,105 @@ func TestWaitUntilStartedCancelledCtx(t *testing.T) {
done()
bl.waitUntilStarted(context.Background())
}

func TestCheckAndSetHighestBlock(t *testing.T) {
_, bl, _, _ := newTestBlockListener(t)

bi500 := &ethrpc.BlockInfoJSONRPC{
Number: 500,
GasLimit: ethtypes.NewHexInteger64(10000),
}
bl.canonicalChainLock.Lock()
bl.checkAndSetHighestBlock(bi500)
require.Equal(t, uint64(500), bl.highestBlock)
require.True(t, bl.highestBlockSet)
require.Same(t, bi500, bl.headBlockInfo)

bi500EIP1559 := &ethrpc.BlockInfoJSONRPC{
Number: 500,
GasLimit: ethtypes.NewHexInteger64(20000),
BaseFeePerGas: ethtypes.NewHexInteger64(7),
}
bl.checkAndSetHighestBlock(bi500EIP1559)
require.Same(t, bi500EIP1559, bl.headBlockInfo)

bl.checkAndSetHighestBlock(&ethrpc.BlockInfoJSONRPC{Number: 499})
require.Same(t, bi500EIP1559, bl.headBlockInfo)
bl.canonicalChainLock.Unlock()
}

func TestGetBlockGasLimitFromHeadBlockInfo(t *testing.T) {
_, bl, _, _ := newTestBlockListener(t)
require.Nil(t, bl.GetBlockGasLimit())

bl.canonicalChainLock.Lock()
bl.headBlockInfo = &ethrpc.BlockInfoJSONRPC{GasLimit: ethtypes.NewHexInteger64(0)}
bl.canonicalChainLock.Unlock()
require.Nil(t, bl.GetBlockGasLimit())

bl.canonicalChainLock.Lock()
bl.headBlockInfo = &ethrpc.BlockInfoJSONRPC{}
bl.canonicalChainLock.Unlock()
require.Nil(t, bl.GetBlockGasLimit())
}

func TestGetHighestBlockInfoBeforeHeadBlockSeen(t *testing.T) {
_, bl, mRPC, done := newTestBlockListener(t)

mockInitialBlockHeight(mRPC, 500)
mockSeedBlockNotFound(mRPC, 500-(50-1)).Maybe()
mockNewBlockFilter(mRPC, testBlockFilterID1).Maybe()
mockFilterChangesEmpty(mRPC).Maybe()

_, ok := bl.GetHighestBlock(bl.ctx)
require.True(t, ok)

headInfo, ok := bl.GetHighestBlockInfo(bl.ctx)
require.False(t, ok)
require.Nil(t, headInfo)

done()
}

func TestGetHighestBlockInfoReturnsHeadBlock(t *testing.T) {
_, bl, mRPC, done := newTestBlockListener(t)

mockInitialBlockHeight(mRPC, 123)
mockSeedBlockNotFound(mRPC, 123-(50-1)).Maybe()
mockNewBlockFilter(mRPC, testBlockFilterID1).Maybe()
mockFilterChangesEmpty(mRPC).Maybe()

bi := &ethrpc.BlockInfoJSONRPC{
Number: 123,
BaseFeePerGas: ethtypes.NewHexInteger64(1),
GasLimit: ethtypes.NewHexInteger64(5000),
}
bl.canonicalChainLock.Lock()
bl.headBlockInfo = bi
bl.canonicalChainLock.Unlock()

headInfo, ok := bl.GetHighestBlockInfo(bl.ctx)
require.True(t, ok)
require.Same(t, bi, headInfo)
require.True(t, headInfo.SupportsEIP1559())
require.Equal(t, int64(5000), bl.GetBlockGasLimit().Int64())

done()
}

func TestGetHighestBlockInfoCancelledBeforeInit(t *testing.T) {
_, bl, mRPC, done := newTestBlockListener(t)
mockNewBlockFilter(mRPC, testBlockFilterID1).Maybe()
mockFilterChangesEmpty(mRPC).Maybe()
done()

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").
Return(&rpcbackend.RPCError{Message: "pop"})

cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()
_, ok := bl.GetHighestBlockInfo(cancelledCtx)
require.False(t, ok)

<-bl.listenLoopDone
}
Loading
Loading