Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9b5a9f1
rev
rauljordan Feb 23, 2026
77e9d02
add a mel enabled L3 test and modify the nitro testnode
rauljordan Feb 23, 2026
34d5942
changelog
rauljordan Feb 23, 2026
52dc5a2
Merge branch 'raul/mel-pr3-core' into raul/mel-test-node
rauljordan Feb 23, 2026
0acfab0
support mel inbox reader / tracker interface for staker
rauljordan Feb 23, 2026
2f8ea96
Merge branch 'raul/mel-test-node' of github.com:OffchainLabs/nitro in…
rauljordan Feb 23, 2026
84ceca6
build
rauljordan Feb 23, 2026
8199cc0
added metrics to the mel runner
rauljordan Feb 26, 2026
9efed40
Merge branch 'raul/mel-metrics' into raul/mel-test-node
rauljordan Feb 26, 2026
68debf4
Merge branch 'raul/mel-test-node' of github.com:OffchainLabs/nitro in…
rauljordan Feb 26, 2026
61fe102
improve metrics
rauljordan Feb 26, 2026
6398abd
sync
rauljordan Mar 17, 2026
cb0255b
geth commit
rauljordan Mar 17, 2026
3b326ac
testnode
rauljordan Mar 17, 2026
537226d
Merge branch 'raul/mel-pr3-core' into raul/mel-test-node
rauljordan Mar 18, 2026
333ce36
Merge branch 'master' into raul/mel-test-node
rauljordan Mar 23, 2026
aee81b6
build
rauljordan Mar 23, 2026
c66b700
fix up l3 test
rauljordan Mar 23, 2026
5db2580
Merge branch 'master' into raul/mel-test-node
rauljordan Mar 23, 2026
c44964f
process block
rauljordan Mar 23, 2026
c94fac8
Merge branch 'raul/mel-test-node' of github.com:OffchainLabs/nitro in…
rauljordan Mar 23, 2026
60cc9c0
edited the runner with some issues found by pr review toolkit Joshs p…
rauljordan Mar 23, 2026
1c51e6a
Merge branch 'master' into raul/mel-test-node
rauljordan Mar 23, 2026
a521155
josh feedback
rauljordan Mar 31, 2026
2940512
josh feedback and prom counter renames
rauljordan Mar 31, 2026
a336cdf
Merge branch 'master' into raul/mel-test-node
rauljordan Mar 31, 2026
36a7de0
builds
rauljordan Apr 1, 2026
6308c65
fix conflicts
rauljordan Apr 3, 2026
4cd9ee0
josh feedback
rauljordan Apr 6, 2026
38a21b2
Merge branch 'master' into raul/mel-test-node
rauljordan Apr 6, 2026
5d8fd7d
Merge branch 'master' into raul/mel-test-node
rauljordan Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arbnode/mel/runner/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (d *Database) setMelState(batch ethdb.KeyValueWriter, parentChainBlockNumbe
if err != nil {
return err
}
melStateSizeBytesGauge.Update(int64(len(melStateBytes)))
if err := batch.Put(key, melStateBytes); err != nil {
return err
}
Expand Down
66 changes: 47 additions & 19 deletions arbnode/mel/runner/mel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"

Expand All @@ -29,46 +28,49 @@ import (
"github.com/offchainlabs/nitro/bold/containers/fsm"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

var (
stuckFSMIndicatingGauge = metrics.NewRegisteredGauge("arb/mel/stuck", nil) // 1-stuck, 0-not_stuck
)

type MessageExtractionConfig struct {
Enable bool `koanf:"enable"`
RetryInterval time.Duration `koanf:"retry-interval"`
BlocksToPrefetch uint64 `koanf:"blocks-to-prefetch"`
ReadMode string `koanf:"read-mode"`
StallTolerance uint64 `koanf:"stall-tolerance"`
Enable bool `koanf:"enable"`
RetryInterval time.Duration `koanf:"retry-interval"`
BlocksToPrefetch uint64 `koanf:"blocks-to-prefetch"`
ReadMode string `koanf:"read-mode"`
StallTolerance uint64 `koanf:"stall-tolerance"`
LogExtractionStatusFrequencyBlocks uint64 `koanf:"log-extraction-status-frequency-blocks"`
}

func (c *MessageExtractionConfig) Validate() error {
c.ReadMode = strings.ToLower(c.ReadMode)
if c.ReadMode != "latest" && c.ReadMode != "safe" && c.ReadMode != "finalized" {
return fmt.Errorf("inbox reader read-mode is invalid, want: latest or safe or finalized, got: %s", c.ReadMode)
}
if c.LogExtractionStatusFrequencyBlocks == 0 {
return errors.New("log-extraction-status-frequency-blocks must be greater than 0")
}
return nil
}

var DefaultMessageExtractionConfig = MessageExtractionConfig{
Enable: false,
// The retry interval for the message extractor FSM. After each tick of the FSM,
// the extractor service stop waiter will wait for this duration before trying to act again.
RetryInterval: time.Millisecond * 500,
BlocksToPrefetch: 499, // 500 is the eth_getLogs block range limit
ReadMode: "latest",
StallTolerance: 10,
RetryInterval: time.Millisecond * 500,
BlocksToPrefetch: 499, // 500 is the eth_getLogs block range limit
ReadMode: "latest",
StallTolerance: 10,
LogExtractionStatusFrequencyBlocks: 100,
}

var TestMessageExtractionConfig = MessageExtractionConfig{
Enable: false,
RetryInterval: time.Millisecond * 10,
BlocksToPrefetch: 499,
ReadMode: "latest",
StallTolerance: 10,
Enable: false,
RetryInterval: time.Millisecond * 10,
BlocksToPrefetch: 499,
ReadMode: "latest",
StallTolerance: 10,
LogExtractionStatusFrequencyBlocks: 100,
}

func MessageExtractionConfigAddOptions(prefix string, f *pflag.FlagSet) {
Expand All @@ -77,6 +79,7 @@ func MessageExtractionConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Uint64(prefix+".blocks-to-prefetch", DefaultMessageExtractionConfig.BlocksToPrefetch, "the number of blocks to prefetch relevant logs from. Recommend using max allowed range for eth_getLogs rpc query")
f.String(prefix+".read-mode", DefaultMessageExtractionConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized")
f.Uint64(prefix+".stall-tolerance", DefaultMessageExtractionConfig.StallTolerance, "max times the MEL fsm is allowed to be stuck without logging error")
f.Uint64(prefix+".log-extraction-status-frequency-blocks", DefaultMessageExtractionConfig.LogExtractionStatusFrequencyBlocks, "frequency of logging message extraction status in terms of number of blocks processed")
}

// SequencerBatchCountFetcher queries the on-chain sequencer inbox batch count at a given parent chain block.
Expand Down Expand Up @@ -321,6 +324,17 @@ func (m *MessageExtractor) GetMsgCount() (arbutil.MessageIndex, error) {
return arbutil.MessageIndex(headState.MsgCount), nil
}

func (m *MessageExtractor) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) {
msg, err := m.GetDelayedMessage(seqNum)
if err != nil {
return nil, err
}
if msg.Message == nil {
return nil, fmt.Errorf("message at seqNum %d has nil Message field", seqNum)
}
return msg.Message.Serialize()
}

func (m *MessageExtractor) GetDelayedMessage(index uint64) (*mel.DelayedInboxMessage, error) {
headState, err := m.melDB.GetHeadMelState()
if err != nil {
Expand Down Expand Up @@ -519,6 +533,10 @@ func (m *MessageExtractor) FindInboxBatchContainingMessage(pos arbutil.MessageIn
}
}

func (m *MessageExtractor) SetBlockValidator(_ *staker.BlockValidator) {
log.Info("MEL does not support block validation registration; SetBlockValidator is a no-op")
}

func (m *MessageExtractor) GetBatchCount() (uint64, error) {
headState, err := m.melDB.GetHeadMelState()
if err != nil {
Expand All @@ -527,6 +545,14 @@ func (m *MessageExtractor) GetBatchCount() (uint64, error) {
return headState.BatchCount, nil
}

func (m *MessageExtractor) GetBatchAcc(seqNum uint64) (common.Hash, error) {
metadata, err := m.GetBatchMetadata(seqNum)
if err != nil {
return common.Hash{}, err
}
return metadata.Accumulator, nil
}

func (m *MessageExtractor) CaughtUp() chan struct{} {
return m.caughtUpChan
}
Expand All @@ -549,13 +575,15 @@ func (m *MessageExtractor) Act(ctx context.Context) (time.Duration, error) {
// from the parent chain block. The FSM will transition to the `SavingMessages`
// state after successfully extracting messages.
case ProcessingNextBlock:
fsmBlocksProcessedCounter.Inc(1)
return m.processNextBlock(ctx, current)
// `SavingMessages` is the state responsible for saving the extracted messages
// and delayed messages to the database. It stores data in the node's consensus database
// and runs after the `ProcessingNextBlock` state.
// After data is stored, the FSM will then transition to the `ProcessingNextBlock` state
// yet again.
case SavingMessages:
fsmSaveMessagesCounter.Inc(1)
return m.saveMessages(ctx, current)
// `Reorging` is the state responsible for handling reorgs in the parent chain.
// It is triggered when a reorg occurs, and it will revert the MEL state being processed to the
Expand Down
32 changes: 32 additions & 0 deletions arbnode/mel/runner/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package melrunner

import "github.com/ethereum/go-ethereum/metrics"

var (
// FSM health.
stuckFSMIndicatingGauge = metrics.NewRegisteredGauge("arb/mel/stuck", nil) // 1-stuck, 0-not_stuck
fsmBlocksProcessedCounter = metrics.NewRegisteredCounter("arb/mel/fsm/process_block_total", nil)
fsmSaveMessagesCounter = metrics.NewRegisteredCounter("arb/mel/fsm/save_messages_total", nil)

// State progress.
latestBlockGauge = metrics.NewRegisteredGauge("arb/mel/latest/parent_chain_block_number", nil)
latestMsgCountGauge = metrics.NewRegisteredGauge("arb/mel/latest/msg_count", nil)
latestDelayedSeenCountGauge = metrics.NewRegisteredGauge("arb/mel/latest/delayed_msg_seen_count", nil)
latestDelayedReadCountGauge = metrics.NewRegisteredGauge("arb/mel/latest/delayed_msg_read_count", nil)

// Throughput.
msgsExtractedCounter = metrics.NewRegisteredCounter("arb/mel/msgs/extracted_total", nil)
msgsPushedCounter = metrics.NewRegisteredCounter("arb/mel/msgs/pushed_to_execution_total", nil)

// Errors.
extractionErrors = metrics.NewRegisteredCounter("arb/mel/errors/extraction_function_errors_total", nil)

// Reorgs
reorgCounter = metrics.NewRegisteredCounter("arb/mel/reorgs_total", nil)

// Performance.
blockProcessTimeGauge = metrics.NewRegisteredGauge("arb/mel/block_processing_time_micros", nil)

// MEL state size bytes.
melStateSizeBytesGauge = metrics.NewRegisteredGauge("arb/mel/mel_state_size_bytes", nil)
)
23 changes: 22 additions & 1 deletion arbnode/mel/runner/process_next_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbnode/mel/extraction"
melextraction "github.com/offchainlabs/nitro/arbnode/mel/extraction"
"github.com/offchainlabs/nitro/bold/containers/fsm"
)

Expand Down Expand Up @@ -81,6 +81,7 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
if err = m.logsAndHeadersPreFetcher.fetch(ctx, preState); err != nil {
return m.config.RetryInterval, err
}
start := time.Now()
postState, msgs, delayedMsgs, batchMetas, err := melextraction.ExtractMessages(
ctx,
preState,
Expand All @@ -92,6 +93,7 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
m.chainConfig,
)
if err != nil {
extractionErrors.Inc(1)
if errors.Is(err, mel.ErrDelayedMessagePreimageNotFound) {
if err := preState.RebuildDelayedMsgPreimages(m.melDB.FetchDelayedMessage); err != nil {
return m.config.RetryInterval, fmt.Errorf("error rebuilding delayed msg preimages when missing some preimages: %w", err)
Expand All @@ -100,6 +102,25 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
}
return m.config.RetryInterval, err
}
elapsed := time.Since(start)
// After processing every 100 parent chain blocks, print a status log
if postState.ParentChainBlockNumber%m.config.LogExtractionStatusFrequencyBlocks == 0 {
log.Info("Message extraction successful", "parentChainBlockNumber", postState.ParentChainBlockNumber, "msgCount", postState.MsgCount)
}

// Update metrics.
//#nosec G115
latestBlockGauge.Update(int64(postState.ParentChainBlockNumber))
//#nosec G115
latestMsgCountGauge.Update(int64(postState.MsgCount))
//#nosec G115
latestDelayedSeenCountGauge.Update(int64(postState.DelayedMessagesSeen))
//#nosec G115
latestDelayedReadCountGauge.Update(int64(postState.DelayedMessagesRead))
//#nosec G115
msgsExtractedCounter.Inc(int64(len(msgs)))
blockProcessTimeGauge.Update(elapsed.Microseconds())

// Begin the next FSM state immediately.
return 0, m.fsm.Do(saveMessages{
preStateMsgCount: preState.MsgCount,
Expand Down
3 changes: 3 additions & 0 deletions arbnode/mel/runner/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (m *MessageExtractor) reorg(ctx context.Context, current *fsm.CurrentState[
return m.config.RetryInterval, err
}
m.logsAndHeadersPreFetcher.reset()

// Update metrics.
reorgCounter.Inc(1)
return 0, m.fsm.Do(processNextBlock{
prevStepWasReorg: true,
melState: previousState,
Expand Down
1 change: 1 addition & 0 deletions arbnode/mel/runner/save_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (m *MessageExtractor) saveMessages(ctx context.Context, current *fsm.Curren
log.Error("Error saving latest state as head state to db", "err", err)
return m.config.RetryInterval, err
}
msgsPushedCounter.Inc(int64(len(saveAction.messages)))
return 0, m.fsm.Do(processNextBlock{
melState: saveAction.postState,
})
Expand Down
22 changes: 20 additions & 2 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ func getInboxTrackerAndReader(
sequencerInbox *SequencerInbox,
) (*InboxTracker, *InboxReader, error) {
if config.MessageExtraction.Enable {
log.Info("Inbox reader and tracker disabled")
return nil, nil, nil
}
inboxTracker, err := NewInboxTracker(consensusDB, txStreamer, dapReaders)
Expand Down Expand Up @@ -863,6 +864,7 @@ func getMessageExtractor(
if err != nil {
return nil, err
}
log.Info("Message extractor enabled")
return msgExtractor, nil
}

Expand Down Expand Up @@ -939,6 +941,7 @@ func getStaker(
statelessBlockValidator *staker.StatelessBlockValidator,
blockValidator *staker.BlockValidator,
dapRegistry *daprovider.DAProviderRegistry,
messageExtractor *melrunner.MessageExtractor,
) (*multiprotocolstaker.MultiProtocolStaker, *MessagePruner, common.Address, error) {
var stakerObj *multiprotocolstaker.MultiProtocolStaker
var messagePruner *MessagePruner
Expand Down Expand Up @@ -990,11 +993,26 @@ func getStaker(

var confirmedNotifiers []legacystaker.LatestConfirmedNotifier
if config.MessagePruner.Enable {
if inboxTracker == nil {
return nil, nil, common.Address{}, errors.New("message pruning cannot be enabled when inbox tracker is disabled (e.g. with Message Extraction enabled)")
}
messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner })
confirmedNotifiers = append(confirmedNotifiers, messagePruner)
}

stakerObj, err = multiprotocolstaker.NewMultiProtocolStaker(stack, l1Reader, wallet, bind.CallOpts{}, func() *legacystaker.L1ValidatorConfig { return &configFetcher.Get().Staker }, &configFetcher.Get().Bold, blockValidator, statelessBlockValidator, nil, deployInfo.StakeToken, deployInfo.Rollup, confirmedNotifiers, deployInfo.ValidatorUtils, deployInfo.Bridge, txStreamer, inboxTracker, inboxReader, dapRegistry, fatalErrChan)
var tracker staker.InboxTrackerInterface
var reader staker.InboxReaderInterface
if messageExtractor != nil {
tracker = messageExtractor
reader = messageExtractor
} else {
tracker = inboxTracker
reader = inboxReader
}
if tracker == nil || reader == nil {
return nil, nil, common.Address{}, errors.New("staker requires either message extractor or inbox tracker/reader")
}
stakerObj, err = multiprotocolstaker.NewMultiProtocolStaker(stack, l1Reader, wallet, bind.CallOpts{}, func() *legacystaker.L1ValidatorConfig { return &configFetcher.Get().Staker }, &configFetcher.Get().Bold, blockValidator, statelessBlockValidator, nil, deployInfo.StakeToken, deployInfo.Rollup, confirmedNotifiers, deployInfo.ValidatorUtils, deployInfo.Bridge, txStreamer, tracker, reader, dapRegistry, fatalErrChan)
if err != nil {
return nil, nil, common.Address{}, err
}
Expand Down Expand Up @@ -1363,7 +1381,7 @@ func createNodeImpl(
return nil, err
}

stakerObj, messagePruner, stakerAddr, err := getStaker(ctx, config, configFetcher, consensusDB, l1Reader, txOptsValidator, syncMonitor, parentChain, l1client, deployInfo, txStreamer, inboxTracker, inboxReader, stack, fatalErrChan, statelessBlockValidator, blockValidator, dapRegistry)
stakerObj, messagePruner, stakerAddr, err := getStaker(ctx, config, configFetcher, consensusDB, l1Reader, txOptsValidator, syncMonitor, parentChain, l1client, deployInfo, txStreamer, inboxTracker, inboxReader, stack, fatalErrChan, statelessBlockValidator, blockValidator, dapRegistry, messageExtractor)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbnode/db/schema"
"github.com/offchainlabs/nitro/arbnode/mel/runner"
melrunner "github.com/offchainlabs/nitro/arbnode/mel/runner"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
Expand Down
2 changes: 2 additions & 0 deletions changelog/rauljordan-nit-4537.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Added
- Nitro metrics for MEL and L3 system test
2 changes: 1 addition & 1 deletion staker/multi_protocol/multi_protocol_staker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/offchainlabs/nitro/solgen/go/rollupgen"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/staker/bold"
"github.com/offchainlabs/nitro/staker/legacy"
legacystaker "github.com/offchainlabs/nitro/staker/legacy"
"github.com/offchainlabs/nitro/staker/txbuilder"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down
32 changes: 32 additions & 0 deletions system_tests/l3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func TestSimpleL3(t *testing.T) {
defer cancel()

builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
builder.nodeConfig.MessageExtraction.Enable = true
builder.l3Config.nodeConfig.MessageExtraction.Enable = true
cleanupL1AndL2 := builder.Build(t)
defer cleanupL1AndL2()

Expand All @@ -24,6 +26,7 @@ func TestSimpleL3(t *testing.T) {
firstNodeTestClient := builder.L3

secondNodeNodeConfig := arbnode.ConfigDefaultL1NonSequencerTest()
secondNodeNodeConfig.MessageExtraction.Enable = true
secondNodeTestClient, cleanupL3SecondNode := builder.Build2ndNodeOnL3(t, &SecondNodeParams{nodeConfig: secondNodeNodeConfig})
defer cleanupL3SecondNode()

Expand Down Expand Up @@ -51,4 +54,33 @@ func TestSimpleL3(t *testing.T) {
if l2balance.Cmp(big.NewInt(1e12)) != 0 {
t.Fatal("Unexpected balance:", l2balance)
}

// Wait for MEL to catch up on both nodes before checking state.
timeout := time.NewTimer(time.Minute)
defer timeout.Stop()
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-timeout.C:
t.Fatal("Timed out waiting for MEL head states to converge with MsgCount > 1")
case <-tick.C:
}
headState1, err := firstNodeTestClient.ConsensusNode.MessageExtractor.GetHeadState()
if err != nil {
t.Logf("Node 1 GetHeadState transient error: %v", err)
continue
}
if headState1.MsgCount <= 1 {
continue
}
headState2, err := secondNodeTestClient.ConsensusNode.MessageExtractor.GetHeadState()
if err != nil {
t.Logf("Node 2 GetHeadState transient error: %v", err)
continue
}
if headState1.Hash() == headState2.Hash() {
break
}
}
}
Loading