[data] Zarr datasource #63003
Code Review
This pull request introduces support for reading Zarr v2 stores in Ray Data by adding the ZarrV2Datasource and a public read_zarrv2 API. The implementation includes support for various storage backends (local, S3, Azure) and handles chunk metadata, slice bounds, and padding. Feedback focuses on reducing logic duplication in chunk calculation, simplifying path normalization, converting a utility method to a static method, and adhering to standard Python formatting for keyword arguments.
Can we rename this so that it's read_zarr?
Address four edge-case review findings in ZarrV2Datasource:

1. Pin local:// stores to the driver node: set supports_distributed_reads from the path scheme (like FileBasedDatasource) so read tasks aren't scheduled on workers that can't see the driver's local disk.
2. Detect consolidated metadata by trying open_consolidated rather than a separately-built exists() probe. The probe could disagree with the mapper's key lookup (e.g. archive/root stores with an empty store path) and wrongly treat a consolidated store as unconsolidated.
3. Reject a group path passed via array_paths on an unconsolidated store with a clear "is a group, not an array" error instead of a confusing AttributeError later. (The consolidated and full-scan paths already filter to arrays.)
4. Validate array_paths for single root-level array stores so a bad path errors instead of silently returning the root array.

Add a test for each.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
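A minimal sketch of the first finding, assuming the scheme check is a plain URI parse. `supports_distributed_reads_for` is a hypothetical helper name used for illustration only; the datasource may resolve the scheme differently.

```python
from urllib.parse import urlparse


def supports_distributed_reads_for(path: str) -> bool:
    """Return False for local:// paths, which only the driver node can see.

    Hypothetical helper: any other scheme (or a bare path) is treated as
    readable from every worker.
    """
    return urlparse(path).scheme != "local"


print(supports_distributed_reads_for("local:///tmp/store.zarr"))  # False
print(supports_distributed_reads_for("s3://bucket/store.zarr"))   # True
```

Deriving the flag from the path keeps the scheduling decision in one place, so a local:// store never produces read tasks that land on a node without the files.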
The "This guide covers" list linked to #cloud-storage-and-credentials, but that section was renamed to "Zarr's .zattrs". Sphinx emits a myst.xref_missing warning, which ReadTheDocs (fail_on_warning: true) turns into a build failure -- though Buildkite's doc build tolerates it. Repoint the bullet to the .zattrs section via an explicit `(zarr-zattrs)=` target so the link doesn't depend on the auto-generated heading slug.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
chunk_shapes validation used isinstance(x, int), which rejected NumPy scalar integers (numpy.int64, etc.) even when positive -- a common case since chunk sizes are often derived from array metadata. Accept any numbers.Integral (excluding bool) via a shared _is_positive_int helper, and normalize stored values to plain ints. Adds a test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
    def read_fn() -> Iterable[pd.DataFrame]:
        yield pd.DataFrame(
            {
                "array": [d.array_name for d in batch],
                "chunk_index": [d.chunk_index for d in batch],
                "chunk_slices": [d.chunk_slices for d in batch],
                "chunk": [
                    _read_chunk(root, d.array_name, d.chunk_slices) for d in batch
                ],
            }
        )

    return read_fn
should we yield pyarrow instead?
You are right. I checked other datasources and they use DelegatingBlockBuilder in this place so I adopted that.
    torchvision==0.24.0
    confluent-kafka
    zarr<3 ; python_version >= '3.11' # zarr 2.18.4+ requires py3.11+ (v2 API)
    zarr>=2.18,<2.18.4 ; python_version < '3.11' # 2.18.3: last v2 line supporting py3.10
can you remove the upperbound? should recompile without issue since the lock files have a pinned version
tldr on why here: https://iscinumpy.dev/post/bound-version-constraints/#tldr
Build read-task output with DelegatingBlockBuilder (-> ArrowBlockBuilder) instead of hand-constructing pandas DataFrames, matching the tensor/per-row datasources (image, audio, video, torch). Blocks are now pyarrow Tables and the Arrow tensor extension handles the variable-shaped `chunk` column (shorter trailing-edge chunks) automatically. Drops the pandas dependency in the datasource.

Test updates:
- _execute_read_tasks converts each (now-Arrow) block to pandas.
- _reconstruct_array sorts by a tuple key, since chunk_index/chunk_slices round-trip as Arrow lists, not Python tuples.
- Drop ray_start_regular_shared from the two auto-init tests: building an Arrow block auto-inits Ray, which conflicted with the fixture's unguarded ray.init() (the rest of the module already relies on auto-init).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Remove three tests whose coverage is fully subsumed elsewhere:
- test_align_axis_0_accepts_per_array_chunk_shapes: dict chunk_shapes resolution is covered by test_chunk_shapes_resolution_across_mixed_rank (asserts _array_chunks directly), and aligned wide-row output by test_align_axis_0_emits_wide_rows; the aligned path consumes the resolved chunks regardless of dict vs sequence, so the combination adds no path.
- test_overlap_enables_windowing_without_cross_row_loss: its assertion is pure arithmetic on the per-row data extents already asserted by test_overlap_extends_chunk_data; it exercises no new datasource behavior.
- test_align_axis_0_column_set: the no-array_paths case duplicated the column assertion in test_align_axis_0_emits_wide_rows; de-parametrized to keep only the array_paths-filtering case, which is its unique coverage.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Fix ray-project#2 (limit pushdown): the read fns now slice their batch to per_task_row_limit, so a downstream limit(K) reads ~K chunks instead of the whole batch's I/O. Previously ReadTask only truncated the already-built block (_iter_sliced_blocks), so every chunk in the batch was still fetched.

Fix ray-project#4 (retries): chunk reads are wrapped in iterate_with_retry(match=DataContext.retried_io_errors) -- the same mechanism FileBasedDatasource uses -- so zarr reads now honor Ray Data's retry config. The underlying filesystem's own retry still applies underneath.

Tests: per_task_row_limit caps the number of _read_chunk calls (not just the output row count); _read_chunk retries a transient error then succeeds.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
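To illustrate the two fixes together, here is a standalone sketch that does not use Ray. `read_with_retry` and `read_batch` are hypothetical stand-ins for the wrapped chunk read and the read fn, and matching retryable errors by exception type is a simplification made for this example.

```python
from typing import Callable, Iterator, List, Optional, Sequence, Set, Tuple, Type


def read_with_retry(
    fetch: Callable[[int], bytes],
    chunk_id: int,
    retryable: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 3,
) -> bytes:
    """Call fetch(chunk_id), retrying only on the listed transient errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch(chunk_id)
        except retryable:
            if attempt == max_attempts:
                raise
    raise AssertionError("max_attempts must be at least 1")


def read_batch(
    batch: Sequence[int],
    fetch: Callable[[int], bytes],
    per_task_row_limit: Optional[int] = None,
) -> Iterator[bytes]:
    """Yield chunks, slicing the batch before any I/O happens."""
    if per_task_row_limit is not None:
        batch = batch[:per_task_row_limit]
    for chunk_id in batch:
        yield read_with_retry(fetch, chunk_id)


# Usage: a fake fetch that fails once for chunk 1, then succeeds.
calls: List[int] = []
failed_once: Set[int] = set()


def flaky_fetch(chunk_id: int) -> bytes:
    calls.append(chunk_id)
    if chunk_id == 1 and chunk_id not in failed_once:
        failed_once.add(chunk_id)
        raise ConnectionError("transient")
    return bytes([chunk_id])


rows = list(read_batch(range(10), flaky_fetch, per_task_row_limit=3))
print(len(rows), calls)  # 3 [0, 1, 1, 2]
```

Only three chunks are fetched out of ten, plus one retry, which is the difference between slicing the batch up front and truncating an already-built block.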
_get_long_form_read_tasks no longer materializes a per-chunk descriptor list on the driver -- the product(grid) enumeration was O(total chunks) and ran even for take(1)/limit (e.g. ~64,800 descriptors for one MUR SST array). Read tasks now describe a contiguous flat range of the chunk grid; the read fn unravels each flat index to an N-D chunk_index lazily on the worker. Planning is O(n_tasks) per array, independent of chunk count.

- New _ChunkRange (replaces per-chunk _ChunkDescriptor) + _unravel (row-major, preserving the previous itertools.product ordering).
- size_bytes is now an O(1) upper-bound estimate (full-size chunk per index) instead of an O(chunks) exact sum.
- per_task_row_limit caps the range, not a list slice; aligned path unchanged (already O(output rows)).

Adds a test asserting chunk_index order is identical to grid enumeration.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
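The flat-range idea can be sketched as below. `ChunkRange`, `unravel`, `plan_ranges`, and `iter_chunk_indices` are illustrative names and bodies written for this example under the assumptions stated in the commit message (row-major order, contiguous ranges); they are not the PR's `_ChunkRange` or `_unravel`.

```python
import itertools
import math
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass(frozen=True)
class ChunkRange:
    """A contiguous run [start, stop) of flat indices into a chunk grid."""
    array_name: str
    grid: Tuple[int, ...]
    start: int
    stop: int


def unravel(flat: int, grid: Tuple[int, ...]) -> Tuple[int, ...]:
    """Convert a flat row-major index into an N-D chunk index."""
    index = []
    for dim in reversed(grid):  # the last axis varies fastest
        flat, remainder = divmod(flat, dim)
        index.append(remainder)
    return tuple(reversed(index))


def plan_ranges(array_name: str, grid: Tuple[int, ...], n_tasks: int) -> List[ChunkRange]:
    """Split the grid into at most n_tasks ranges without listing chunks."""
    total = math.prod(grid)
    if total == 0:
        return []
    step = -(-total // max(1, min(n_tasks, total)))  # ceiling division
    return [
        ChunkRange(array_name, grid, start, min(start + step, total))
        for start in range(0, total, step)
    ]


def iter_chunk_indices(chunk_range: ChunkRange) -> Iterator[Tuple[int, ...]]:
    """Lazily expand a range into N-D indices (done on the worker)."""
    for flat in range(chunk_range.start, chunk_range.stop):
        yield unravel(flat, chunk_range.grid)


grid = (2, 3, 4)
ranges = plan_ranges("sst", grid, n_tasks=5)
expanded = [idx for r in ranges for idx in iter_chunk_indices(r)]
print(expanded == list(itertools.product(*(range(d) for d in grid))))  # True
```

The driver holds only one small object per task, while the expansion to per-chunk indices happens where the chunks are read.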
Replace the hand-rolled flat-index -> N-D unravel helper with numpy's np.unravel_index (the recognized primitive for exactly this). Its default C-order matches the previous ordering, so the emitted chunk_index sequence is unchanged (the ordering test still passes); int() keeps the indices as Python ints. Drops the _unravel helper.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
I did another round of self-review and also asked CC to review, to make this quicker.
pyrefly (CI lint) flagged 3 type errors in the new files. None are runtime bugs (the suite passes); the fixes clarify intent or suppress an intentional fake:
- _create_aligned_read_fn: annotate `row: dict[str, Any]` so assigning a chunk ndarray isn't checked against the all-int TypedDict pyrefly inferred from the t_start/t_stop literals.
- _is_positive_int: `int(x) > 0` (pyrefly can't type `>` on numbers.Integral).
- test_read_chunk_retries_transient_io: `# pyrefly: ignore[bad-argument-type]` for the deliberate fake _Root() passed to _read_chunk (repo convention).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Use the public example dataset in the docs and add an integration test (matching read_zarr, ray-project#63003): the read_lerobot and LeRobotDatasource docstring examples now read s3://anonymous@ray-example-data/lerobot/libero-mini, and test_read_lerobot_integration_public_s3 reads it end-to-end.

Review fixes (Ray Data conventions):
- Expose num_cpus/num_gpus/memory/ray_remote_args/concurrency on read_lerobot, forwarded to read_datasource (mirrors read_images) so the decode-heavy read tasks can be tuned; document the override_num_blocks vs partitioning interaction.
- Dictionary-encode the per-dataset-constant stats and task columns instead of repeating the multi-KB stats JSON on every row, and count the appended columns in the in-memory size estimate.
- Close the fsspec file handles in _CredsVideoDecoderCache.clear() (was leaking a file descriptor per decoded video file).
- Add image-based v3 unit coverage: a synthetic image_camera fixture and test_read_lerobot_image_camera.
- Drop the redundant per-task driver ray.get(roots_ref); pass the already-materialized roots to _LeRobotReadTask, keeping the ref only for the worker read path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Reviewed by Cursor Bugbot for commit fa16bd3.
    if ray.is_initialized():
        ray.shutdown()
    ray.init(
        num_cpus=1,
        logging_level=logging.ERROR,
        log_to_driver=False,
        runtime_env={"worker_process_setup_hook": _register_codec},
    )
    try:
        ds = ray.data.read_zarr(str(store_path))
        rows = sorted(ds.take_all(), key=lambda r: tuple(r["chunk_index"]))
        recon = np.concatenate([r["chunk"] for r in rows])
        np.testing.assert_array_equal(recon, np.arange(8, dtype="u1"))
    finally:
        ray.shutdown()
is there no existing ray fixture for this? if not, can we use a fixture instead?
        zarrv2_datasource.ZarrV2Datasource(str(tmp_path))


    def test_explicit_filesystem_strips_uri_scheme(tmp_path):
none of these tests are well-isolated, i think you need the shutdown_only fixture or something
Thanks! Isolated.
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
The datasource is backend-agnostic: path/filesystem resolution is delegated to pyarrow/fsspec (shared Ray Data machinery), so a live remote read exercises generic pyarrow/fsspec, not datasource logic. Filesystem handling is already covered hermetically by test_read_zarr_basic_across_filesystems (parametrized over fs flavors on local paths), so the unit-test file stays network-free.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>

Description
This PR introduces ray.data.read_zarr() and the backing ZarrV2Datasource for reading Zarr v2 stores with Ray Data.

This adds a dedicated public API and datasource implementation for Zarr v2 so users can read chunk metadata from consolidated Zarr v2 stores through the standard Ray Data read API surface.
Related issues
N/A
Additional information
This PR adds:
- ray.data.read_zarr() as a new public Ray Data read API
- ZarrV2Datasource as the datasource implementation used by the API

Example usage: