Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

Approval is needed from an approver in each of the affected files. The full list of commands accepted by this bot can be found in its documentation. Approvers can indicate their approval by writing an approval comment.

Welcome @mingley!
📝 Walkthrough

A trailing newline is added to Cargo.toml. The TSO timestamp module is refactored to replace inline response handling with a dedicated handler function, adjust wake-up semantics to only wake on queue state transitions, introduce batch observation, and add comprehensive unit tests covering the new behavior.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

✅ Passed checks (2 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/pd/timestamp.rs (1)
78-82: ⚠️ Potential issue | 🟡 Minor: Background task errors are silently discarded due to missing `JoinHandle` handling.

The `Result<()>` return type and explicit `Ok(())` at line 116 align cleanly with the `?` usage inside the function. However, the `JoinHandle` returned by `tokio::spawn(run_tso(...))` at line 62 is never stored or awaited. Since errors can occur at both the `pd_client.tso()` call (line 99) and within `allocate_timestamps()` (line 105), failures in the background task will go unnoticed, and the connection closure will only be discovered when callers receive a channel-closed error instead of the root cause.

Consider storing the `JoinHandle` and handling its potential error, or spawn a task that logs/propagates failures.
🧹 Nitpick comments (3)
benches/tso_waker_policy.rs (2)
20-36: The "old" and "new" response benchmarks have asymmetric work, which is expected but worth noting.

In `response_policy_old`, `wake()` is called unconditionally on every iteration, while in `response_policy_new` it is called only on the full→non-full transition (roughly once per 1024 iterations). The reported speedup primarily measures the cost of not calling `wake()`, rather than the overhead of the conditional check itself. This is fine for validating the optimization's effect, but the doc and PR description should be clear that the speedup reflects the amortized skip rate under this specific simulation pattern.

Also applies to: 38-57
9-11: Consider documenting what `FULL_EVERY` and `FULL_WINDOW` represent.

These simulation parameters control how often the queue becomes full in the benchmark, directly affecting the measured speedup ratio. A brief comment explaining their role would help future readers understand and tune the benchmark.
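The suggested documentation could be as small as a pair of doc comments. The values here are illustrative (the once-per-1024 rate comes from the comment above; `FULL_WINDOW = 8` is an assumption), not necessarily the benchmark's actual values:

```rust
/// The queue is driven into the "full" state once every this many iterations,
/// so wake() fires on roughly 1/FULL_EVERY of response-policy iterations.
const FULL_EVERY: usize = 1024;

/// How many consecutive iterations the queue stays full before draining,
/// i.e. the width of each simulated backpressure burst.
const FULL_WINDOW: usize = 8;

fn main() {
    // Sanity check: a burst must be shorter than the period between bursts.
    assert!(FULL_WINDOW < FULL_EVERY);
}
```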
doc/tso_waker_criterion.md (1)
1-58: Consider noting that results should be re-run when the benchmark or production code changes.

Hardcoded benchmark results in committed documentation risk becoming stale as the code evolves. Consider adding a note that these numbers are a point-in-time snapshot and should be re-collected after significant changes to `timestamp.rs` or the benchmark itself.
Addressed the CodeRabbit feedback in commit
@coderabbitai review
✅ Actions performed: Review triggered.
Addressed local review finding #1 in
Signed-off-by: Michael Ingley <michael.ingley@gmail.com>
Signed-off-by: Michael Ingley <mingley@linkedin.com>
Signed-off-by: Michael Ingley <michael.ingley@gmail.com>
@coderabbitai review

Please run a fresh review on the latest head (). Focus on lock-contention wake safety and the new response-loop wake tests.
@coderabbitai review

Please run a fresh review on the latest head: 10f685c. Focus on lock-contention wake safety and the new response-loop wake tests.
✅ Actions performed: Review triggered.
✅ Actions performed: Review triggered.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/pd/timestamp.rs`:
- Around line 61-67: TimestampOracle::new currently calls tokio::spawn directly
(spawning run_tso with cluster_id, pd_client, request_rx) which will panic if no
Tokio runtime exists; change TimestampOracle::new to guard the spawn by calling
tokio::runtime::Handle::try_current() and, if a handle is present, use
handle.spawn(...) to run run_tso, otherwise return a clear error (or change the
constructor to return a Result) or document the runtime requirement; ensure you
update the TimestampOracle::new signature/return path and callers accordingly so
missing runtime becomes a controlled error rather than a panic.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)

- Cargo.toml
- src/pd/timestamp.rs
✅ Files skipped from review due to trivial changes (1)
- Cargo.toml
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/pd/timestamp.rs (1)
161-168: ⚠️ Potential issue | 🔴 Critical: Lost-wake risk: a stack-local `MutexLockFuture` drops its waiter from the mutex queue between polls.

Each `poll_next` call creates a new `MutexLockFuture` on the stack (line 161). When the lock is contended, `poll(cx)` returns `Pending`, registers a waiter in tokio's `Mutex` queue, and the function returns at line 167. As the function returns, the stack-local future is dropped — and with it, tokio's `Mutex` removes the waiter from its queue.

Consequence: when the response handler releases the lock, there is no waiter to notify. The stream has no registered waker — `poll_recv` was never called (so the channel has no waker), and `self_waker` was not registered either. The stream stalls indefinitely until an unrelated external event re-polls it.

The existing test (`poll_next_waits_on_mutex_when_lock_is_contended`, line 468) does not catch this because it manually re-polls the stream after dropping the guard, rather than verifying that the waker was automatically invoked when the lock was released.

Fix: persist the lock future across polls so the waiter remains in the queue. Store it as a field (e.g., `Option<Pin<Box<dyn Future<Output = MutexGuard<…>>>>>`) or refactor to a two-phase state machine using `tokio::sync::Mutex::lock_owned` with a pinned-boxed future field.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pd/timestamp.rs` around lines 161 - 168, the code creates a stack-local Mutex lock future inside poll_next (the local pending_requests/MutexLockFuture created at let pending_requests = this.pending_requests.lock(); pin_mut!(pending_requests);) which is dropped between polls and causes a lost wake; persist the lock future across polls instead of recreating it each call: add a field on the stream (e.g., Option<Pin<Box<dyn Future<Output = tokio::sync::OwnedMutexGuard<...>>>>> or Option<Pin<Box<dyn Future<Output = tokio::sync::MutexGuard<...>>>>>), move the pending_requests lock future into that field when first polling, poll that pinned boxed future on subsequent poll_next invocations, and only take the resulting guard out of the field when it is Ready; alternatively implement a two-phase state machine using tokio::sync::Mutex::lock_owned with a stored pinned future to ensure the waiter remains registered across returns from poll_next.
♻️ Duplicate comments (1)
src/pd/timestamp.rs (1)
61-67: `tokio::spawn` without runtime guard — previously flagged.

The prior review noted that calling `tokio::spawn` in a synchronous function without verifying an active Tokio runtime will panic if none exists. This concern remains unaddressed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pd/timestamp.rs` around lines 61 - 67, The background task uses tokio::spawn directly which will panic if there's no active Tokio runtime; change the call to first check for an existing runtime with tokio::runtime::Handle::try_current() and, if present, use handle.spawn(async move { if let Err(err) = run_tso(...).await { warn!(...) } }); if try_current() returns Err, create a dedicated runtime (e.g. tokio::runtime::Builder::new_current_thread().enable_all().build()) and use that runtime.spawn(...) or run the future on that runtime inside a std::thread so the task runs without requiring a caller-provided runtime; reference run_tso and the current tokio::spawn usage when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/pd/timestamp.rs`:
- Around line 467-487: The test poll_next_waits_on_mutex_when_lock_is_contended
currently manually re-polls the stream after dropping the lock and so doesn't
assert that the Mutex actually fired the stored waker; update the test to drop
the lock_guard and then assert that wake_counter.wakes has increased (i.e., the
waker was invoked) instead of relying solely on an immediate manual poll;
specifically, in poll_next_waits_on_mutex_when_lock_is_contended use
wake_counter (from wake_counter_context) to wait/assert the atomic wakes count
increments after drop(lock_guard) (optionally with a short spin/timeout to avoid
flakiness) and then finally poll the pinned stream to confirm readiness — this
will validate the automatic wake-up behavior of the Mutex/MutexLockFuture rather
than a manual re-poll.
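The assertion pattern the prompt describes (counting waker invocations instead of manually re-polling) can be shown with a std-only counting waker. This `WakeCounter` is illustrative; the real test would store this waker in the contended stream's context and drop the lock guard to trigger it:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// A waker that counts how many times it is invoked, so a test can assert
// that releasing the lock actually woke the stream.
struct WakeCounter {
    wakes: AtomicUsize,
}

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// (wakes_before, wakes_after): in the real test, "after" would be observed
// following drop(lock_guard) rather than this explicit wake call.
fn demo() -> (usize, usize) {
    let counter = Arc::new(WakeCounter { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(Arc::clone(&counter));
    let before = counter.wakes.load(Ordering::SeqCst);
    // Stand-in for the mutex invoking the stored waker on unlock.
    waker.wake_by_ref();
    let after = counter.wakes.load(Ordering::SeqCst);
    (before, after)
}

fn main() {
    assert_eq!(demo(), (0, 1));
}
```

Asserting on the counter after `drop(lock_guard)` is what distinguishes "the mutex woke us" from "the test happened to re-poll".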
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)

- Cargo.toml
- src/pd/timestamp.rs
✅ Files skipped from review due to trivial changes (1)
- Cargo.toml
Signed-off-by: Michael Ingley <michael.ingley@gmail.com>
@coderabbitai please re-review the latest updates in 7530dbf; both major findings are addressed and threads are resolved.
✅ Actions performed: Review triggered.
Summary
This PR hardens TSO sender/response coordination in `src/pd/timestamp.rs` to remove a lock-contention lost-wake risk while preserving throughput optimizations.
Rationale
Why this change exists
The prior optimization path used a custom lock-contention handshake (`try_lock` + shared waiting flag + custom wake coordination). That reduced contention overhead but still left an interleaving where sender progress could depend on a future external wake, creating stall risk.

This change removes that fragile handshake and relies on the async mutex waiter path for lock-contention wakeup guarantees.
Deep Dive: Concurrency Model
There are two coordination loops:

- `TsoRequestStream::poll_next`
- `handle_tso_responses`

Shared state:

- `pending_requests: Mutex<VecDeque<RequestGroup>>`
- `sending_future_waker: AtomicWaker`

Updated behavior:
- Lock contention in the sender path now uses `pending_requests.lock().poll(cx)`. This places the task on the mutex wait queue and gives deterministic wakeup when the lock becomes available.
- The response-path wake policy remains conditional: `was_full` is captured before `allocate_timestamps`, and the sender is woken only on a full -> non-full transition.

This removes a race-prone protocol and narrows wake logic to queue-capacity transitions.
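The conditional response-path policy described above can be sketched as follows. This is a simplified model: `MAX_PENDING_COUNT`'s value and the `wake_sender` counter are illustrative, and the real code drains the queue via `allocate_timestamps` and wakes through an `AtomicWaker`:

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative bound; the real constant lives in src/pd/timestamp.rs.
const MAX_PENDING_COUNT: usize = 4;

static SENDER_WAKES: AtomicUsize = AtomicUsize::new(0);

// Stand-in for sending_future_waker.wake().
fn wake_sender() {
    SENDER_WAKES.fetch_add(1, Ordering::SeqCst);
}

// Response path: capture fullness before draining, wake only on the
// full -> non-full transition.
fn handle_response(pending: &mut VecDeque<u64>) {
    let was_full = pending.len() >= MAX_PENDING_COUNT;
    pending.pop_front(); // stand-in for allocate_timestamps consuming a group
    let is_full = pending.len() >= MAX_PENDING_COUNT;
    if was_full && !is_full {
        wake_sender();
    }
}

fn demo() -> usize {
    let mut queue: VecDeque<u64> = (0..MAX_PENDING_COUNT as u64).collect();
    handle_response(&mut queue); // full -> non-full: wakes once
    handle_response(&mut queue); // already non-full: no wake
    SENDER_WAKES.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(demo(), 1);
}
```

Skipping the wake while the queue stays below capacity is safe because a non-full queue means the sender was never blocked on capacity in the first place.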
Why this does not break backward compatibility

- `allocate_timestamps` invariants are preserved.
- `MAX_PENDING_COUNT` still bounds in-flight groups.

Net effect: internal scheduling becomes safer under contention without changing externally observable client API behavior.
Benchmark Results
Measured with a temporary standalone Rust microbenchmark harness (`/tmp/pr529_tso_bench`) to compare policy-level overhead for pre-PR logic (before) and latest PR logic (after). The harness was run in `--release` mode and not committed to this repository.

Benchmark configuration:
Response Wake Policy
Speedup (median): 7.10x
Self-Waker Registration Policy
Speedup (median): 9.49x
Notes:
File Scope

- `src/pd/timestamp.rs`
- `Cargo.toml` (branch-level context change already present in PR)

Testing Done

Executed locally:

- `cargo +1.93.0 test pd::timestamp --lib` -> pass (14 passed)
- `cargo +1.93.0 test` -> pass (67 unit passed, 49 doc passed)
- `cargo +1.93.0 run --release` (in `/tmp/pr529_tso_bench`) -> metrics above

Focused concurrency coverage in `src/pd/timestamp.rs` includes:

- `poll_next_waits_on_mutex_when_lock_is_contended`
- `poll_next_registers_self_waker_when_pending_queue_is_full`
- `poll_next_does_not_register_self_waker_when_queue_not_full`
- `handle_tso_responses_wakes_sender_when_queue_transitions_from_full`
- `handle_tso_responses_does_not_wake_sender_when_queue_was_not_full`
- `handle_tso_responses_wakes_sender_once_for_each_full_to_non_full_transition`

Compatibility
No public API surface change is introduced by this PR.
Summary by CodeRabbit
Tests
Chores