Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/mrogachev-nit-4499.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Internal
- Replace TransactionFiltererAPI mutex with a channel
82 changes: 63 additions & 19 deletions cmd/transaction-filterer/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package api
import (
"context"
"errors"
"sync"
"testing"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -20,34 +19,85 @@ import (

"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/solgen/go/precompilesgen"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

const filterQueueSize = 100

type filterRequest struct {
ctx context.Context
hash common.Hash
// response fields, written by consumer before closing done
txHash common.Hash
err error
done chan struct{}
}

type TransactionFiltererAPI struct {
apiMutex sync.Mutex // avoids concurrent transactions with the same nonce
stopwaiter.StopWaiter

queue chan *filterRequest

arbFilteredTransactionsManager *precompilesgen.ArbFilteredTransactionsManager
txOpts *bind.TransactOpts
}

// Filter adds the given transaction hash to the filtered transactions set, which is managed by the ArbFilteredTransactionsManager precompile.
func (t *TransactionFiltererAPI) Filter(ctx context.Context, txHashToFilter common.Hash) (common.Hash, error) {
t.apiMutex.Lock()
defer t.apiMutex.Unlock()
func NewTransactionFiltererAPI(
manager *precompilesgen.ArbFilteredTransactionsManager,
txOpts *bind.TransactOpts,
) *TransactionFiltererAPI {
return &TransactionFiltererAPI{
arbFilteredTransactionsManager: manager,
txOpts: txOpts,
queue: make(chan *filterRequest, filterQueueSize),
}
}

txOpts := *t.txOpts
txOpts.Context = ctx
func (t *TransactionFiltererAPI) Start(ctx context.Context) error {
t.StopWaiter.Start(ctx, t)
return stopwaiter.CallWhenTriggeredWith(&t.StopWaiterSafe, func(_ context.Context, req *filterRequest) {
if req.ctx.Err() != nil {
req.err = req.ctx.Err()
} else {
req.txHash, req.err = t.filter(req.ctx, req.hash)
}
close(req.done)
}, t.queue)
}

log.Info("Received call to filter transaction", "txHashToFilter", txHashToFilter.Hex())
func (t *TransactionFiltererAPI) filter(ctx context.Context, txHashToFilter common.Hash) (common.Hash, error) {
if t.arbFilteredTransactionsManager == nil {
return common.Hash{}, errors.New("sequencer client not set yet")
}
txOpts := *t.txOpts
txOpts.Context = ctx
log.Info("Received call to filter transaction", "txHashToFilter", txHashToFilter.Hex())
tx, err := t.arbFilteredTransactionsManager.AddFilteredTransaction(&txOpts, txHashToFilter)
if err != nil {
log.Warn("Failed to filter transaction", "txHashToFilter", txHashToFilter.Hex(), "err", err)
return common.Hash{}, err
} else {
log.Info("Submitted filter transaction", "txHashToFilter", txHashToFilter.Hex(), "txHash", tx.Hash().Hex())
return tx.Hash(), nil
}
log.Info("Submitted filter transaction", "txHashToFilter", txHashToFilter.Hex(), "txHash", tx.Hash().Hex())
return tx.Hash(), nil
}

// Filter adds the given transaction hash to the filtered transactions set, which is managed by the ArbFilteredTransactionsManager precompile.
func (t *TransactionFiltererAPI) Filter(ctx context.Context, txHashToFilter common.Hash) (common.Hash, error) {
req := &filterRequest{
ctx: ctx,
hash: txHashToFilter,
done: make(chan struct{}),
}
select {
case t.queue <- req:
case <-ctx.Done():
return common.Hash{}, ctx.Err()
}
select {
case <-req.done:
return req.txHash, req.err
case <-ctx.Done():
return common.Hash{}, ctx.Err()
}
}

Expand All @@ -65,9 +115,6 @@ func (t *TransactionFiltererAPI) SetSequencerClient(_ *testing.T, sequencerClien
if err != nil {
return err
}

t.apiMutex.Lock()
defer t.apiMutex.Unlock()
t.arbFilteredTransactionsManager = arbFilteredTransactionsManager
return nil
}
Expand Down Expand Up @@ -114,10 +161,7 @@ func NewStack(
}
}

api := &TransactionFiltererAPI{
arbFilteredTransactionsManager: arbFilteredTransactionsManager,
txOpts: txOpts,
}
api := NewTransactionFiltererAPI(arbFilteredTransactionsManager, txOpts)
apis := []rpc.API{{
Namespace: gethexec.TransactionFiltererNamespace,
Version: "1.0",
Expand Down
85 changes: 85 additions & 0 deletions cmd/transaction-filterer/api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package api

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)

func newTestAPI(t *testing.T) *TransactionFiltererAPI {
t.Helper()
api := NewTransactionFiltererAPI(nil, &bind.TransactOpts{})
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
err := api.Start(ctx)
require.NoError(t, err)
t.Cleanup(api.StopAndWait)
return api
}

func TestFilterNoManager(t *testing.T) {
api := newTestAPI(t)

_, err := api.Filter(context.Background(), common.HexToHash("0x1234"))
require.ErrorContains(t, err, "sequencer client not set yet")
}

func TestFilterContextCancelledBeforeEnqueue(t *testing.T) {
api := newTestAPI(t)

ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately

_, err := api.Filter(ctx, common.HexToHash("0x1234"))
require.ErrorIs(t, err, context.Canceled)
}

func TestFilterSequentialProcessing(t *testing.T) {
api := newTestAPI(t)

// Fill the queue to capacity to verify sequential processing.
// Without a real manager we can only test the "no manager" error path,
// but we can verify requests are processed one by one.
const n = 10
errs := make([]error, n)
var wg sync.WaitGroup
wg.Add(n)
for i := range n {
go func(idx int) {
defer wg.Done()
_, errs[idx] = api.Filter(context.Background(), common.HexToHash("0xabcd"))
}(i)
}
wg.Wait()

for i, err := range errs {
require.ErrorContains(t, err, "sequencer client not set yet", "request %d", i)
}
}

func TestFilterContextCancelledWhileQueued(t *testing.T) {
api := newTestAPI(t)

// Block the consumer by sending a request that will take time.
// We can't easily block the consumer without a real manager, but we
// can fill the queue and cancel one of the waiting callers.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

// This should either get processed (returning "no manager" error)
// or the context should cancel.
_, err := api.Filter(ctx, common.HexToHash("0x5678"))
if err != nil {
// Either context.DeadlineExceeded or "sequencer client not set yet" — both valid.
require.True(t, errors.Is(err, context.DeadlineExceeded) ||
err.Error() == "sequencer client not set yet",
"unexpected error: %v", err)
}
}
8 changes: 7 additions & 1 deletion cmd/transaction-filterer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,16 @@ func mainImpl() int {
return 1
}

stack, _, err := api.NewStack(&stackConf, txOpts, sequencerClient)
stack, api, err := api.NewStack(&stackConf, txOpts, sequencerClient)
if err != nil {
fmt.Fprintf(os.Stderr, "error creating stack: %v\n", err)
return 1
}
err = api.Start(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "error starting API: %v\n", err)
return 1
}

err = stack.Start()
if err != nil {
Expand All @@ -220,6 +225,7 @@ func mainImpl() int {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
<-sigint
api.StopAndWait()

return 0
}
Expand Down
4 changes: 4 additions & 0 deletions system_tests/delayed_message_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func createTransactionFiltererService(t *testing.T, ctx context.Context, builder
transactionFiltererStackConf.AuthPort = 0
transactionFiltererStack, transactionFiltererAPI, err := api.NewStack(&transactionFiltererStackConf, &filtererTxOpts, nil)
require.NoError(t, err)

err = transactionFiltererAPI.Start(ctx)
require.NoError(t, err)

err = transactionFiltererStack.Start()
require.NoError(t, err)

Expand Down
Loading