
[Data] DataSourceV2: row-group-aware Parquet chunking #64020

Draft
abhishekverma-ray wants to merge 31 commits into ray-project:master from abhishekverma-ray:dsv2-chunking

Conversation

@abhishekverma-ray

@abhishekverma-ray abhishekverma-ray commented Jun 11, 2026


Why are these changes needed?

In DataSource V2, the ParquetFileChunker originally split files by a byte
estimate (ceil(file_size / target_chunk_size)) and reconciled each guessed
byte range to real row groups at read time (_calculate_row_group_range).
Because the atomic unit of a Parquet read is the row group (PyArrow fragments
are row-group-granular), this mismatch between the byte estimate and the actual
layout caused empty or wasted read tasks and redundant footer reads. The effect
was worst for large enterprise row groups (128–512 MB).
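To make the mismatch concrete, the sketch below compares the legacy chunk count, ceil(file_size / target_chunk_size), with the number of row groups a file actually holds. The file size, target, and row-group count are illustrative numbers rather than measurements from this PR, and both helper names are hypothetical. It assumes each row group is read by exactly one chunk, so every chunk beyond the row-group count has nothing to read.

```python
import math

MiB = 1024 * 1024


def legacy_chunk_count(file_size: int, target_chunk_size: int) -> int:
    """Chunk count under the byte estimate: ceil(file_size / target_chunk_size)."""
    return math.ceil(file_size / target_chunk_size)


def min_empty_chunks(file_size: int, target_chunk_size: int, num_row_groups: int) -> int:
    """Lower bound on empty chunks, assuming one row group feeds exactly one chunk."""
    return max(0, legacy_chunk_count(file_size, target_chunk_size) - num_row_groups)


# Illustrative file: 1 GiB on disk, two 512 MiB row groups, 16 MiB target.
chunks = legacy_chunk_count(1024 * MiB, 16 * MiB)
empty = min_empty_chunks(1024 * MiB, 16 * MiB, num_row_groups=2)
print(chunks, empty)
```

With these numbers the byte estimate plans 64 chunks for a file that can feed at most 2 of them.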

Fixing the chunker means it now reads each file's footer once at listing
time. That same footer read also exposes per-row-group row counts and
per-column compressed and uncompressed sizes, so a second, long-standing
problem becomes cheap to fix in the same pass: partition sizing. DataSource V2
packs chunks into read tasks by estimated in-memory bytes, but the estimate was
a single global on_disk × 5 ratio. When compression/encoding varies across (or
within) files, equal-on-disk chunks get equal estimates despite very different
true in-memory sizes, so read tasks come out skewed (under-filled on one side,
OOM risk on the other). This PR therefore lands two connected changes, true
row-group chunking and a footer-derived, type-aware in-memory size estimate,
both powered by the one footer read the chunker now performs.

What this changes

1. Row-group-aware chunking (the chunker)

Read the footer once per file at listing time and chunk on true row-group
boundaries, emitting explicit ranges so the reader needs no reconciliation:

  • ParquetFileChunkMetadata carries {row_group_start, row_group_end}
    (half-open) instead of a byte-estimate chunk index.
  • ParquetFileChunker.generate_chunk_metadatas greedily bundles consecutive
    row groups up to target_chunk_size (always ≥ 1 row group). Corrupt /
    zero-row-group footers fall back to a single whole-file chunk.
  • The indexer (file_indexer.py) threads filesystem through and
    parallelizes the per-file footer reads across its worker pool.
  • The reader path (parquet_file_chunking_utils.py) consumes the explicit range
    directly via fragment.subset(row_group_ids=...); _calculate_row_group_range
    is deleted.
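The greedy bundling described above can be sketched on a plain list of per-row-group sizes. This is a minimal illustration, not the PR's implementation: the function name is hypothetical, and it assumes a chunk is closed as soon as its accumulated size reaches target_chunk_size. The description leaves open whether the real chunker instead closes a chunk before the target would be exceeded.

```python
from typing import List, Tuple


def bundle_row_groups(
    row_group_sizes: List[int], target_chunk_size: int
) -> List[Tuple[int, int]]:
    """Return half-open (row_group_start, row_group_end) ranges.

    Consecutive row groups are accumulated until their summed size reaches
    target_chunk_size, so every chunk holds at least one row group.
    An empty input returns no ranges; a caller would then fall back to a
    single whole-file chunk.
    """
    ranges: List[Tuple[int, int]] = []
    start = 0
    accumulated = 0
    for index, size in enumerate(row_group_sizes):
        accumulated += size
        if accumulated >= target_chunk_size:
            ranges.append((start, index + 1))
            start = index + 1
            accumulated = 0
    # Trailing row groups that never reached the target form the last chunk.
    if start < len(row_group_sizes):
        ranges.append((start, len(row_group_sizes)))
    return ranges


# Target smaller than any row group: one chunk per row group.
print(bundle_row_groups([100, 100, 100], 50))
# Target larger than the whole file: a single chunk covering everything.
print(bundle_row_groups([10, 10], 1000))
```

Because the ranges are contiguous and cover every row group exactly once, a reader can consume them without re-deriving boundaries from byte offsets.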

2. Footer-derived, type-aware in-memory size estimate (v2a/v2b)

From the footer the chunker already reads, compute a per-chunk Arrow in-memory
size and use it for partition sizing instead of the flat on_disk × 5 ratio:

  • New estimate_chunk_in_memory_size(...) in file_chunker.py sizes each column
    by Arrow type: fixed-width (int/float/temporal/decimal) =
    rows × byte_width (bool bit-packed); variable-width (string/binary/list) =
    uncompressed × var_width_factor + rows × 4 (int32 offsets); plus a
    ceil(rows/8) validity bitmap per nullable column. The chunker stamps the
    result into ParquetFileChunkMetadata["in_memory_size"].
  • New FooterDerivedInMemorySizeEstimator (in_memory_size_estimator.py) reads
    that hint (vectorized), falling back to on_disk × ratio for hint-less /
    non-Parquet rows so mixed manifests are always safe.
  • ParquetDatasourceV2.get_size_estimator() selects it when row-group-aware
    chunking is on; otherwise keeps the constant-ratio
    ParquetInMemorySizeEstimator.
  • Gated by a new DataContext.parquet_use_footer_size_estimate flag (default
    True), so it's A/B-able and reversible. The fixed-width part of the
    estimate is exact and compression/encoding-independent; only
    variable-width columns use the conservative constant
    DataContext.parquet_in_memory_var_width_factor (default 2.0, v2d), with no
    per-file sampling.
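The per-column formulas and the estimator fallback listed above can be written out as a small sketch. Everything here is hypothetical scaffolding: the ColumnChunkInfo record, the function names, and the way footer fields would be mapped onto it are assumptions, and the real estimate_chunk_in_memory_size signature is not shown in this PR description. Only the arithmetic (rows × byte_width, bit-packed bools, uncompressed × var_width_factor + rows × 4, ceil(rows/8) validity, on_disk × 5 fallback) comes from the text.

```python
import math
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class ColumnChunkInfo:
    """Hypothetical per-column summary derived from a Parquet footer."""

    rows: int
    nullable: bool
    byte_width: Optional[int] = None  # set for fixed-width types, else None
    is_bool: bool = False
    uncompressed_size: int = 0  # only used for variable-width columns


def sketch_column_size(col: ColumnChunkInfo, var_width_factor: float = 2.0) -> int:
    if col.is_bool:
        data = math.ceil(col.rows / 8)  # bit-packed booleans
    elif col.byte_width is not None:
        data = col.rows * col.byte_width  # fixed-width values
    else:
        # Variable-width: scaled uncompressed bytes plus int32 offsets.
        data = math.ceil(col.uncompressed_size * var_width_factor) + col.rows * 4
    validity = math.ceil(col.rows / 8) if col.nullable else 0
    return data + validity


def sketch_chunk_size(
    columns: Iterable[ColumnChunkInfo], var_width_factor: float = 2.0
) -> int:
    return sum(sketch_column_size(c, var_width_factor) for c in columns)


def size_for_partitioning(
    on_disk_size: int, in_memory_hint: Optional[int], fallback_ratio: float = 5.0
) -> int:
    """Use the footer-derived hint when present, else on_disk x ratio."""
    if in_memory_hint is not None:
        return in_memory_hint
    return math.ceil(on_disk_size * fallback_ratio)


columns = [
    ColumnChunkInfo(rows=1000, nullable=False, byte_width=8),        # int64
    ColumnChunkInfo(rows=1000, nullable=True, is_bool=True),         # bool
    ColumnChunkInfo(rows=1000, nullable=True, uncompressed_size=5000),  # string
]
hint = sketch_chunk_size(columns)
print(hint, size_for_partitioning(1000, hint), size_for_partitioning(1000, None))
```

The fallback branch is what keeps mixed manifests safe: rows without a hint are still sized, just with the older constant ratio.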

3. Projection-aware sizing (v2c)

When a read projects a column subset, size by only those columns:

  • ListFiles gains a sizing-only projected_columns field.
  • ProjectionPushdown records the pushed column set on the upstream ListFiles
    (idempotent, fixed-point-safe) in addition to pushing it into ReadFiles.
  • plan_list_files_op propagates it onto the chunker, so the footer estimate
    counts only projected columns. A star/unknown projection leaves it unset
    (sizes the full row group, as before).
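As a rough illustration of projection-aware sizing, the sketch below sums per-column size estimates for only the projected columns and sizes the full row group when no projection is recorded. The function name, the dictionary layout, and the sample sizes are assumptions made for the example, not the PR's data structures.

```python
from typing import Dict, Iterable, Optional


def projected_in_memory_size(
    column_sizes: Dict[str, int],
    projected_columns: Optional[Iterable[str]] = None,
) -> int:
    """Sum estimated sizes for the projected columns only.

    projected_columns=None models a star/unknown projection: every column
    in the row group is counted.
    """
    if projected_columns is None:
        return sum(column_sizes.values())
    wanted = set(projected_columns)
    return sum(size for name, size in column_sizes.items() if name in wanted)


# Illustrative per-column estimates for one chunk (bytes).
sizes = {"id": 8000, "payload": 500000, "ts": 8000}
print(projected_in_memory_size(sizes, ["id", "ts"]))
print(projected_in_memory_size(sizes))
```

When a wide column such as the hypothetical payload above is not selected, the estimate for the chunk shrinks accordingly, so more chunks fit in one read task.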

4. Benchmark tooling (release test helper)

release/nightly_tests/dataset/read_chunked_parquet.py gains a --mode mixed
generator that produces a heterogeneous dataset from just --num-files +
--file-size (codec sampled per file; on-disk row-group size + compression ratio
sampled per row group from discrete buckets — so files differ from one another
and row groups vary within a file). New read-side knobs
--footer-size-estimate {default,on,off} and --var-width-factor make the
v2a/v2b estimator directly A/B-able against the legacy constant-ratio one. The
uniform mode and the v1/v2/v3 read paths are unchanged.

Impact

  • No empty/wasted read tasks; one footer read per file at listing instead of
    one per byte-chunk at read time.
  • Correct for tiny and large row groups: small row groups map ~1:1 to chunks;
    128–512 MB row groups become exactly one chunk each (per-task memory floor =
    one row group).
  • Better-balanced read tasks under heterogeneous data: partition sizing now
    tracks per-chunk true in-memory size, neutralizing block-codec variance
    entirely and fixed-width encoding variance exactly; variable-width is improved
    (bounded by var_width_factor).
  • All sizing changes are gated and reversible
    (parquet_use_footer_size_estimate) and affect partition balance/perf only,
    never which data is read.

Scope / non-goals

  • Builds on the chunker by adding partition sizing (previously listed as a
    separate change); read-task parallelism (num_buckets) and reader
    batch/output-block sizing remain separate.
  • Sub-row-group reads remain out of scope — a single-row-group file is still
    one read task (the Parquet parallelism floor).
  • The footer-derived estimate is options-blind by construction (computed at
    listing, before read-time read_dictionary / tensor-extension casting), and
    variable-width / dictionary-encoded columns remain approximate; it's a bounded
    accuracy improvement, not a general solution, and not a substitute for
    read-time block-size bounding (tracked follow-up).
  • Footer reads happen at listing time (small range-GET per file), parallelized
    across the indexer's worker pool. Caching footer metadata to also avoid the
    read-time subset re-read is a tracked follow-up.

Related issue number

DATA-2480

Testing Strategy

  • Unit tests — row-group chunking (test_file_chunker.py,
    test_file_indexer.py, test_parquet_datasource_v2.py); footer-derived sizing
    (test_in_memory_size_estimator.py, estimator-selection in
    test_parquet_datasource_v2.py); projection-aware sizing
    (test_read_files_logical.py, test_list_files_op.py). End-to-end verified via
    read_parquet().select_columns() (projection reaches ListFiles; rows
    correct).
  • Benchmark / release helper — read_chunked_parquet.py --mode mixed
    generates heterogeneous data and A/B's the estimator across v1/v2/v3 and
    --footer-size-estimate on/off.
  • This PR is not tested :(

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor


Code Review

This pull request refactors the Parquet file chunking mechanism in Ray Data (DataSourceV2) to chunk on true row-group boundaries by reading the Parquet file footer at listing time. Instead of estimating chunks, the ParquetFileChunker now bundles consecutive row groups into explicit ranges, and the FileIndexer parallelizes these footer reads across its thread pool. Feedback on the changes identifies a critical issue where the chunker uses the uncompressed row group size (total_byte_size) instead of the compressed size (total_compressed_size), which would lead to under-chunking and inconsistent manifest sizes. It is recommended to update both the chunker implementation and the corresponding unit tests to use total_compressed_size.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread python/ray/data/_internal/datasource_v2/chunkers/file_chunker.py Outdated
Comment thread python/ray/data/tests/unit/datasource_v2/test_file_chunker.py
Replaces the byte-estimate-and-reconcile Parquet chunker with one that
reads the footer at listing time and chunks on true row-group
boundaries.

Why
---
The Parquet read's atomic unit is the row group (PyArrow fragments are
row-group-granular). The previous chunker estimated chunk count from
``ceil(file_size / target_chunk_size)`` and reconciled the guess to real
row groups at read time. When the estimate diverged from the layout it
produced empty read tasks and redundant footer reads — and misbehaved
badly for large (128-512 MB) enterprise row groups, where a small target
emitted hundreds of chunks for a file with 1-2 row groups.

What
----
- ``ParquetFileChunker.generate_chunk_metadatas`` now reads each file's
  footer (``pq.read_metadata``) and greedily bundles consecutive row
  groups until their on-disk size reaches ``target_chunk_size`` (always
  >= 1 row group per chunk). Each chunk carries an explicit half-open
  ``{row_group_start, row_group_end}`` range. Corrupt/unreadable footer
  or zero row groups falls back to a single whole-file chunk.
- ``FileChunker`` gains a ``filesystem`` param on
  ``generate_chunk_metadatas`` and a ``reads_file_metadata`` flag.
- ``NonSamplingFileIndexer`` threads the filesystem to the chunker and,
  when the chunker reads metadata, fans the footer reads across its
  thread pool over the *discovered files* (``make_async_gen``), so reads
  parallelize even for a single input directory. Pruning runs before
  chunking so footers are never read for files that would be discarded.
- ``_fragments_from_chunk_metadata`` consumes the explicit range
  directly (defensive clamp to the file's actual row-group count);
  ``_calculate_row_group_range`` and the over-estimate→reconcile step are
  deleted.
- The chunker's default target falls back to
  ``DataContext.target_min_block_size`` so normal-sized row groups map
  ~1:1 to chunks.
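The defensive clamp mentioned for ``_fragments_from_chunk_metadata`` could look roughly like the sketch below, which turns a half-open range into a list of row-group ids bounded by the file's actual row-group count. The helper name and its exact clamping rules are assumptions; the commit message only states that a clamp exists.

```python
from typing import List


def clamp_row_group_ids(
    row_group_start: int, row_group_end: int, num_row_groups: int
) -> List[int]:
    """Expand a half-open range into ids, clamped to [0, num_row_groups)."""
    low = max(0, min(row_group_start, num_row_groups))
    high = max(low, min(row_group_end, num_row_groups))
    return list(range(low, high))


# A stale manifest claims row groups 2..5 but the file only has 4.
print(clamp_row_group_ids(2, 5, num_row_groups=4))
```

The resulting ids are the kind of value PyArrow's ``ParquetFileFragment.subset(row_group_ids=...)`` accepts, which is the call the PR description names for the reader path.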

Replaces the interim adaptive byte-estimate path: removes
``read_api._compute_adaptive_parquet_chunk_size`` and the
``ParquetDatasourceV2._get_file_indexer(target_chunk_size_override=...)``
kwarg. F1 (num_buckets), F2 (partitioner), F4 (batch sizing), F5 (output
block sizing) are unchanged.

Tests
-----
- ``test_file_chunker.py``: footer-based bundling (K=1 when target <
  row-group size; bundle-all when target large), contiguous covering
  ranges, chunk_size == summed row-group bytes, corrupt-footer
  whole-file fallback, target resolution / precedence,
  ``reads_file_metadata`` flags.
- ``test_parquet_datasource_v2.py``: ``_fragments_from_chunk_metadata``
  explicit-range slicing + defensive clamp; reader round-trips chunked
  manifests; deleted ``_calculate_row_group_range`` tests.
- ``test_file_indexer.py``: row-group-boundary splitting; parallel
  footer reads across workers.
- ``test_read_api_parallelism.py``: removed the deleted
  adaptive/override cases; kept F1/F4/F5 coverage.

Known limitation: a single-row-group file is still one read task (the
inherent Parquet floor; sub-row-group reads are out of scope). Footer
reads at listing scale with file count, parallelized across the indexer
thread pool; read-time footer-read elimination (metadata caching) is a
tracked follow-up.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
abhishekverma-ray and others added 9 commits June 11, 2026 17:43
…g in Datasource V2.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…nts to use DSv2 instead of default v1.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…re chunking experiments.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…e Parquet chunker

Add DataContext.parquet_chunker_row_group_aware (default True) — a runtime
flag, set the same way as use_datasource_v2 — that selects between the
row-group-aware ParquetFileChunker and the (restored) legacy
ByteEstimateParquetFileChunker, so experiments can A/B the chunkers without
code changes or a rebuild.

- context.py: DEFAULT_PARQUET_CHUNKER_ROW_GROUP_AWARE=True + DataContext field.
- file_chunker.py: restore ByteEstimateParquetFileChunker (byte estimate,
  reads_file_metadata=False) and ByteEstimateParquetFileChunkMetadata
  ({chunk_idx, total_num_chunks}) alongside the row-group chunker.
- parquet_file_chunking_utils.py: restore _calculate_row_group_range; make
  _fragments_from_chunk_metadata dispatch on the metadata schema.
- parquet_datasource_v2.py: pick the chunker from the flag at construction
  (an explicit file_chunker still wins). The indexer auto-adapts via the
  chunker's reads_file_metadata flag, so no indexer change is needed.
- Tests: both chunkers, both reader paths, flag-based selection, context default.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…n to DSv2 toggle for experiments.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…rojection awareness

Replace the flat on-disk x encoding-ratio in-memory size estimate used for
size-balanced partition bucketing with a footer-derived, type-aware estimate,
so partition sizing tracks per-file compression/encoding variance instead of a
single global guess (reduces skew / OOM risk on heterogeneous inputs).

v2a/v2b/v2d -- type-aware footer hint + estimator:
- file_chunker.py: estimate_chunk_in_memory_size() sizes each column by Arrow
  type (fixed-width = rows x byte_width, bool bit-packed, var-width =
  uncompressed x factor + int32 offsets, + ceil(rows/8) validity per nullable
  field). The row-group-aware ParquetFileChunker stamps in_memory_size into
  each chunk's metadata.
- in_memory_size_estimator.py: FooterDerivedInMemorySizeEstimator reads the
  hint; falls back to on_disk x ratio for hint-less / non-Parquet rows.
- parquet_datasource_v2.py: get_size_estimator() selects it under the flag.
- context.py: parquet_use_footer_size_estimate (A/B flag, default on) +
  parquet_in_memory_var_width_factor (constant knob, no sampling).

v2c -- projection-aware sizing:
- read_operator.py: ListFiles gains a sizing-only projected_columns field.
- projection_pushdown.py: a projection pushed into ReadFiles is also recorded
  on the upstream ListFiles (idempotent, fixed-point safe).
- plan_list_files_op.py: the planner copies it onto the chunker so the footer
  estimate counts only the projected columns.

Gated behind parquet_use_footer_size_estimate; only affects partition *sizing*
(balance/perf), never which data is read. Legacy byte-estimate and non-Parquet
paths keep the constant-ratio estimator.

Tests: new test_in_memory_size_estimator.py + estimator-selection fallbacks,
ListFiles/ProjectionPushdown/planner v2c tests, updated chunker metadata-key
tests. Verified end-to-end via read_parquet().select_columns() with
use_datasource_v2=True.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… a mixed gen mode for testing.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
@abhishekverma-ray

Author

Populate infer_metadata function during list and ensure that it propagates to downstream shuffle/read

abhishekverma-ray and others added 16 commits June 17, 2026 10:17
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…ng with uniform rg sized tests.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…zing knobs

Add a file-locality read partitioning strategy and make it the default, plus
two dedicated independently-tunable sizing knobs.

- FileAffinityPartitioner: groups each file's chunks into size-bounded
  partitions -- one file per read task (one open + footer + sequential I/O);
  large files fan into multiple partitions of consecutive row groups. Groups
  by path so interleaved chunk streams still group per file. New default;
  round_robin remains selectable via DataContext.parquet_partitioner_strategy.
- Dedicated knobs (default None -> unchanged behavior):
  - parquet_partitioner_max_bucket_size_bytes: per-partition in-memory cap,
    independent of target_max_block_size (bundle more row groups per task).
  - parquet_reader_target_batch_size_bytes: per-decode-batch target,
    independent of output-block size.
- Reader: fragments_to_read_for_manifest coalesces a partition's sister chunks
  per file into contiguous-run scans (one open + cached footer + sequential
  I/O) instead of one scan per row group; per-run offsets keep row hashes
  unique.
- read_chunked_parquet.py: --partitioner-strategy / --max-bucket-size /
  --batch-target-bytes / --chunker-target-chunk-size knobs.
- Tests + audit of RR-coupled tests.
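A minimal sketch of the two ideas in this commit, file-affinity grouping and contiguous-run coalescing, is given below. It is not the FileAffinityPartitioner itself: the function names and the "path" key are hypothetical, chunks are modeled as plain dicts, and the rule of closing a partition before the cap would be exceeded is an assumption.

```python
from typing import Dict, List, Tuple

Chunk = Dict[str, object]


def file_affinity_partitions(
    chunks: List[Chunk], max_bucket_size: int
) -> List[List[Chunk]]:
    """Group chunks by file, then split each file into size-bounded partitions."""
    by_path: Dict[str, List[Chunk]] = {}
    for chunk in chunks:  # grouping by path tolerates interleaved streams
        by_path.setdefault(str(chunk["path"]), []).append(chunk)

    partitions: List[List[Chunk]] = []
    for file_chunks in by_path.values():
        file_chunks.sort(key=lambda c: int(c["row_group_start"]))
        current: List[Chunk] = []
        current_size = 0
        for chunk in file_chunks:
            size = int(chunk["in_memory_size"])
            if current and current_size + size > max_bucket_size:
                partitions.append(current)
                current, current_size = [], 0
            current.append(chunk)
            current_size += size
        if current:
            partitions.append(current)
    return partitions


def coalesce_runs(ranges: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Merge adjacent half-open row-group ranges into contiguous runs."""
    runs: List[Tuple[int, int]] = []
    for start, end in sorted(ranges):
        if runs and runs[-1][1] == start:
            runs[-1] = (runs[-1][0], end)
        else:
            runs.append((start, end))
    return runs


def make_chunk(path: str, start: int, size: int) -> Chunk:
    return {"path": path, "row_group_start": start,
            "row_group_end": start + 1, "in_memory_size": size}


# Interleaved stream: chunks of file "a" and file "b" arrive mixed together.
stream = [make_chunk("a", 0, 40), make_chunk("b", 0, 30),
          make_chunk("a", 1, 40), make_chunk("a", 2, 40)]
parts = file_affinity_partitions(stream, max_bucket_size=100)
layout = [[(c["path"], c["row_group_start"]) for c in p] for p in parts]
print(layout)
print(coalesce_runs([(0, 1), (1, 2), (5, 6)]))
```

Each partition contains chunks from a single file only, so a reader can open the file once and scan each coalesced run sequentially.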

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
…nges.

Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>
Signed-off-by: Abhishek Verma <abhishek.verma@anyscale.com>

Labels

None yet

Projects

None yet


1 participant