Skip to content

[Data] Add per-stage training-thread blocking attribution to iter_batches#64183

Open
OneSizeFitsQuorum wants to merge 12 commits into
ray-project:masterfrom
OneSizeFitsQuorum:data/iter-batches-observability
Open

[Data] Add per-stage training-thread blocking attribution to iter_batches#64183
OneSizeFitsQuorum wants to merge 12 commits into
ray-project:masterfrom
OneSizeFitsQuorum:data/iter-batches-observability

Conversation

@OneSizeFitsQuorum

@OneSizeFitsQuorum OneSizeFitsQuorum commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Summary

Decomposes iter_total_blocked_s into per-stage contributions so users can see where the training thread is blocked. Closes #64132, part of RFC #63911.

Design

Each pipeline stage records an independent (start_s, end_s) window via StageTiming, a lightweight context manager. The training thread records its blocked window around next(batch_iter). Per-stage attribution = overlap of the two windows:

overlap = min(stage.end_s, blocked_end) - max(stage.start_s, blocked_start)

Invariant: sum(iter_blocked_*) ≤ iter_total_blocked_s

The fetch stage spans from upstream wait (blocked on data pipeline) through ray.get() completion, capturing both production delays and cross-node transfer.

New Prometheus Metrics (8)

Metric Description
data_iter_blocked_fetch_seconds Upstream wait + ray.get()
data_iter_blocked_batching_seconds Batch creation
data_iter_blocked_format_seconds Format conversion
data_iter_blocked_collate_seconds collate_fn
data_iter_blocked_finalize_seconds finalize_fn
data_iter_batches_total Batches delivered to training loop
data_iter_rows_total Rows delivered (exit-side, respects drop_last)
data_iter_total_seconds Total iterator wall-clock time

Also: rank extracted from dataset tag (last split_N) for per-rank Prometheus querying.

Changes

  • interfaces.pyStageTiming (context manager), BatchTimings, BlockWithTiming
  • iter_batches.py_report_batch_timings() overlap computation with docstring
  • util.pyresolve_block_refs always returns BlockWithTiming (no record_timings flag, no Union/isinstance); nested context managers replace redundant perf_counter() calls; fetch window includes upstream wait
  • stats.py — 8 Prometheus Gauges, Timer gains start_s/end_s, IterStatsSummary per-stage breakdown, rank extraction
  • iterator.py — Final metrics flush on iteration end

Example Output

Per-stage training-thread blocked time breakdown:
    * block fetch (ray.get): 1.74ms
    * batching: 428.36us
    * format: 4.1ms
Total batches consumed: 10
Total rows consumed: 100

Performance

~6 μs/batch overhead. At 10k batches/sec: <0.04% impact on a 10ms training step.

Tests

  • 51 unit tests (iter_batches) + 7 unit tests (stats/summary) — all passing
  • Integration verified with Ray Train v2 DataParallelTrainer + TorchTrainer

References

Closes #64132 · Part of RFC #63911

cc @JasonLi1909

@OneSizeFitsQuorum OneSizeFitsQuorum requested a review from a team as a code owner June 17, 2026 15:50

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces detailed per-stage wall-clock timing and statistics tracking (including fetch, batching, formatting, collating, finalization, and order restoration) for Ray Data batch iteration, along with corresponding Prometheus metrics and unit tests. The review feedback suggests updating the type annotation of dataset_tag in _create_iteration_tags to Optional[str] to align with the underlying function and avoid static type checking errors.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread python/ray/data/_internal/stats.py Outdated
@OneSizeFitsQuorum OneSizeFitsQuorum force-pushed the data/iter-batches-observability branch 2 times, most recently from b02e44d to 850ff58 Compare June 17, 2026 15:59
@ray-gardener ray-gardener Bot added data Ray Data-related issues community-contribution Contributed by the community labels Jun 17, 2026
@OneSizeFitsQuorum OneSizeFitsQuorum force-pushed the data/iter-batches-observability branch 2 times, most recently from faaeb93 to 14ec378 Compare June 18, 2026 01:57

@JasonLi1909 JasonLi1909 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the PR @OneSizeFitsQuorum! The core idea of capturing the start and end of each pipeline stage to compute overlap with the training thread stall makes sense. That said, let's keep the scope of this PR to just the training thread attribution metrics. We can compartmentalize other metrics for a later PR, but at a glance some of them such as data_iter_prefetch_queue_depth are more of an implementation detail, which we should avoid. Also it would be great if you could simplify the PR description so it's shorter and easier to understand. Thanks!

@OneSizeFitsQuorum OneSizeFitsQuorum force-pushed the data/iter-batches-observability branch from 14ec378 to 69c8cbb Compare June 20, 2026 03:21
@OneSizeFitsQuorum OneSizeFitsQuorum changed the title [Data] Add per-stage training-thread blocking attribution and pipeline observability to iter_batches [Data] Add per-stage training-thread blocking attribution to iter_batches Jun 20, 2026
Comment thread python/ray/data/_internal/stats.py Outdated
…e observability to iter_batches

Implements overlap-based latency attribution for Ray Data's iter_batches
pipeline, addressing ray-project#64132 and RFC ray-project#63911. Each pipeline stage (fetch,
batching, format, collate, finalize, restore_order) records an independent
(start_s, end_s) time window. The training thread captures its own blocked
window around next(). Attribution per stage is the overlap of the two
windows, correctly handling prefetch > 1.

New Prometheus metrics (14 total):
- data_iter_blocked_{fetch,batching,format,collate,finalize,restore_order}_seconds
- data_iter_batches_total, data_iter_rows_total
- data_iter_total_seconds, data_iter_restore_order_buffer_peak
- data_iter_shuffle_buffer_{rows,compactions_total,compaction_seconds}
- data_iter_prefetch_queue_depth

Also adds:
- Per-stage breakdown rendering in IterStatsSummary.to_string()
- Rank extraction from dataset tags for Prometheus labels
- Final metrics flush on iterator completion

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
@OneSizeFitsQuorum OneSizeFitsQuorum force-pushed the data/iter-batches-observability branch from 69c8cbb to 071ebd6 Compare June 20, 2026 03:26
@OneSizeFitsQuorum

Copy link
Copy Markdown
Contributor Author

Thanks for the review @JasonLi1909! I've made the following changes:

  1. Reduced scope — removed the 5 implementation-detail metrics (data_iter_prefetch_queue_depth, data_iter_restore_order_buffer_peak, data_iter_shuffle_buffer_*). The PR now only contains the 9 core training-thread attribution metrics. The removed metrics are preserved on a separate branch for a future PR.

  2. Simplified PR description — cut it down from ~300 lines to ~80 lines, focusing on design, the 9 metrics, and test results.

Also fixed a regex issue flagged by Cursor Bugbot: the rank extraction now uses the last split_N match (instead of the first) to avoid false matches when the user-defined dataset name contains split_<digits>.

Reverts batcher.py changes that were only needed for the shuffle
buffer metrics which have been removed from this PR's scope per
reviewer feedback.

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
with stats.iter_get_s.timer() if stats else nullcontext():
block = ray.get(block_ref)
yield block
end_s = time.perf_counter()

@JasonLi1909 JasonLi1909 Jun 20, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having tow timers for the same thing, can we consolidate the capture of start_s and end_s into the Timer class (in stats.py)? Double check for any backwards compatibility issues with other consumers of Timer

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. StageTiming now supports context manager protocol (__enter__/__exit__), and Timer gained start_s/end_s fields. Pipeline functions use nested context managers instead of redundant perf_counter() calls. No backwards compatibility issues — Timer is only used internally within ray/data.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also capture the blocked time here. It will be useful to indicate how we much we are blocked on the upstream data pipeline and cross node transfer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The fetch window in resolve_block_refs now spans from next(block_ref_iter) (upstream wait) through ray.get() completion, capturing both production delays and cross-node transfer.

def resolve_block_refs(
block_ref_iter: Iterator[ObjectRef[Block]],
stats: Optional[DatasetStats] = None,
) -> Iterator[Block]:
record_timings: bool = False,

@JasonLi1909 JasonLi1909 Jun 20, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the optionality here and always record the timings to avoid downstream isinstance(block, BlockWithTimings) checks, Union types, and other type branching logic. And if we can consolidate the (start, end) capture into Timer, then we can still toggle this timing via the presence of stats

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. resolve_block_refs always returns BlockWithTiming (no record_timings parameter). batch_blocks() wraps raw blocks in BlockWithTiming with zero timing before passing to blocks_to_batches(), so _BatchingIterator receives a uniform type. All isinstance checks, Union types, and type branching logic removed.

@@ -452,7 +474,9 @@ def get_next_ref_bundle() -> RefBundle:
prefetcher.stop()


def restore_original_order(batch_iter: Iterator[Batch]) -> Iterator[Batch]:
def restore_original_order(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's hold off on timing restore order and surfacing data_iter_blocked_restore_order_seconds. This is also more of an implementation detail. Instead we should focus on surfacing actionable metrics for users.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Removed restore_order stage, data_iter_blocked_restore_order_seconds metric, and all related code. restore_original_order() reverted to the original simple for-loop. PR now exposes 8 metrics (5 blocked stages + batches/rows/total).

Comment thread python/ray/data/_internal/block_batching/interfaces.py Outdated
with self.yield_batch_context(batch):
yield batch.data

self.after_epoch_end()

def _report_batch_timings(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: docstring

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

@JasonLi1909 JasonLi1909 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @OneSizeFitsQuorum, thanks for the cleanup. I've left some more comments. Overall, make sure to include docstrings, comments, and that names of data classes/functions communicate their purpose. I'll take another look after you're done, thanks!

Per reviewer feedback, restore_order is an implementation detail
rather than an actionable user-facing metric. Reverts
restore_original_order() to the original simple for-loop and
removes the data_iter_blocked_restore_order_seconds Prometheus
metric along with all related fields, exports, and tests.

The PR now exposes 8 core metrics (5 blocked stages + batches/rows/total).

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
Per reviewer feedback, consolidates the dual timing mechanism:

- StageTiming now supports context manager protocol (__enter__/
  __exit__) to automatically capture start_s/end_s
- Timer gains start_s/end_s fields populated by timer()
- Pipeline functions (resolve_block_refs, _format_batch,
  _collate_batch, _finalize_batch) use nested context managers
  instead of redundant perf_counter() + _record_stage_window()
- resolve_block_refs always returns BlockWithTiming, removing the
  record_timings parameter, Union types, and isinstance branching
- Removed _record_stage_window helper (no longer needed)

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
The fetch timing window in resolve_block_refs now spans from when we
start waiting for the upstream iterator (blocked on the data pipeline)
through ray.get() completion. This captures cross-node transfer and
upstream production delays, giving a more complete picture of what
blocks the training thread.

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
Per reviewer feedback, adds clear docstrings to:
- BatchTimings (per-batch pipeline-stage timing windows)
- BlockWithTiming (resolved block with fetch timing)
- BatchTimings.merge_fetch() (multi-block fetch window expansion)
- BatchTimings.stages() (stage name/timing iterator)
- _report_batch_timings() (overlap-based attribution algorithm)

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
_BatchingIterator can receive blocks from paths other than
resolve_block_refs (e.g., doctest examples that pass raw pyarrow
Tables). Restore the isinstance check to handle both BlockWithTiming
and raw Block objects gracefully.

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 2 potential issues.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit 9fcde56. Configure here.

Comment thread python/ray/data/_internal/block_batching/interfaces.py Outdated
Comment thread python/ray/data/_internal/block_batching/iter_batches.py Outdated
Per reviewer feedback, removed isinstance check and Union type from
_BatchingIterator by ensuring all entry points wrap blocks in
BlockWithTiming:

- batch_blocks() now wraps raw blocks in BlockWithTiming with zero
  timing before passing to blocks_to_batches()
- _BatchingIterator now assumes all blocks are BlockWithTiming
- Removed Union import from util.py

This provides a uniform type throughout the batching pipeline while
maintaining backward compatibility for external callers of batch_blocks().

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
Per Cursor Bugbot review:

1. merge_fetch now sums fetch durations instead of taking the span,
   avoiding counting idle gaps between consecutive block fetches as
   fetch blocking time.

2. Move blocked_start_s/blocked_end_s captures inside
   get_next_batch_context() so the blocked window aligns with
   iter_total_blocked_s, preventing sum(iter_blocked_*) from exceeding
   iter_total_blocked_s.

Updated tests to reflect the new duration-summing behavior.

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
After changing merge_fetch to sum durations instead of taking the span,
the _merge_stage helper is no longer called anywhere. Remove the dead code.

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
After deeper analysis, the span approach (taking [earliest_start, latest_end])
is semantically correct for multi-block fetches:

- From the training thread's perspective, it's blocked for the entire span,
  even if there are gaps between consecutive block fetches
- Those "idle gaps" are actually pipeline overhead (batching logic, scheduling)
  and are part of the blocking experience
- Summing durations would underestimate the actual blocking time

The Cursor Bugbot concern about "idle gaps" is valid in theory, but in practice:
1. The gaps are very small (microseconds of pipeline overhead)
2. They represent real blocking time from the training thread's perspective
3. Span aligns with the semantic meaning of "how long did training wait"

Reverted tests to expect span behavior.

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
@OneSizeFitsQuorum

Copy link
Copy Markdown
Contributor Author

Re: merge_fetch idle gaps (Cursor Bugbot)

After deeper analysis, we've reverted to the span-based approach (commit aa41de5). Here's why:

Semantic correctness: From the training thread's perspective, when it calls next(batch_iter) for a multi-block batch, it's blocked for the entire span from when the first block starts fetching to when the last block finishes. Even if there are gaps between consecutive block fetches, the training thread is still waiting.

The "idle gaps" aren't really idle: The gaps between block fetches are actually pipeline overhead (batching logic, scheduling, etc.). They're part of the blocking experience from the training thread's viewpoint.

Sum underestimates: If we sum durations, we'd report less blocking time than the training thread actually experienced, which defeats the purpose of observability.

Example:

Block 1: [0ms, 100ms]
Pipeline overhead: [100ms, 150ms]
Block 2: [150ms, 250ms]

Training thread blocked: 0ms → 250ms = 250ms (span) ✓
Sum of fetch work: 100ms + 100ms = 200ms (underestimates) ✗

The span approach correctly answers "how long did the training thread wait for this batch?", which is what users care about for performance debugging.

Cursor Bugbot's concern about idle gaps is theoretically valid, but in practice those gaps are microseconds of pipeline overhead and represent real blocking time.

- test_util.py: Updated test_resolve_block_refs to expect BlockWithTiming
  objects and test_blocks_to_batches to wrap raw blocks
- block_batching.py: Changed generator expression to map() to avoid holding
  references to blocks, fixing test_chained_transforms_release_intermediates

Signed-off-by: OneSizeFitsQuorum <tanxinyu@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[RFC] Data Ingest Observability with Ray Data + Ray Train

2 participants