@wolfiestyle
Contributor

@wolfiestyle wolfiestyle commented Jan 16, 2026

HCD-579

Summary

  • Fixed multiple hanging issues when using flush_group and flush_and_sync_group with batched handlers
  • Updated documentation to accurately reflect the groups API behavior

Bug Fixes

flush_and_sync_group:

  • Implemented two-pass flush strategy to prevent deadlocks with cascading messages (handlers that spawn other messages)
  • Added loop to repeatedly flush receivers until all are idle, handling partial batches correctly
  • Added is_idling() checks and 32-iteration fuse to prevent infinite loops

flush_group:

  • Now flushes receivers before waiting, so partial batches (fewer messages than batch_size) no longer cause indefinite hangs
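The partial-batch hang can be illustrated with a toy model. This is a minimal sketch, not the crate's implementation: `BatchedReceiver` and its methods are hypothetical names, and the model is synchronous. It shows why waiting for idle without flushing first never finishes when fewer than `batch_size` messages are buffered.

```rust
/// Toy model of a batched receiver (hypothetical, not the messagebus type):
/// messages are buffered and the handler only runs once `batch_size`
/// messages have accumulated, or when a flush is requested.
pub struct BatchedReceiver {
    batch_size: usize,
    buffer: Vec<u32>,
    processed: usize,
}

impl BatchedReceiver {
    pub fn new(batch_size: usize) -> Self {
        Self { batch_size, buffer: Vec::new(), processed: 0 }
    }

    /// Buffer a message; the handler runs only when the batch is full.
    pub fn send(&mut self, msg: u32) {
        self.buffer.push(msg);
        if self.buffer.len() >= self.batch_size {
            self.run_batch();
        }
    }

    /// Run the handler on whatever is buffered, even a partial batch.
    pub fn flush(&mut self) {
        if !self.buffer.is_empty() {
            self.run_batch();
        }
    }

    /// Messages still waiting in the buffer. A waiter that only returns
    /// when this reaches zero would hang on a partial batch.
    pub fn pending(&self) -> usize {
        self.buffer.len()
    }

    /// Messages the handler has seen so far.
    pub fn processed(&self) -> usize {
        self.processed
    }

    fn run_batch(&mut self) {
        self.processed += self.buffer.len();
        self.buffer.clear();
    }
}
```

With `batch_size = 4` and three messages sent, `pending()` stays at 3 until `flush()` is called, which is the situation the fix addresses by flushing before waiting.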

Documentation

  • Rewrote README Task Grouping section with accurate descriptions of all flush methods
  • Documented flush_current_group() for use within handlers
  • Added batch handler behavior documentation (per-group batching)
  • Added group cleanup API documentation (GroupRemovalResult, remove_group, etc.)

Tests Added

  • test_flush_group_flushes_partial_batches
  • test_flush_and_sync_group_flushes_partial_batches
  • test_flush_and_sync_group_waits_for_cascading_messages
  • test_flush_and_sync_group_handles_deep_nesting
  • test_flush_and_sync_group_batched_with_cascading

Test plan

  • All 42 group tests pass
  • cargo run --example demo_groups works correctly
  • No hangs with partial batches
  • No hangs with cascading messages

Summary by CodeRabbit

  • New Features

    • Added non-blocking flush usable from within handlers to avoid deadlocks and wait for child work without blocking.
  • Enhancements

    • More robust flush/sync behavior to proactively handle partial batches, cascading messages, and stabilize completion across iterations.
    • Improved runtime logging and observability around group processing and flush operations.
  • Documentation

    • Expanded README and examples covering group propagation, flushing semantics, batching, cleanup, and utilities.
  • Tests

    • Large new test suite covering flush/sync semantics, batching, cascading, propagation, cleanup, and error scenarios.


@coderabbitai

coderabbitai bot commented Jan 16, 2026

Walkthrough

Adds README documentation for group usage and flushing semantics; introduces flush_current_group (Bus) and flush_nowait (Receiver); enhances flush_group and flush_and_sync_group with multi-phase flushing, stabilization logic, and added logging; expands runtime logs in group registry; and adds extensive group-related tests.

Changes

Cohort | File(s) | Change Summary
Documentation | README.md, examples/demo_groups.rs | Expanded README with group usage, propagation rules, flush method docs, handler-internal flushing guidance, batch interactions, cleanup utilities; updated demo comment to reference flush_group.
Group Registry Tracing | src/group.rs | Added debug/warn logs in increment, decrement_by, and wait_idle; minor refactor to read count into a local variable in wait_idle.
Bus Group Methods | src/lib.rs | Added pub async fn flush_current_group(&self); enhanced flush_group() to proactively flush receivers before waiting; substantially upgraded flush_and_sync_group() with two-phase flushing (non-blocking then blocking), stabilization loop (up to 32 iterations), per-iteration idle detection, and added logging.
Receiver Flush Methods | src/receiver.rs | Added pub fn flush_nowait(&self, bus: &Bus) to send Action::Flush without awaiting, returning immediately and logging on failure.
Group Testing Suite | tests/test_groups.rs | Large set of new tests covering partial-batch flushes, cascading/nested messages, batched handlers with groups, per-group isolation, group cleanup/removal, group_id propagation patterns, request/response interactions, panic/error handling, and edge cases.

Sequence Diagram(s)

sequenceDiagram
    participant Handler as Handler/Task
    participant Bus
    participant GroupReg as GroupRegistry
    participant Receiver

    Handler->>Bus: flush_and_sync_group(group_id, force=false)
    activate Bus
    Bus->>Bus: log(start)

    Note over Bus: Phase 1 — Non-blocking trigger
    Bus->>Receiver: flush_nowait() for each receiver in group
    Receiver-->>Bus: returns immediately

    Note over Bus: Phase 2 — Blocking stabilization loop (≤32 iters)
    loop up to 32
        Bus->>Receiver: flush() for each receiver in group
        Receiver-->>Bus: completes when processing done
        Bus->>GroupReg: get_group_count(group_id)
        GroupReg-->>Bus: current_count
        alt current_count == 0
            Bus->>Bus: stabilized -> break
        else current_count > 0
            Bus->>Bus: continue
        end
    end

    Note over Bus: Final partial-batch flush and wait
    Bus->>Receiver: partial flush for remaining batches
    Bus->>GroupReg: wait_idle(group_id)
    GroupReg-->>Bus: notify idle

    Bus->>Bus: synchronized group operations complete
    Bus-->>Handler: return
    deactivate Bus

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • gonzalog

Poem

🐰 I hopped through docs and logs with cheer,
I nudged flushes gently, then watched them clear,
Non-blocking hops, then patient loops unwind,
Groups settle softly — no loose threads to find,
Tests drum a burrow beat, all tidy and fine. 🥕✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title clearly identifies the main bug fix (hang on group batches) and references the ticket ID (HCD-579), accurately summarizing the primary change across all modified files.
Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.




@claude

claude bot commented Jan 16, 2026

Code Review: HCD-579 - Fix hang on group batches

Thank you for this comprehensive fix! This PR addresses critical hanging issues with group batching and adds excellent test coverage. Here's my detailed review:


✅ Strengths

1. Excellent Problem Analysis

The PR correctly identifies and fixes two distinct hanging scenarios:

  • Partial batches not being flushed in flush_group
  • Cascading message deadlocks in flush_and_sync_group

2. Comprehensive Test Coverage

The addition of 907 lines of tests is outstanding. Key tests added:

  • test_flush_group_flushes_partial_batches
  • test_flush_and_sync_group_flushes_partial_batches
  • test_flush_and_sync_group_waits_for_cascading_messages
  • test_flush_and_sync_group_handles_deep_nesting
  • test_flush_and_sync_group_batched_with_cascading

These cover edge cases thoroughly.

3. Documentation Improvements

The README updates are very helpful, especially:

  • Clear comparison table for flush methods
  • Explanation of batch handler behavior
  • Group cleanup API documentation

4. Defensive Programming

Using saturating_sub in decrement_by to prevent underflow, while keeping a debug assertion, is good practice.


🔍 Issues & Suggestions

1. Magic Number: Fuse Count (src/lib.rs:897)

let fuse_count = 32i32;

Issue: The value 32 appears arbitrary without justification.

Suggestion:

  • Add a comment explaining why 32 iterations is sufficient
  • Consider making this a named constant: const MAX_FLUSH_ITERATIONS: i32 = 32;
  • Or derive it from a theoretical maximum nesting depth

Severity: Minor - works fine but lacks clarity
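A minimal sketch of the named-constant suggestion, assuming a loop shape similar to the snippet quoted in the next issue. `MAX_FLUSH_ITERATIONS` follows the name proposed above; `run_with_fuse` and the closure-based `pass` are hypothetical and only stand in for the real flush logic. The sketch uses `u32` rather than the PR's `i32`, since an iteration count is never negative.

```rust
/// Upper bound on stabilization passes. Each pass can settle at least one
/// level of cascading messages, so this caps the nesting depth the loop
/// is expected to resolve. (Hypothetical constant; the PR uses a local
/// `fuse_count = 32i32`.)
pub const MAX_FLUSH_ITERATIONS: u32 = 32;

/// Runs `pass` until it reports that every receiver was already idle, or
/// until the fuse is reached. Returns the number of passes executed and
/// whether the loop stabilized.
pub fn run_with_fuse<F: FnMut() -> bool>(mut pass: F) -> (u32, bool) {
    for iter in 1..=MAX_FLUSH_ITERATIONS {
        // `pass` returns true when nothing was left to flush.
        if pass() {
            return (iter, true);
        }
    }
    (MAX_FLUSH_ITERATIONS, false)
}
```

Keeping the bound in one documented constant also gives tests a single value to reference when exercising the fuse-exceeded path.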


2. Potential Infinite Loop Risk (src/lib.rs:900-954)

loop {
    iters += 1;
    if iters > fuse_count {
        log::warn!(...);
        break;  // Only logs warning, doesn't return error
    }
    // ... flush logic
}

Issue: If the fuse triggers, execution continues to the final flush and sync. This could:

  • Mask the underlying problem
  • Lead to incomplete processing
  • Make debugging harder in production

Suggestions:

  1. Consider returning a Result from flush_and_sync_group to signal this failure condition
  2. At minimum, add documentation about this edge case behavior
  3. Consider adding metrics/telemetry when the fuse triggers

Severity: Moderate - edge case handling could be more robust
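Suggestion 1 could look roughly like the following. This is a sketch under stated assumptions: `FuseExceeded` and `stabilize` are hypothetical names, and the closure stands in for one flush pass that reports how many receivers still needed flushing. It is not the signature of flush_and_sync_group.

```rust
use std::fmt;

/// Hypothetical error returned when the stabilization loop hits its fuse.
#[derive(Debug, PartialEq, Eq)]
pub struct FuseExceeded {
    pub iterations: u32,
}

impl fmt::Display for FuseExceeded {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "flush did not stabilize after {} iterations", self.iterations)
    }
}

impl std::error::Error for FuseExceeded {}

/// Repeats `flush_pass` until it reports zero busy receivers.
/// Returns the number of passes on success, or `FuseExceeded` instead of
/// silently continuing once `max_iters` passes have run.
pub fn stabilize<F>(max_iters: u32, mut flush_pass: F) -> Result<u32, FuseExceeded>
where
    F: FnMut() -> usize,
{
    let mut iters = 0;
    loop {
        iters += 1;
        if iters > max_iters {
            return Err(FuseExceeded { iterations: max_iters });
        }
        if flush_pass() == 0 {
            return Ok(iters);
        }
    }
}
```

Callers that prefer the current behavior can still log the error and carry on, but the decision becomes explicit at the call site.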


3. Logging Verbosity (Throughout)

Multiple new log::info!, log::debug!, and log::trace! calls were added.

Observations:

  • log::info! in flush_and_sync_group (lines 884, 888, 958, 970, 977, 982) might be too verbose for production
  • Consider if these should be debug! or trace! level instead
  • Good: Comprehensive tracing for debugging
  • Concern: Performance impact in high-throughput scenarios

Suggestion: Review log levels - info should be for important state changes, while iteration details might be better as debug or trace.

Severity: Minor - tuning issue


4. Memory Overhead: Duplicate Receiver Iteration (src/lib.rs:911-967)

let receiver_ids = self.inner.group_registry.receivers_for_group(group_id);
// First pass
for r in self.inner.receivers.iter() {
    if receiver_ids.contains(&r.id()) { ... }
}
// Second pass
for r in self.inner.receivers.iter() {
    if receiver_ids.contains(&r.id()) { ... }
}
// ... then again at line 962-967

Issue: Iterating all receivers and filtering multiple times.

Suggestion: Pre-filter receivers once:

let relevant_receivers: Vec<_> = self.inner.receivers.iter()
    .filter(|r| receiver_ids.contains(&r.id()))
    .collect();

for r in &relevant_receivers {
    if !r.is_idling() {
        r.flush_nowait(self);
    }
}

Severity: Low - likely not a bottleneck unless receiver count is very high


5. Race Condition Consideration (src/lib.rs:922)

if receiver_ids.contains(&r.id()) && !r.is_idling() {
    r.flush_nowait(self);
}

Question: Is there a TOCTOU (time-of-check-time-of-use) race between checking !r.is_idling() and calling flush_nowait?

  • The receiver could become idle between the check and the flush
  • This seems benign (flushing an idle receiver should be a no-op)
  • But worth documenting this is intentional/safe

Severity: Info - likely not an issue but worth confirming


6. Error Handling in flush_nowait (src/receiver.rs:1107-1109)

pub fn flush_nowait(&self, bus: &Bus) {
    if self.inner.send_action(bus, Action::Flush).is_err() {
        warn!("flush_nowait failed!");
    }
}

Issue: Silent failure with only a warning log.

Considerations:

  • Should this return a Result?
  • When would send_action fail, and is ignoring it always safe?
  • If the receiver is closed/dropped, should the caller know?

Severity: Minor - depends on failure semantics
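One way to surface the failure to the caller, sketched with std::sync::mpsc standing in for the crate's internal action channel. `ReceiverHandle`, `FlushError`, and the channel type are hypothetical stand-ins, not the messagebus API; the real send_action may fail for other reasons.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for the crate's action enum (only the variant used here).
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Flush,
}

/// Hypothetical error carrying the id of the receiver that could not be reached.
#[derive(Debug, PartialEq, Eq)]
pub struct FlushError {
    pub receiver_id: u64,
}

/// Hypothetical handle that owns the sending side of a receiver's action queue.
pub struct ReceiverHandle {
    id: u64,
    tx: Sender<Action>,
}

impl ReceiverHandle {
    pub fn new(id: u64) -> (Self, Receiver<Action>) {
        let (tx, rx) = channel();
        (Self { id, tx }, rx)
    }

    /// Queues a flush request and returns immediately. Sending on an
    /// unbounded mpsc channel never blocks; it fails only when the
    /// receiving side has been dropped, which is reported to the caller.
    pub fn flush_nowait(&self) -> Result<(), FlushError> {
        self.tx
            .send(Action::Flush)
            .map_err(|_| FlushError { receiver_id: self.id })
    }
}
```

Returning the receiver id in the error also covers the request elsewhere in this thread to add that context to the warning message.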


7. Documentation Gap: flush_current_group (src/lib.rs:1014-1047)

The method is well-documented, but:

Missing:

  • What happens if called recursively (handler A flushes, which processes handler B, which calls flush_current_group)?
  • Interaction with the fuse in flush_and_sync_group
  • Whether it's safe to call from sync handlers

Suggestion: Add notes about these edge cases.

Severity: Minor - documentation enhancement


8. Type Safety: GroupId (Throughout)

GroupId appears to be a type alias (likely u64). Consider using a newtype pattern:

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct GroupId(u64);

This prevents accidentally mixing up GroupIds with receiver IDs or other numeric types.

Severity: Low - more of a long-term API consideration
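A short usage sketch of the newtype idea. It assumes the underlying representation is u64, which the review itself only guesses at; `ReceiverId` and `GroupCounters` are hypothetical types added for illustration.

```rust
use std::collections::HashMap;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct GroupId(u64);

/// Hypothetical second id type, to show that the two cannot be mixed up.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ReceiverId(u64);

impl GroupId {
    pub fn new(raw: u64) -> Self {
        Self(raw)
    }

    pub fn get(self) -> u64 {
        self.0
    }
}

impl ReceiverId {
    pub fn new(raw: u64) -> Self {
        Self(raw)
    }
}

/// Hypothetical per-group counter table keyed by the newtype.
#[derive(Default)]
pub struct GroupCounters {
    counts: HashMap<GroupId, usize>,
}

impl GroupCounters {
    pub fn increment(&mut self, group: GroupId) {
        *self.counts.entry(group).or_insert(0) += 1;
    }

    pub fn processing_count(&self, group: GroupId) -> usize {
        self.counts.get(&group).copied().unwrap_or(0)
    }
}
```

With these definitions, `counters.increment(ReceiverId::new(42))` is rejected at compile time, whereas with a plain `u64` alias the same mistake would compile and silently count against the wrong key.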


🔒 Security Considerations

No security issues identified. The changes are internal synchronization logic without external inputs or security boundaries.


⚡ Performance Considerations

Positive:

  • Non-blocking flush_nowait prevents deadlocks efficiently
  • Two-pass strategy is well-motivated

Watch Out For:

  • Repeated receiver iteration in loops
  • Log call overhead (even if logs are disabled, string formatting may occur depending on macro implementation)
  • The loop with fuse could iterate 32 times in pathological cases

Recommendation: Consider adding metrics/tracing around iteration counts to monitor real-world behavior.


🧪 Test Coverage Assessment

Excellent coverage of the bug scenarios. Minor suggestions:

  1. Edge Case: Test with fuse_count exceeded (simulate a scenario requiring >32 iterations)
  2. Concurrency: Test multiple groups flushing concurrently
  3. Error Paths: Test behavior when receivers fail/close during flush
  4. Performance: Add a benchmark for flush_and_sync_group with various nesting depths

📋 Final Recommendations

Must Address:

  • None - the core functionality is sound

Should Consider:

  1. Extract magic number 32 to named constant with documentation
  2. Add documentation about fuse-exceeded behavior
  3. Review log levels (info → debug for iteration details)
  4. Consider returning errors when fuse triggers instead of silent continuation

Nice to Have:

  1. Pre-filter receivers to reduce iteration overhead
  2. Add test for fuse-exceeded scenario
  3. Enhance flush_current_group documentation for edge cases

✅ Verdict

LGTM with minor suggestions. This is a well-thought-out fix with excellent test coverage. The core logic is sound, and the identified issues are mostly about robustness, observability, and documentation polish rather than correctness.

The two-pass flush strategy is clever and well-documented. The addition of flush_nowait is the right abstraction for preventing deadlocks.

Great work! 🎉


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
README.md (1)

27-145: Update documentation: Batch handlers DO propagate group context

The claim on line 111 that "Batch handlers don't propagate group context to child messages" is incorrect. The code shows that all handler types—batch and non-batch—use Bus::with_group_context() to propagate group_id to child messages.

Batch handler implementations in src/receivers/buffer_unordered_batched/execution.rs and src/receivers/synchronize_batched/execution.rs explicitly call Bus::with_group_context(gid, ...) before executing the handler, identical to non-batch handlers.

Update the batch handler documentation section to reflect that batch handlers do propagate group context, similar to other handler types, though messages are still batched per group_id.

🤖 Fix all issues with AI agents
In `@src/lib.rs`:
- Around line 985-1047: The README text contradicts the implementation of
flush_current_group(): update the docs so they state that flush_current_group()
triggers non-blocking flushes (it calls Receiver::flush_nowait()) and does not
itself wait for child handlers to finish, and clarify that final
synchronization/waiting is performed by flush_and_sync_group()/the outer sync
mechanism which observes group_registry.processing_count() to detect completion;
reference flush_current_group(), flush_nowait(), flush_and_sync_group(), and
group_registry.processing_count() in the updated wording to make the distinction
explicit.

In `@tests/test_groups.rs`:
- Around line 3226-3358: The test spawns OS threads and calls
tokio::runtime::Handle::current() inside them (in
BatchWithCascadeHandler::handle in
test_flush_and_sync_group_batched_with_cascading), which can panic or drop
sends; instead capture a runtime handle in the test thread (let rt =
Handle::current()) and pass/clone that into the batch handler so
BatchWithCascadeHandler::handle uses rt.spawn(async move {
bus.send(...).await.unwrap(); }) rather than std::thread::spawn and
Handle::current() inside the thread; also ensure spawned tasks are awaited (or
tracked) so flush_and_sync_group waits for child processing and add an assertion
checking child_processed.load(...) to make the test deterministic.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c88f85d and b28d295.

📒 Files selected for processing (6)
  • README.md
  • examples/demo_groups.rs
  • src/group.rs
  • src/lib.rs
  • src/receiver.rs
  • tests/test_groups.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use appropriate SendOptions (Broadcast, Direct(id), Except(id), Random, or Balanced) when sending messages through the bus to control routing strategy

Files:

  • examples/demo_groups.rs
  • src/receiver.rs
  • src/group.rs
  • tests/test_groups.rs
  • src/lib.rs
🧠 Learnings (4)
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Use `#[derive(Message)]` with appropriate attributes for types sent through the bus, such as `#[message(clone)]` for broadcast-enabled messages, `#[message(shared)]` for remote transport serialization, and `#[type_tag("custom::name")]` or `#[namespace("my_namespace")]` for custom type tags

Applied to files:

  • examples/demo_groups.rs
  • README.md
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Choose the appropriate receiver type (BufferUnorderedAsync/Sync for concurrent processing, BufferUnorderedBatchedAsync/Sync for batched concurrent processing, SynchronizedAsync/Sync for sequential processing, or SynchronizedBatchedAsync/Sync for batched sequential processing) based on whether message processing should be concurrent or sequential

Applied to files:

  • src/receiver.rs
  • tests/test_groups.rs
  • src/lib.rs
  • README.md
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Implement one of the 12 handler traits (Handler, AsyncHandler, BatchHandler, AsyncBatchHandler, SynchronizedHandler, AsyncSynchronizedHandler, BatchSynchronizedHandler, AsyncBatchSynchronizedHandler, LocalHandler, LocalAsyncHandler, LocalBatchHandler, or LocalAsyncBatchHandler) based on the required concurrency model (thread-safe, synchronized, or local) and whether messages should be batched or processed individually

Applied to files:

  • tests/test_groups.rs
  • src/lib.rs
  • README.md
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to **/*.rs : Use appropriate SendOptions (Broadcast, Direct(id), Except(id), Random, or Balanced) when sending messages through the bus to control routing strategy

Applied to files:

  • README.md
🧬 Code graph analysis (3)
src/receiver.rs (1)
tests/test_idle.rs (1)
  • bus (48-48)
src/group.rs (3)
src/envelop.rs (5)
  • group_id (180-180)
  • group_id (308-310)
  • group_id (455-457)
  • group_id (522-524)
  • group_id (591-593)
src/error.rs (1)
  • group_id (422-424)
src/receiver.rs (1)
  • drop (552-556)
src/lib.rs (3)
src/group.rs (1)
  • processing_count (261-267)
src/receiver.rs (3)
  • id (109-109)
  • id (418-420)
  • id (789-791)
src/relay.rs (1)
  • id (128-130)
🔇 Additional comments (14)
examples/demo_groups.rs (1)

6-6: LGTM!

The documentation update accurately reflects the usage of flush_group for waiting on group completion, aligning with the PR's enhancements to group-based flushing semantics.

src/receiver.rs (1)

1100-1110: LGTM!

The flush_nowait method is a well-designed addition that complements the existing blocking flush method. The non-blocking semantics are essential for the two-pass flush strategy in flush_and_sync_group and for use within handlers via flush_current_group to prevent deadlocks. The warning-level log on failure is appropriate for a fire-and-forget operation.

src/group.rs (3)

174-182: LGTM!

The trace-level logging in increment provides valuable observability for debugging group counter flow without impacting performance in production.


273-321: LGTM!

The debug and trace logging throughout wait_idle provides excellent visibility into the idle-waiting flow. The refactor to store count in a local variable (line 296) before the comparison is a minor improvement that makes the log message more accurate.


215-244: Good addition of observability logging.

The trace-level logging for successful decrements is appropriate. The warning for unknown group_id will be triggered when force_remove_group is called while handlers are still completing—this is expected behavior and actually helpful for catching unintended concurrent removal of groups. Each active handler's GroupGuard::drop will log the warning as it attempts to decrement a group that no longer exists in the registry, alerting developers to this race condition if it occurs unexpectedly.

src/lib.rs (2)

843-853: LGTM - Key fix for partial batch hangs.

The updated flush_group now proactively flushes receivers belonging to the group before waiting for idle. This correctly addresses the hanging issue where partial batches (fewer messages than batch_size) would never be processed. The implementation:

  1. Retrieves receiver IDs for the group
  2. Flushes only those receivers (targeted flush)
  3. Then waits for the group to become idle

883-983: Well-designed two-pass flush strategy for cascading messages.

The enhanced flush_and_sync_group implementation addresses deadlocks with cascading messages effectively:

  1. First pass (non-blocking): Triggers all receivers to start processing without waiting, preventing upstream handlers from blocking on downstream buffers.
  2. Second pass (blocking): Waits for processing to complete.
  3. Stabilization loop: Continues until no receivers need flushing, with a 32-iteration fuse.
  4. Final partial-batch flush: Ensures any remaining buffered messages are processed.

The logging is well-placed for debugging production issues.
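The stabilization loop in step 3 can be modelled without any async machinery. The sketch below is a toy, not the PR's code: `Pipeline` and its methods are hypothetical, each stage forwards every message to the next stage when flushed (a cascading message), and the non-blocking versus blocking distinction of the two passes is deliberately left out.

```rust
/// Toy pipeline (hypothetical): stage `i` buffers messages and, when
/// flushed, forwards each one to stage `i + 1`. The last stage consumes.
pub struct Pipeline {
    buffers: Vec<usize>,
    pub completed: usize,
}

impl Pipeline {
    /// `stages` must be at least 1.
    pub fn new(stages: usize) -> Self {
        Self { buffers: vec![0; stages], completed: 0 }
    }

    /// Enqueue `count` messages at the first stage.
    pub fn send(&mut self, count: usize) {
        self.buffers[0] += count;
    }

    fn flush_stage(&mut self, stage: usize) {
        let n = std::mem::take(&mut self.buffers[stage]);
        match self.buffers.get_mut(stage + 1) {
            Some(next) => *next += n,
            None => self.completed += n,
        }
    }

    /// Flushes every busy stage, then re-checks, until all stages are idle.
    /// Returns the number of passes used, or `None` if `max_iters` passes
    /// were not enough (the fuse).
    pub fn flush_until_stable(&mut self, max_iters: usize) -> Option<usize> {
        let mut passes = 0;
        while self.buffers.iter().any(|&b| b > 0) {
            if passes == max_iters {
                return None;
            }
            passes += 1;
            // Snapshot the busy stages first: messages produced during
            // this pass are picked up by the next pass.
            let busy: Vec<usize> = (0..self.buffers.len())
                .filter(|&s| self.buffers[s] > 0)
                .collect();
            for s in busy {
                self.flush_stage(s);
            }
        }
        Some(passes)
    }
}
```

In this model a three-stage pipeline needs three passes, one per nesting level, which is why a single flush followed by a wait is not enough for cascading messages.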

tests/test_groups.rs (7)

2831-2919: Good regression coverage for partial batches in flush_and_sync_group.

Timeout guard + processed-count assertions make the hang regression reproducible.


2921-3002: Good coverage for flush_group partial-batch flush.

Clear reproduction of the pre-fix hang scenario.


3004-3111: Solid cascading-message wait coverage.

The test exercises parent/child completion under timeout, which is the core regression.


3113-3224: Deep-nesting wait coverage looks solid.

The depth tracking makes the intent clear and verifiable.


3360-3445: Independent-group flush behavior covered well.

Nice to see both groups flushed separately and verified.


3447-3490: Empty-group fast-path coverage looks good.

The short timeout makes the “no messages” case explicit.


3492-3732: Comprehensive VRBP-pattern regression test.

The sequence mirrors the real-world flow and should guard against future hangs.


@claude

claude bot commented Jan 16, 2026

Pull Request Review: Fix hang on group batches

This PR addresses critical hanging issues in the group-based task tracking system, specifically with flush_group and flush_and_sync_group when used with batched handlers.

Strengths

  1. Correct Problem Identification - Fixes two distinct hang scenarios: partial batches and cascading messages
  2. Comprehensive Test Coverage - 5 new tests that reproduce the bugs with timeout validation
  3. Well-Documented API - Excellent README updates with comparison table and examples
  4. Good Architectural Solution - Two-pass flush strategy handles backpressure without deadlocks

Issues and Concerns

1. Magic Number - Fuse Count (Medium Priority)

Location: src/lib.rs:897

The hardcoded iteration limit of 32 lacks justification. Recommend adding a named constant with documentation.

2. Performance - Repeated Receiver Iteration (Low-Medium)

Location: src/lib.rs:900-954

The loop iterates over ALL receivers multiple times per iteration. Consider caching filtered receiver list.

3. Excessive Debug Logging (Low Priority)

11+ new log statements could create noise. Move iteration details to trace level.

4. Inconsistent Error Handling

Location: src/receiver.rs:1107-1109

flush_nowait error lacks context. Add receiver ID to warning message.

5. Race Condition (Needs Verification)

Location: src/lib.rs:946-953

Add comment explaining why final flush guarantees correctness.

Verdict

Approve with Minor Revisions

This PR successfully fixes critical hanging bugs with a well-reasoned approach. The concerns are mostly minor optimizations that don't block the core fix. Great work on the comprehensive tests and documentation!


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/lib.rs (1)

879-975: Align force parameter semantics with actual behavior.

The documentation states that force "skips waiting for idle and proceeds directly to sync," but the implementation unconditionally executes the final flush loop and wait_idle(group_id).await regardless of the force flag. The force parameter only skips the stabilization loop, not the subsequent wait operations. Either gate the final flush and wait_idle behind !force, or update the documentation to clarify that force only skips the stabilization loop.

✏️ Doc update option
- /// * `force` - If true, skips waiting for idle and proceeds directly to sync
+ /// * `force` - If true, skips the stabilization loop; a final flush + wait_idle still occurs
🤖 Fix all issues with AI agents
In `@README.md`:
- Around line 105-111: Update the "Batch Handler Behavior" docs to state that
batch handlers do propagate group context: clarify that batch handlers set
task-local storage so bus.send inherits the batch's group_id for child messages
unless the child explicitly provides its own group_id, while still noting
messages are batched per group_id and each message's group counter is
decremented after processing; mention that batch handlers do not strip group
context when spawning children only if those children override group_id.

In `@tests/test_groups.rs`:
- Around line 3230-3363: Replace the final fixed sleep in
test_flush_and_sync_group_batched_with_cascading with a bounded polling loop
that checks child_processed.load(Ordering::SeqCst) until it equals 2 (the
expected count) and aborts with a timeout; specifically, after calling
bus.flush_and_sync_group(job_id, false) poll child_processed in a loop using a
short sleep (e.g., 5-20ms) between iterations and wrap the whole wait in
tokio::time::timeout (or abort after a max Duration) so the test fails
deterministically if children spawned by BatchWithCascadeHandler do not complete
in time instead of relying on tokio::time::sleep(100ms).
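The bounded-polling idea can be sketched with the standard library alone. This is a synchronous analog, assuming an `AtomicUsize` counter like child_processed; the actual test is async and would use tokio::time::timeout and tokio::time::sleep as described above. `wait_for_count` is a hypothetical helper name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Polls `counter` until it equals `expected` or `timeout` elapses.
/// Returns true if the expected value was observed in time, so the
/// caller can assert on the result instead of sleeping a fixed amount.
pub fn wait_for_count(counter: &AtomicUsize, expected: usize, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if counter.load(Ordering::SeqCst) == expected {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Short sleep between checks, in the 5-20ms range suggested above.
        thread::sleep(Duration::from_millis(5));
    }
}
```

A test would then end with something like `assert!(wait_for_count(&child_processed, 2, Duration::from_secs(2)))`, which fails deterministically if the children never complete and returns as soon as they do.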
♻️ Duplicate comments (1)
src/lib.rs (1)

985-1013: Documentation misleadingly implies blocking behavior, but the implementation uses non-blocking flush.

The doc comment and example say "wait for all child messages to complete," but flush_current_group() calls flush_nowait() internally to avoid deadlocks when called from within a handler. The code comment explicitly documents this trade-off. Update the documentation to clarify that this triggers a non-blocking flush and direct readers to flush_group() or flush_and_sync_group() if they need to actually block and wait for message processing.

Suggested doc updates
- /// This is designed to be called from within a handler to wait for all
- /// child messages (sent during this handler's execution) to complete.
+ /// This is designed to be called from within a handler to trigger a non-blocking
+ /// flush of child messages (sent during this handler's execution).
- ///     // Wait for all child messages to complete
+ ///     // Trigger a non-blocking flush of child messages
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b28d295 and fd2aec1.

📒 Files selected for processing (3)
  • README.md
  • src/lib.rs
  • tests/test_groups.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use appropriate SendOptions (Broadcast, Direct(id), Except(id), Random, or Balanced) when sending messages through the bus to control routing strategy

Files:

  • tests/test_groups.rs
  • src/lib.rs
🧠 Learnings (6)
📓 Common learnings
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Implement one of the 12 handler traits (Handler, AsyncHandler, BatchHandler, AsyncBatchHandler, SynchronizedHandler, AsyncSynchronizedHandler, BatchSynchronizedHandler, AsyncBatchSynchronizedHandler, LocalHandler, LocalAsyncHandler, LocalBatchHandler, or LocalAsyncBatchHandler) based on the required concurrency model (thread-safe, synchronized, or local) and whether messages should be batched or processed individually
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Implement one of the 12 handler traits (Handler, AsyncHandler, BatchHandler, AsyncBatchHandler, SynchronizedHandler, AsyncSynchronizedHandler, BatchSynchronizedHandler, AsyncBatchSynchronizedHandler, LocalHandler, LocalAsyncHandler, LocalBatchHandler, or LocalAsyncBatchHandler) based on the required concurrency model (thread-safe, synchronized, or local) and whether messages should be batched or processed individually

Applied to files:

  • README.md
  • tests/test_groups.rs
  • src/lib.rs
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Use `#[derive(Message)]` with appropriate attributes for types sent through the bus, such as `#[message(clone)]` for broadcast-enabled messages, `#[message(shared)]` for remote transport serialization, and `#[type_tag("custom::name")]` or `#[namespace("my_namespace")]` for custom type tags

Applied to files:

  • README.md
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to messagebus/src/**/*.rs : Choose the appropriate receiver type (BufferUnorderedAsync/Sync for concurrent processing, BufferUnorderedBatchedAsync/Sync for batched concurrent processing, SynchronizedAsync/Sync for sequential processing, or SynchronizedBatchedAsync/Sync for batched sequential processing) based on whether message processing should be concurrent or sequential

Applied to files:

  • README.md
  • tests/test_groups.rs
  • src/lib.rs
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Applies to **/*.rs : Use appropriate SendOptions (Broadcast, Direct(id), Except(id), Random, or Balanced) when sending messages through the bus to control routing strategy

Applied to files:

  • README.md
📚 Learning: 2026-01-08T15:42:29.191Z
Learnt from: CR
Repo: shakacode/messagebus PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-08T15:42:29.191Z
Learning: Verify examples work correctly by running `cargo run --example <example_name>` for demo_async, demo_sync_batch, and benchmark examples

Applied to files:

  • tests/test_groups.rs
🧬 Code graph analysis (1)
src/lib.rs (3)
src/group.rs (1)
  • processing_count (261-267)
src/receiver.rs (3)
  • id (109-109)
  • id (418-420)
  • id (789-791)
src/relay.rs (1)
  • id (128-130)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: claude-review
🔇 Additional comments (8)
src/lib.rs (1)

822-853: Good fix for partial batch hangs.

Flushing receivers in the group before wait_idle should prevent partial batches from stalling.
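
To illustrate why a partial batch can stall a waiter, here is a minimal, self-contained sketch. `Batcher` is a hypothetical toy type, not the messagebus receiver: it assumes a handler only sees items once a full batch accumulates or an explicit flush arrives, so anything waiting for idleness without flushing first would never see the buffer drain.

```rust
/// Toy batching buffer (hypothetical, not the messagebus implementation).
/// Items reach the handler only when a full batch has accumulated
/// or when `flush` is called explicitly.
pub struct Batcher {
    batch_size: usize,
    pending: Vec<u32>,
    /// Batches already handed to the (imaginary) handler.
    pub delivered: Vec<Vec<u32>>,
}

impl Batcher {
    pub fn new(batch_size: usize) -> Self {
        Self { batch_size, pending: Vec::new(), delivered: Vec::new() }
    }

    /// Buffers an item and delivers automatically once the batch is full.
    pub fn push(&mut self, item: u32) {
        self.pending.push(item);
        if self.pending.len() >= self.batch_size {
            self.flush();
        }
    }

    /// Delivers whatever is buffered, even if it is a partial batch.
    pub fn flush(&mut self) {
        if !self.pending.is_empty() {
            let batch = std::mem::take(&mut self.pending);
            self.delivered.push(batch);
        }
    }

    /// True when nothing is waiting in the buffer.
    pub fn is_idle(&self) -> bool {
        self.pending.is_empty()
    }
}
```

With a batch size of 4 and only three pushed items, `is_idle()` stays false until `flush()` is called, which is the situation the fix addresses by flushing before waiting.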

tests/test_groups.rs (7)

2838-2919: LGTM — good regression for partial batch hangs in flush_and_sync_group.


2926-3002: LGTM — solid coverage for flush_group partial batches.


3009-3111: LGTM — validates cascading child messages are awaited.


3117-3224: LGTM — nice deep-nesting coverage.


3367-3450: LGTM — independent-group flushing coverage looks good.


3454-3495: LGTM — empty-group flush guard is a useful regression test.


3505-3737: LGTM — great end-to-end reproduction of the vrbp pattern.



claude bot commented Jan 16, 2026

Pull Request Review: Fix hang on group batches

Summary

This PR addresses critical hanging issues in the messagebus library's group-based task flushing functionality, particularly when using batched handlers. The fixes are well-designed and the comprehensive test coverage provides strong confidence in the solution.

✅ Strengths

1. Excellent Problem Identification and Solution

  • The two-pass flush strategy in flush_and_sync_group (src/lib.rs:892-954) elegantly solves the cascading message deadlock problem
  • Adding flush_nowait for non-blocking flushes prevents handler-initiated deadlocks
  • The fuse counter (32 iterations) is a good safety mechanism to prevent infinite loops

2. Comprehensive Test Coverage

The PR adds extensive tests covering multiple edge cases:

  • Partial batch flushing
  • Cascading messages (parent → child → grandchild)
  • Deep nesting scenarios
  • Mixed batch/async handlers
  • Group propagation with batching

Tests are well-documented with clear explanations of what bugs they're preventing.

3. Excellent Documentation

  • README updates clearly explain the three flush methods and when to use each
  • Inline code comments explain complex logic (especially the two-pass flush strategy)
  • Good use of log statements at appropriate levels (debug/trace/warn)

4. API Design

  • flush_current_group() is a thoughtful addition for handlers to flush child messages
  • The distinction between flush_group, flush_and_sync_group, and flush_current_group is well-motivated

⚠️ Areas for Consideration

1. Performance Impact of Iteration

Location: src/lib.rs:900-954

The flush_and_sync_group implementation iterates over all receivers multiple times (potentially up to 32 iterations × 2 passes per iteration):

loop {
    iters += 1;
    if iters > fuse_count {  // fuse_count = 32
        // ...
    }
    
    // First pass: iterate all receivers
    for r in self.inner.receivers.iter() {
        if receiver_ids.contains(&r.id()) && !r.is_idling() {
            r.flush_nowait(self);
        }
    }
    
    // Second pass: iterate all receivers again
    for r in self.inner.receivers.iter() {
        if receiver_ids.contains(&r.id()) && !r.is_idling() {
            r.flush(self).await;
        }
    }
}

Concern: For systems with many receivers (e.g., 100+ different message types), this could become a performance bottleneck.

Suggestion: Consider maintaining a receiver ID → receiver lookup map in BusInner to avoid full iteration:

// Only iterate receivers that belong to this group.
// Note: receiver_by_id is a proposed helper, not an existing API.
let receiver_ids = self.inner.group_registry.receivers_for_group(group_id);
for receiver_id in receiver_ids.iter() {
    if let Some(r) = self.inner.receiver_by_id(receiver_id) {
        if !r.is_idling() {
            r.flush_nowait(self);
        }
    }
}
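
The cost difference can be shown with a standalone sketch. Everything here (`Registry`, `Receiver`, the `idle` flag, the `flushes` counter) is hypothetical and only models the idea; it is not the layout of `BusInner`. The point is that an ID-keyed map lets the flush visit only the group's receivers instead of scanning every receiver on each pass.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a bus receiver.
pub struct Receiver {
    pub id: u64,
    pub idle: bool,
    pub flushes: u32,
}

/// Hypothetical registry keyed by receiver ID.
pub struct Registry {
    by_id: HashMap<u64, Receiver>,
}

impl Registry {
    pub fn new(receivers: Vec<Receiver>) -> Self {
        let by_id = receivers.into_iter().map(|r| (r.id, r)).collect();
        Self { by_id }
    }

    /// Flushes only the busy receivers that belong to `group`.
    /// Work is proportional to the group size, not to the total
    /// number of receivers. Returns how many receivers were flushed.
    pub fn flush_group(&mut self, group: &HashSet<u64>) -> usize {
        let mut flushed = 0;
        for id in group {
            if let Some(r) = self.by_id.get_mut(id) {
                if !r.idle {
                    r.flushes += 1;
                    flushed += 1;
                }
            }
        }
        flushed
    }

    /// Number of flushes a receiver has seen, if it exists.
    pub fn flush_count(&self, id: u64) -> Option<u32> {
        self.by_id.get(&id).map(|r| r.flushes)
    }
}
```

Whether this pays off in practice depends on how many receivers a typical bus registers; a benchmark would be needed before committing to the extra bookkeeping.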

2. Infinite Loop Warning vs Error Handling

Location: src/lib.rs:902-908

if iters > fuse_count {
    log::warn!(
        "flush_and_sync_group: group {} did not stabilize after {} iterations",
        group_id,
        fuse_count
    );
    break;
}

Concern: When the fuse triggers, the loop exits with only a warning, so the caller cannot tell whether the group actually reached a consistent state. This could lead to:

  • Silent data inconsistency
  • Difficult-to-debug production issues

Suggestions:

  1. Consider returning a Result or status enum indicating whether flushing completed successfully
  2. Alternatively, add telemetry/metrics for when the fuse triggers so it can be monitored
  3. Document what users should do if they encounter this warning in production
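
As a rough sketch of the first suggestion, the loop could report how it ended instead of only logging. `FlushOutcome` and `flush_until_idle` are hypothetical names introduced here for illustration, and the closure stands in for one flush round that returns `true` once every receiver in the group is idle.

```rust
/// Hypothetical status returned by a bounded flush loop.
#[derive(Debug, PartialEq, Eq)]
pub enum FlushOutcome {
    /// All receivers became idle after `iterations` rounds.
    Stabilized { iterations: u32 },
    /// The fuse tripped; the group may still have work in flight.
    FuseTripped { iterations: u32 },
}

/// Runs `step` up to `max_iters` times. `step` performs one flush
/// round and returns true when the group is idle.
pub fn flush_until_idle<F>(max_iters: u32, mut step: F) -> FlushOutcome
where
    F: FnMut() -> bool,
{
    for iteration in 1..=max_iters {
        if step() {
            return FlushOutcome::Stabilized { iterations: iteration };
        }
    }
    FlushOutcome::FuseTripped { iterations: max_iters }
}
```

A caller could then match on `FuseTripped` to emit a metric or retry, rather than relying on someone noticing a warning in the logs.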

3. Debug Logging Verbosity

Location: src/lib.rs, src/group.rs

The PR adds many log::debug! statements (15+ in flush_and_sync_group alone). While helpful for debugging, this could:

  • Impact performance whenever debug-level logging is enabled
  • Create noisy logs even at debug level

Suggestion: Consider:

  • Moving some debug logs to trace level
  • Using conditional compilation for hot-path logging: #[cfg(feature = "verbose-logging")]

4. Race Condition in flush_current_group

Location: src/lib.rs:1034-1040

let receiver_ids = self.inner.group_registry.receivers_for_group(group_id);

for r in self.inner.receivers.iter() {
    if receiver_ids.contains(&r.id()) {
        r.flush_nowait(self);
    }
}

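Potential Issue: The receiver_ids snapshot could become stale if a concurrent send routes a message in this group to a receiver that was not part of the group when the snapshot was taken; that receiver would then be skipped by this flush. This is probably acceptable given the non-blocking nature, but worth documenting.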

5. Hardcoded Magic Number

Location: src/lib.rs:897

let fuse_count = 32i32;

Suggestion: Make this configurable or at least document why 32 was chosen. Consider:

const FLUSH_STABILIZATION_MAX_ITERS: i32 = 32;

6. Error Handling in flush_nowait

Location: src/receiver.rs:1106-1110

pub fn flush_nowait(&self, bus: &Bus) {
    if self.inner.send_action(bus, Action::Flush).is_err() {
        warn!("flush_nowait failed!");
    }
}

Concern: The error is logged but not propagated. While this makes sense for a "nowait" operation, it might hide serious issues (e.g., receiver channel closed).

Suggestion: Consider using a more descriptive warning message:

warn!("flush_nowait failed for receiver {}: channel may be closed", self.id());
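
If propagating the failure is ever preferred over logging it, one possible shape is sketched below. This uses a bounded `std::sync::mpsc` channel as a stand-in; `ReceiverHandle`, `FlushNowaitError`, and this `Action` enum are hypothetical and do not reflect the actual receiver internals or the real `send_action` signature.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical control message.
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Flush,
}

/// Hypothetical error distinguishing a full queue from a closed one.
#[derive(Debug, PartialEq, Eq)]
pub enum FlushNowaitError {
    QueueFull,
    Closed,
}

/// Hypothetical handle wrapping the sending side of a control channel.
pub struct ReceiverHandle {
    id: u64,
    tx: SyncSender<Action>,
}

impl ReceiverHandle {
    pub fn new(id: u64, capacity: usize) -> (Self, Receiver<Action>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { id, tx }, rx)
    }

    pub fn id(&self) -> u64 {
        self.id
    }

    /// Requests a flush without blocking and reports why it failed.
    pub fn flush_nowait(&self) -> Result<(), FlushNowaitError> {
        self.tx.try_send(Action::Flush).map_err(|e| match e {
            TrySendError::Full(_) => FlushNowaitError::QueueFull,
            TrySendError::Disconnected(_) => FlushNowaitError::Closed,
        })
    }
}
```

Callers that want fire-and-forget behavior can still ignore the result, while `flush_and_sync_group` could treat `Closed` as a reason to stop waiting on that receiver.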

🔍 Code Quality

Positive

  • Consistent error handling patterns
  • Good use of atomic operations with appropriate memory ordering
  • Debug assertions for catching underflow issues in development
  • Clear variable naming

Minor Issues

  • Some code duplication in test setup (could extract helper functions for common test patterns)
  • The three flush operations at the end of flush_and_sync_group (lines 962-982) feel a bit repetitive

🧪 Testing

Excellent: The test suite is comprehensive and well-structured:

  • Tests verify both the bug scenario and the fix
  • Good use of timeouts to detect hangs
  • Tests cover sync, async, and batched variants
  • Tests verify group cleanup and error cases

Suggestion: Consider adding:

  • A benchmark to measure the performance impact of the iterative flushing
  • A stress test with many concurrent groups (e.g., 1000 groups with cascading messages)
  • A test that exercises the 32-iteration fuse and verifies the loop exits when it trips

🔒 Security

No security concerns identified. The changes are internal synchronization logic.

📝 Documentation

Excellent: The README updates clearly explain:

  • When to use each flush method
  • Batch handler behavior with groups
  • Group cleanup APIs
  • Usage examples

Minor: The table in README.md could benefit from explaining the performance characteristics of each method.

Recommendation

✅ Approve with minor suggestions

This is a solid fix for critical bugs. The implementation is well-thought-out, thoroughly tested, and well-documented. The concerns raised are mostly about edge cases, performance optimization opportunities, and operational observability rather than correctness issues.

The PR significantly improves the reliability of the messagebus library's group functionality and should be merged after considering the performance implications for systems with many receivers.


Priority Suggestions:

  1. Medium: Address the receiver iteration performance (could impact large-scale deployments)
  2. Low: Improve fuse trigger handling (add metrics/telemetry)
  3. Low: Reduce debug logging verbosity (use trace level)

Great work on the thorough testing and documentation! 🎉


@gonzalog gonzalog left a comment


LGTM
