
Replace TransactionFiltererAPI mutex with a channel #4428

Open

MishkaRogachev wants to merge 3 commits into master from filterer-use-a-channel-instead-of-mutex

Conversation

@MishkaRogachev
Contributor

Fixes NIT-4499

Addresses #4294 (comment)

@codecov

codecov bot commented Feb 24, 2026

Codecov Report

❌ Patch coverage is 0% with 40 lines in your changes missing coverage. Please review.
✅ Project coverage is 32.32%. Comparing base (3833eec) to head (48f0b05).

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4428      +/-   ##
==========================================
- Coverage   33.03%   32.32%   -0.71%     
==========================================
  Files         493      493              
  Lines       58290    58315      +25     
==========================================
- Hits        19256    18853     -403     
- Misses      35675    36109     +434     
+ Partials     3359     3353       -6     

@MishkaRogachev force-pushed the filterer-use-a-channel-instead-of-mutex branch from 2c947df to 9869061 on February 24, 2026 13:47
@github-actions
Contributor

github-actions bot commented Feb 24, 2026

❌ 8 Tests Failed:

Tests completed   Failed   Passed   Skipped
4248              8        4240     0
View the top 3 failed tests by shortest run time
TestDataStreaming_PositiveScenario/Many_senders,_long_messages
Stack Traces | 0.160s run time
... [CONTENT TRUNCATED: Keeping last 20 lines]
        github.com/offchainlabs/nitro/daprovider/data_streaming.testBasic.func1()
        	/home/runner/work/nitro/nitro/daprovider/data_streaming/protocol_test.go:230 +0x19b
        created by github.com/offchainlabs/nitro/daprovider/data_streaming.testBasic in goroutine 204
        	/home/runner/work/nitro/nitro/daprovider/data_streaming/protocol_test.go:223 +0x85
        
    protocol_test.go:230: [] too much time has elapsed since request was signed
WARN [03-03|16:38:15.032] Served datastreaming_start               conn=127.0.0.1:33988 reqid=350 duration="125.278µs" err="too much time has elapsed since request was signed"
INFO [03-03|16:38:15.035] rpc response                             method=datastreaming_start logId=350 err="too much time has elapsed since request was signed" result={} attempt=0 args="[\"0x69a70e76\", \"0x30\", \"0xd9\", \"0x288f\", \"0xa\", \"0x3991cb2eedc46608bbc4421d51beb6504f91638c0f686b87608708c4967235c33bc50abdbb1d19ba9f80415983425d1921d80def98889ffc405676dbc668038101\"]" errorData=null
    protocol_test.go:230: goroutine 307 [running]:
        runtime/debug.Stack()
        	/opt/hostedtoolcache/go/1.25.7/x64/src/runtime/debug/stack.go:26 +0x5e
        github.com/offchainlabs/nitro/util/testhelpers.RequireImpl({0x161fa30, 0xc0005a0e00}, {0x1606280, 0xc0014b2ff0}, {0x0, 0x0, 0x0})
        	/home/runner/work/nitro/nitro/util/testhelpers/testhelpers.go:29 +0x9f
        github.com/offchainlabs/nitro/daprovider/data_streaming.testBasic.func1()
        	/home/runner/work/nitro/nitro/daprovider/data_streaming/protocol_test.go:230 +0x19b
        created by github.com/offchainlabs/nitro/daprovider/data_streaming.testBasic in goroutine 204
        	/home/runner/work/nitro/nitro/daprovider/data_streaming/protocol_test.go:223 +0x85
        
    protocol_test.go:230: [] too much time has elapsed since request was signed
--- FAIL: TestDataStreaming_PositiveScenario/Many_senders,_long_messages (0.16s)
TestDataStreaming_PositiveScenario
Stack Traces | 0.180s run time
=== RUN   TestDataStreaming_PositiveScenario
--- FAIL: TestDataStreaming_PositiveScenario (0.18s)
TestRedisProduceComplex/one_producer,_all_consumers_are_active
Stack Traces | 2.009s run time
... [CONTENT TRUNCATED: Keeping last 20 lines]
DEBUG[03-03|16:38:19.524] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.529] redis producer: check responses starting
DEBUG[03-03|16:38:19.530] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.535] redis producer: check responses starting
DEBUG[03-03|16:38:19.535] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.541] redis producer: check responses starting
DEBUG[03-03|16:38:19.542] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.547] redis producer: check responses starting
DEBUG[03-03|16:38:19.548] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.553] redis producer: check responses starting
DEBUG[03-03|16:38:19.554] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.560] redis producer: check responses starting
DEBUG[03-03|16:38:19.561] checkResponses                           responded=0 errored=0 checked=4
DEBUG[03-03|16:38:19.568] redis producer: check responses starting
DEBUG[03-03|16:38:19.568] request timed out waiting for response   msgId=1772555897564-16 allowedOldestId=1772555897568-0
DEBUG[03-03|16:38:19.569] request timed out waiting for response   msgId=1772555897564-17 allowedOldestId=1772555897568-0
DEBUG[03-03|16:38:19.569] request timed out waiting for response   msgId=1772555897564-18 allowedOldestId=1772555897568-0
DEBUG[03-03|16:38:19.569] request timed out waiting for response   msgId=1772555897564-15 allowedOldestId=1772555897568-0
DEBUG[03-03|16:38:19.569] checkResponses                           responded=0 errored=4 checked=4
--- FAIL: TestRedisProduceComplex/one_producer,_all_consumers_are_active (2.01s)


Member

@Tristan-Wilson Tristan-Wilson left a comment


I think the old code with the mutex was clearer and easier to reason about.

t.apiMutex.Lock()
defer t.apiMutex.Unlock()
t.arbFilteredTransactionsManager = arbFilteredTransactionsManager
t.queue <- func() {
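
For context on the comparison: the closures sent on t.queue are presumably drained by a single worker goroutine, so that goroutine alone touches arbFilteredTransactionsManager and no mutex is needed. A minimal sketch of that assumed worker loop (illustrative only, not code from this PR; the Start name and the ctx handling are assumptions):

// Assumed worker loop (illustrative): one goroutine owns the queue, so
// enqueued closures run one at a time.
func (t *TransactionFiltererAPI) Start(ctx context.Context) {
	go func() {
		for {
			select {
			case job := <-t.queue:
				job()
			case <-ctx.Done():
				return
			}
		}
	}()
}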
Member


Should also select for ctx.Done
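
For reference, the pattern being requested looks roughly like this (generic placeholder names, not the PR's identifiers): the enqueue itself should give up when the caller's context is canceled instead of blocking on a full queue.

// Sketch of the suggested enqueue; `queue` and `job` are placeholders.
select {
case queue <- job:
	// enqueued; a worker goroutine will run it later
case <-ctx.Done():
	return ctx.Err() // caller canceled, don't block forever
}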

Contributor Author


Fixed

Comment on lines +59 to +77
case t.queue <- func() {
	if t.arbFilteredTransactionsManager == nil {
		reply <- errors.New("sequencer client not set yet")
		return
	}
	txOpts := *t.txOpts
	txOpts.Context = ctx
	log.Info("Received call to filter transaction", "txHashToFilter", txHashToFilter.Hex())
	tx, err := t.arbFilteredTransactionsManager.AddFilteredTransaction(&txOpts, txHashToFilter)
	if err != nil {
		log.Warn("Failed to filter transaction", "txHashToFilter", txHashToFilter.Hex(), "err", err)
		reply <- err
		return
	}
	log.Info("Submitted filter transaction", "txHashToFilter", txHashToFilter.Hex(), "txHash", tx.Hash().Hex())
	txHash = tx.Hash()
	reply <- nil
}:
case <-ctx.Done():
Member


I think there is a situation where this is called and the closure is enqueued, then later its context is canceled and we return an error to the caller at line 84, but the closure is still enqueued and will execute later; the contract binding call will then probably fail immediately because of the canceled context and log an error. At least it won't hang forever, since the reply chan has size 1. I think the decoupling introduced by the channel isn't worth it here.
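
To make the described sequence concrete, here is a self-contained sketch using only the standard library (all names are illustrative, not the PR's): the closure is accepted onto the queue, the caller's context is canceled and the caller returns an error, yet the worker still runs the stale closure afterwards.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	queue := make(chan func(), 1)

	// Worker drains the queue only after a delay, so the closure
	// outlives the caller's context.
	go func() {
		time.Sleep(100 * time.Millisecond)
		for job := range queue {
			job()
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	reply := make(chan error, 1) // size 1, so the late send never blocks

	// Caller: the enqueue succeeds immediately...
	queue <- func() {
		// ...but by the time this runs, ctx is already canceled, so any
		// call made with it would fail right away.
		reply <- ctx.Err()
	}

	// ...then the caller gives up and reports an error.
	cancel()
	fmt.Println("caller returned:", ctx.Err())

	// The stale closure still executes later.
	fmt.Println("stale closure saw:", <-reply)
}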

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made another attempt with the channel, and now it should be a bit clearer

Contributor

@diegoximenes diegoximenes left a comment


One issue with the mutex approach is that, if the incoming call is aborted, the apiMutex.Lock call will not be canceled.
The current strategy solves that.
However, this is not a big issue given the current architecture, e.g., a single sequencer replica calling this API sequentially.
This kind of change could make it easier, in the future, to add retry logic when calling the sequencer, etc.
But the extra complexity introduced here is not worth it.

You can simplify the mutex usage by making TransactionFiltererAPI.Filter not return a transaction hash; we don't use this response today.
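
A sketch of what that simplification could look like, reusing the field and method names visible in the diff above; the exact signature is an assumption, and only an error is returned.

// Sketch only, not the PR's code: with no transaction hash in the return
// value, the whole call can sit under one mutex and the reply-channel
// plumbing goes away.
func (t *TransactionFiltererAPI) Filter(ctx context.Context, txHashToFilter common.Hash) error {
	t.apiMutex.Lock()
	defer t.apiMutex.Unlock()
	if t.arbFilteredTransactionsManager == nil {
		return errors.New("sequencer client not set yet")
	}
	txOpts := *t.txOpts
	txOpts.Context = ctx
	tx, err := t.arbFilteredTransactionsManager.AddFilteredTransaction(&txOpts, txHashToFilter)
	if err != nil {
		log.Warn("Failed to filter transaction", "txHashToFilter", txHashToFilter.Hex(), "err", err)
		return err
	}
	log.Info("Submitted filter transaction", "txHashToFilter", txHashToFilter.Hex(), "txHash", tx.Hash().Hex())
	return nil
}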
