diff --git a/changelog/mrogachev-nit-4499.md b/changelog/mrogachev-nit-4499.md new file mode 100644 index 0000000000..345b7c2877 --- /dev/null +++ b/changelog/mrogachev-nit-4499.md @@ -0,0 +1,2 @@ +### Internal +- Replace TransactionFiltererAPI mutex with a channel diff --git a/cmd/transaction-filterer/api/api.go b/cmd/transaction-filterer/api/api.go index 99323c3961..16f798b3ac 100644 --- a/cmd/transaction-filterer/api/api.go +++ b/cmd/transaction-filterer/api/api.go @@ -6,7 +6,6 @@ package api import ( "context" "errors" - "sync" "testing" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -20,34 +19,85 @@ import ( "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" + "github.com/offchainlabs/nitro/util/stopwaiter" ) +const filterQueueSize = 100 + +type filterRequest struct { + ctx context.Context + hash common.Hash + // response fields, written by consumer before closing done + txHash common.Hash + err error + done chan struct{} +} + type TransactionFiltererAPI struct { - apiMutex sync.Mutex // avoids concurrent transactions with the same nonce + stopwaiter.StopWaiter + + queue chan *filterRequest arbFilteredTransactionsManager *precompilesgen.ArbFilteredTransactionsManager txOpts *bind.TransactOpts } -// Filter adds the given transaction hash to the filtered transactions set, which is managed by the ArbFilteredTransactionsManager precompile. -func (t *TransactionFiltererAPI) Filter(ctx context.Context, txHashToFilter common.Hash) (common.Hash, error) { - t.apiMutex.Lock() - defer t.apiMutex.Unlock() +func NewTransactionFiltererAPI( + manager *precompilesgen.ArbFilteredTransactionsManager, + txOpts *bind.TransactOpts, +) *TransactionFiltererAPI { + return &TransactionFiltererAPI{ + arbFilteredTransactionsManager: manager, + txOpts: txOpts, + queue: make(chan *filterRequest, filterQueueSize), + } +} - txOpts := *t.txOpts - txOpts.Context = ctx +func (t *TransactionFiltererAPI) Start(ctx context.Context) error { + t.StopWaiter.Start(ctx, t) + return stopwaiter.CallWhenTriggeredWith(&t.StopWaiterSafe, func(_ context.Context, req *filterRequest) { + if req.ctx.Err() != nil { + req.err = req.ctx.Err() + } else { + req.txHash, req.err = t.filter(req.ctx, req.hash) + } + close(req.done) + }, t.queue) +} - log.Info("Received call to filter transaction", "txHashToFilter", txHashToFilter.Hex()) +func (t *TransactionFiltererAPI) filter(ctx context.Context, txHashToFilter common.Hash) (common.Hash, error) { if t.arbFilteredTransactionsManager == nil { return common.Hash{}, errors.New("sequencer client not set yet") } + txOpts := *t.txOpts + txOpts.Context = ctx + log.Info("Received call to filter transaction", "txHashToFilter", txHashToFilter.Hex()) tx, err := t.arbFilteredTransactionsManager.AddFilteredTransaction(&txOpts, txHashToFilter) if err != nil { log.Warn("Failed to filter transaction", "txHashToFilter", txHashToFilter.Hex(), "err", err) return common.Hash{}, err - } else { - log.Info("Submitted filter transaction", "txHashToFilter", txHashToFilter.Hex(), "txHash", tx.Hash().Hex()) - return tx.Hash(), nil + } + log.Info("Submitted filter transaction", "txHashToFilter", txHashToFilter.Hex(), "txHash", tx.Hash().Hex()) + return tx.Hash(), nil +} + +// Filter adds the given transaction hash to the filtered transactions set, which is managed by the ArbFilteredTransactionsManager precompile. +func (t *TransactionFiltererAPI) Filter(ctx context.Context, txHashToFilter common.Hash) (common.Hash, error) { + req := &filterRequest{ + ctx: ctx, + hash: txHashToFilter, + done: make(chan struct{}), + } + select { + case t.queue <- req: + case <-ctx.Done(): + return common.Hash{}, ctx.Err() + } + select { + case <-req.done: + return req.txHash, req.err + case <-ctx.Done(): + return common.Hash{}, ctx.Err() } } @@ -65,9 +115,6 @@ func (t *TransactionFiltererAPI) SetSequencerClient(_ *testing.T, sequencerClien if err != nil { return err } - - t.apiMutex.Lock() - defer t.apiMutex.Unlock() t.arbFilteredTransactionsManager = arbFilteredTransactionsManager return nil } @@ -114,10 +161,7 @@ func NewStack( } } - api := &TransactionFiltererAPI{ - arbFilteredTransactionsManager: arbFilteredTransactionsManager, - txOpts: txOpts, - } + api := NewTransactionFiltererAPI(arbFilteredTransactionsManager, txOpts) apis := []rpc.API{{ Namespace: gethexec.TransactionFiltererNamespace, Version: "1.0", diff --git a/cmd/transaction-filterer/api/api_test.go b/cmd/transaction-filterer/api/api_test.go new file mode 100644 index 0000000000..6ca8dacc33 --- /dev/null +++ b/cmd/transaction-filterer/api/api_test.go @@ -0,0 +1,85 @@ +package api + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" +) + +func newTestAPI(t *testing.T) *TransactionFiltererAPI { + t.Helper() + api := NewTransactionFiltererAPI(nil, &bind.TransactOpts{}) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err := api.Start(ctx) + require.NoError(t, err) + t.Cleanup(api.StopAndWait) + return api +} + +func TestFilterNoManager(t *testing.T) { + api := newTestAPI(t) + + _, err := api.Filter(context.Background(), common.HexToHash("0x1234")) + require.ErrorContains(t, err, "sequencer client not set yet") +} + +func TestFilterContextCancelledBeforeEnqueue(t *testing.T) { + api := newTestAPI(t) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + _, err := api.Filter(ctx, common.HexToHash("0x1234")) + require.ErrorIs(t, err, context.Canceled) +} + +func TestFilterSequentialProcessing(t *testing.T) { + api := newTestAPI(t) + + // Fill the queue to capacity to verify sequential processing. + // Without a real manager we can only test the "no manager" error path, + // but we can verify requests are processed one by one. + const n = 10 + errs := make([]error, n) + var wg sync.WaitGroup + wg.Add(n) + for i := range n { + go func(idx int) { + defer wg.Done() + _, errs[idx] = api.Filter(context.Background(), common.HexToHash("0xabcd")) + }(i) + } + wg.Wait() + + for i, err := range errs { + require.ErrorContains(t, err, "sequencer client not set yet", "request %d", i) + } +} + +func TestFilterContextCancelledWhileQueued(t *testing.T) { + api := newTestAPI(t) + + // Block the consumer by sending a request that will take time. + // We can't easily block the consumer without a real manager, but we + // can fill the queue and cancel one of the waiting callers. + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + // This should either get processed (returning "no manager" error) + // or the context should cancel. + _, err := api.Filter(ctx, common.HexToHash("0x5678")) + if err != nil { + // Either context.DeadlineExceeded or "sequencer client not set yet" — both valid. + require.True(t, errors.Is(err, context.DeadlineExceeded) || + err.Error() == "sequencer client not set yet", + "unexpected error: %v", err) + } +} diff --git a/cmd/transaction-filterer/main.go b/cmd/transaction-filterer/main.go index 2cb416a017..1344ae6d21 100644 --- a/cmd/transaction-filterer/main.go +++ b/cmd/transaction-filterer/main.go @@ -204,11 +204,16 @@ func mainImpl() int { return 1 } - stack, _, err := api.NewStack(&stackConf, txOpts, sequencerClient) + stack, api, err := api.NewStack(&stackConf, txOpts, sequencerClient) if err != nil { fmt.Fprintf(os.Stderr, "error creating stack: %v\n", err) return 1 } + err = api.Start(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "error starting API: %v\n", err) + return 1 + } err = stack.Start() if err != nil { @@ -220,6 +225,7 @@ func mainImpl() int { sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGTERM) <-sigint + api.StopAndWait() return 0 } diff --git a/system_tests/delayed_message_filter_test.go b/system_tests/delayed_message_filter_test.go index 4d084085f5..638b19bc05 100644 --- a/system_tests/delayed_message_filter_test.go +++ b/system_tests/delayed_message_filter_test.go @@ -140,6 +140,10 @@ func createTransactionFiltererService(t *testing.T, ctx context.Context, builder transactionFiltererStackConf.AuthPort = 0 transactionFiltererStack, transactionFiltererAPI, err := api.NewStack(&transactionFiltererStackConf, &filtererTxOpts, nil) require.NoError(t, err) + + err = transactionFiltererAPI.Start(ctx) + require.NoError(t, err) + err = transactionFiltererStack.Start() require.NoError(t, err)