rpc: harden client lifecycle and cache synchronization#240
Conversation
Summary of Changes

Hello @tuhalf, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the robustness and performance of the RPC client by introducing atomic operations and mutex-protected access for critical client state, such as the closure status, blockquick window, and last ticket. It also optimizes block header fetching through parallel processing and refines client lifecycle management to prevent race conditions and improve error handling, particularly during client shutdown and network operations. Together, these changes make the RPC client more stable, efficient, and maintainable.

Highlights
Changelog
Activity
Code Review
This pull request significantly improves the client's lifecycle management, concurrency safety, and performance by introducing atomic operations for the client's closed state, mutex-protected accessors for shared state, and a worker-based approach for parallel block header fetching. However, a critical race condition was identified in the new RPC timeout logic within waitResponse. This flaw allows a malicious or slow server to trigger an application-wide panic, leading to a Denial of Service. Additionally, a couple of minor race conditions were missed in the refactoring.
- make waitResponse cleanup synchronous via RemoveCallByID to avoid timeout/late-response send-on-closed-channel panics
- move greet() delayed ticket fallback access to client.s into srv.Cast
- move SubmitNewTicket() client.s reads into srv.Cast
- add regression test: TestWaitResponseTimeoutWithLateClaimedResponseDoesNotPanic
/gemini review
Code Review
This pull request significantly improves the RPC client's lifecycle management and concurrency safety through atomic operations for the client's closed state, mutex-protected accessors for shared state, and refactoring block header fetching with a worker pool. However, a potential Denial of Service (DoS) vulnerability was identified in the refactored GetBlockHeadersUnsafe2 function due to memory allocation based on an unvalidated size provided by the server. Implementing a size limit for this function is crucial to enhance robustness against malicious servers. Additionally, a new test case appears flawed and does not correctly validate its intended scenario.
rpc/client.go
Outdated
```go
func (client *Client) GetBlockHeadersUnsafe2(blockNumbers []uint64) ([]blockquick.BlockHeader, error) {
	count := len(blockNumbers)
	headersCount := 0
	if count == 0 {
		return []blockquick.BlockHeader{}, nil
	}

	workerCount := blockHeaderFetchWorkerCount
```
The function GetBlockHeadersUnsafe2 allocates a map and a channel with a size determined by the length of the blockNumbers slice provided by the server. A malicious server could send a very large list of block numbers, causing the client to allocate an excessive amount of memory for the responses map and the jobs channel, potentially leading to a crash due to memory exhaustion. This is inconsistent with other parts of the codebase (e.g., lines 1254, 1310) that include "safety belt" checks to limit allocations based on server-supplied sizes.
Remediation: Add a limit to the number of block headers that can be fetched in a single call to GetBlockHeadersUnsafe2. A limit of 1024 would be sufficient for legitimate use cases where the window size is typically 100.
Suggested change:

```diff
 func (client *Client) GetBlockHeadersUnsafe2(blockNumbers []uint64) ([]blockquick.BlockHeader, error) {
 	count := len(blockNumbers)
-	headersCount := 0
 	if count == 0 {
 		return []blockquick.BlockHeader{}, nil
 	}
+	if count > 1024 {
+		return []blockquick.BlockHeader{}, fmt.Errorf("too many block headers requested: %d", count)
+	}
 	workerCount := blockHeaderFetchWorkerCount
```
```go
}

func (client *Client) averageLatency() int64 {
	return client.latencySum / client.latencyCount
```
Made latency metrics atomic to prevent a potential race.
```go
	return latencySum / latencyCount
}

func (client *Client) addLatencyMeasurement(latency time.Duration) {
```
These values are read during client sorting and written from connection/watch goroutines at the same time, so they were made atomic to prevent a race.
```diff
 func (client *Client) addLatencyMeasurement(latency time.Duration) {
-	client.latencySum += latency.Milliseconds()
-	client.latencyCount++
+	atomic.AddInt64(&client.latencySum, latency.Milliseconds())
```
Centralize safe access to shared state and normalize close-related errors.
```go
	call.sender.Close()
	// Remove the call synchronously on exit. This avoids a race where a timed out
	// call channel is closed before the call is removed from the manager.
	defer client.cm.RemoveCallByID(call.id)
```
This can crash; if there is a need for synchronous removal, it should use client.srv.Call.
```go
	}
	res = resp
	return res, nil
case <-timer.C:
```
Why is this new timeout needed? What is the bug being fixed?
```go
}

func (client *Client) insertCall(call *Call) (err error) {
	timeout := client.callTimeout(func() {
```
Why remove this callTimeout here?
rpc/type.go
Outdated
```diff
 func (client *Client) LastValid() (uint64, crypto.Sha3) {
-	var bq *blockquick.Window
-	client.callTimeout(func() { bq = client.bq })
+	bq := client.getBlockquickWindow()
```
```go
	})
}

func (p *DataPool) markCacheRefreshIfNeeded(key string) (shouldRefresh bool) {
```
Can you explain the improvement here?
@tuhalf I've added a couple of comments there. These individual changes all look fine on their own, but it's hard to say whether they catch a bug or introduce a new one. Some more description of the root causes fixed, and of the states in which they cause issues, would be helpful. We can discuss tomorrow.
```diff
 }
-go client.recvMessageLoop()
+ssl := client.s
+go client.recvMessageLoop(ssl)
```
Seems like the issue is the write to SendCallPtr?
```
Write at 0x00c000517ad8 by goroutine 19:
  github.com/diodechain/diode_client/rpc.(*Client).doStart()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:1454 +0x13d
  github.com/diodechain/diode_client/rpc.(*Client).Start.func1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:1431 +0x31
  github.com/dominicletz/genserver.(*GenServer).loop()
      /Users/runner/go/pkg/mod/github.com/dominicletz/genserver@v1.3.3/genserver.go:73 +0x51
  github.com/dominicletz/genserver.New.gowrap1()
      /Users/runner/go/pkg/mod/github.com/dominicletz/genserver@v1.3.3/genserver.go:51 +0x33

Previous read at 0x00c000517ad8 by goroutine 71:
  github.com/diodechain/diode_client/rpc.(*callManager).Insert()
      /Users/runner/work/diode_client/diode_client/rpc/callmanager.go:35 +0x1dd
  github.com/diodechain/diode_client/rpc.(*Client).insertCall.func1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:279 +0xe6
  github.com/dominicletz/genserver.(*GenServer).CallTimeout()
      /Users/runner/go/pkg/mod/github.com/dominicletz/genserver@v1.3.3/genserver.go:143 +0x12d
  github.com/diodechain/diode_client/rpc.(*Client).callTimeout()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:249 +0x5a
  github.com/diodechain/diode_client/rpc.(*Client).insertCall()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:274 +0x124
  github.com/diodechain/diode_client/rpc.(*Client).CastContext()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:269 +0x2e4
  github.com/diodechain/diode_client/rpc.(*Client).CallContext()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:292 +0x8d
  github.com/diodechain/diode_client/rpc.(*Client).GetBlockHeaderUnsafe()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:455 +0xe4
  github.com/diodechain/diode_client/rpc.(*Client).GetBlockHeadersUnsafe2.func1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:480 +0x127
  github.com/diodechain/diode_client/rpc.(*Client).GetBlockHeadersUnsafe2.gowrap1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:491 +0x41
```
```go
	doCleanup = false
	return
}
client.isClosed = true
```
Before this change:
```
Write at 0x00c0002478d8 by goroutine 566:
  github.com/diodechain/diode_client/rpc.(*Client).Close.func1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:1406 +0x50
  github.com/dominicletz/genserver.(*GenServer).CallTimeout()
      /Users/runner/go/pkg/mod/github.com/dominicletz/genserver@v1.3.3/genserver.go:143 +0x108
  github.com/diodechain/diode_client/rpc.(*Client).callTimeout()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:249 +0x50
  github.com/diodechain/diode_client/rpc.(*Client).Close()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:1401 +0x120
  github.com/diodechain/diode_client/rpc.(*Client).recvMessageLoop()
      /Users/runner/work/diode_client/diode_client/rpc/bridge.go:416 +0x24c
  github.com/diodechain/diode_client/rpc.(*Client).doStart.gowrap1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:1453 +0x34

Previous read at 0x00c0002478d8 by goroutine 460:
  github.com/diodechain/diode_client/rpc.(*Client).insertCall.func1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:275 +0x40
  github.com/dominicletz/genserver.(*GenServer).CallTimeout()
      /Users/runner/go/pkg/mod/github.com/dominicletz/genserver@v1.3.3/genserver.go:143 +0x108
  github.com/diodechain/diode_client/rpc.(*Client).callTimeout()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:249 +0x50
  github.com/diodechain/diode_client/rpc.(*Client).insertCall()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:274 +0x108
  github.com/diodechain/diode_client/rpc.(*Client).CastContext()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:269 +0x260
  github.com/diodechain/diode_client/rpc.(*Client).CallContext()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:292 +0x58
  github.com/diodechain/diode_client/rpc.(*Client).GetBlockHeaderUnsafe()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:455 +0x90
  github.com/diodechain/diode_client/rpc.(*Client).GetBlockHeadersUnsafe2.func1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:480 +0xb0
  github.com/diodechain/diode_client/rpc.(*Client).GetBlockHeadersUnsafe2.gowrap1()
      /Users/runner/work/diode_client/diode_client/rpc/client.go:491 +0x40
```
The goid upgrade resolved most genserver race behavior, so remove the extra workaround locking layered on top of actor-based access:

- drop Client.stateMu and the bq/lastTicket accessor helpers
- route bq/lastTicket reads and writes back through callTimeout/srv context
- restore insertCall serialization via callTimeout
- remove the unused CheckTicket helper

Keep the essential race/panic fixes (lifecycle hardening, late-response cleanup, ticket-path serialization) while reducing complexity and keeping the codebase maintainable.
This pull request introduces significant improvements to the client state management, ticket handling, and block header fetching in the `rpc/client.go` and `rpc/bridge.go` files. The most important changes include replacing the `isClosed` boolean with atomic operations for thread-safe client closure, refactoring ticket and window access to use mutex-protected methods, introducing worker-based parallel block header fetching, and updating various methods to use the new state management approach. These changes enhance concurrency safety, improve performance, and simplify client lifecycle handling.

Client state management and concurrency improvements:

- Replaced the `isClosed` boolean with an atomic `closed` flag and provided `isClientClosed` and `setClientClosed` methods for thread-safe client closure checks and updates; all relevant checks and updates now use these methods. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL75-L79), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL1385-R1533), [[3]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL1402-R1573))
- Introduced mutex-protected accessors for the `blockquick.Window` and `lastTicket` fields, replacing direct access and the previous use of `callTimeout` for state updates. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL417-R518), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL512-R641), [[3]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL1221-R1364))

Ticket handling and lifecycle:

- Consolidated ticket submission logic into the `SubmitNewTicket` method. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL613-R741), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL623-R761), [[3]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL668-R812), [[4]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL682-R821), [[5]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL702-R841), [[6]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcR873-R875), [[7]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL743-R889))

Block header fetching and performance:

- Introduced worker-based parallel block header fetching in `GetBlockHeadersUnsafe2`, improving performance and handling client closure during fetches; added the `blockHeaderFetchWorkerCount` constant. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcR39), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL470-R629))

Client lifecycle and message loop:

- Refactored `recvMessageLoop` to capture the SSL connection at start and updated all relevant usage to avoid races on the connection pointer during closure. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-de3f4b06f0425e6e4a16b2104ae57f9ba9209e17684f3a57d8b0c87e3df59c3dL393-R396), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-de3f4b06f0425e6e4a16b2104ae57f9ba9209e17684f3a57d8b0c87e3df59c3dL405-R416), [[3]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL1432-R1608))

Timeout handling and error reporting:

- Reworked timeout handling in `waitResponse`, now using a timer and reporting RPC timeouts with more detailed logging and error propagation. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL205-R269), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcR289-R297))
- Updated `callTimeout` and related methods to standardize closed-client errors and avoid duplicate error reporting. ([[1]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL247-R326), [[2]](https://github.com/diodechain/diode_client/pull/240/files#diff-bcca4c7258b402c27ba36f7d0889e36df3bf4b9fa9556a64a0dfd57d95e384fcL274-R354))

These changes collectively make the client code safer for concurrent use, more robust against race conditions, and easier to maintain.