From 80fb13f4155f5f79afb732d0aef7b9d63d0e86be Mon Sep 17 00:00:00 2001 From: pinjie Date: Fri, 15 May 2026 02:28:35 -0700 Subject: [PATCH] [feat] Add PyNvBatchAsyncStreamReader for 2D batched async stream decoding --- .claude/skills/docs-compliance/SKILL.md | 217 ++++++ docs/spelling_wordlist.txt | 1 + .../on_demand_video_decoder/__init__.py | 2 + .../on_demand_video_decoder/docs/sample.md | 133 +++- .../src/PyNvOnDemandDecoder/CMakeLists.txt | 1 + .../inc/PyNvBatchAsyncStreamReader.hpp | 167 ++++ .../src/PyNvBatchAsyncStreamReader.cpp | 717 ++++++++++++++++++ .../src/PyNvOnDemandDecoder.cpp | 2 + .../samples/SampleBatchAsyncStreamAccess.py | 186 +++++ .../tests/test_batch_async_stream_decoder.py | 456 +++++++++++ 10 files changed, 1881 insertions(+), 1 deletion(-) create mode 100644 .claude/skills/docs-compliance/SKILL.md create mode 100644 packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvBatchAsyncStreamReader.hpp create mode 100644 packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvBatchAsyncStreamReader.cpp create mode 100644 packages/on_demand_video_decoder/samples/SampleBatchAsyncStreamAccess.py create mode 100644 packages/on_demand_video_decoder/tests/test_batch_async_stream_decoder.py diff --git a/.claude/skills/docs-compliance/SKILL.md b/.claude/skills/docs-compliance/SKILL.md new file mode 100644 index 0000000..c15ebcb --- /dev/null +++ b/.claude/skills/docs-compliance/SKILL.md @@ -0,0 +1,217 @@ +--- +name: docs-compliance +description: ACCV-Lab documentation conventions and pre-PR compliance check. INVOKE when creating or editing any .md or .rst file under docs/ or packages/*/docs/, when modifying a Python module/class/function/method docstring, or before opening a PR that touches documentation. Provides hard rules for Sphinx role usage, docstring formatting, public-API export requirements, admonition syntax, and a pre-PR checklist with verification commands. 
+--- + +# ACCV-Lab Documentation Compliance + +## When this skill applies + +- Creating or editing any `.md` / `.rst` file under `docs/` or `packages/*/docs/` +- Modifying a Python module, class, function, or method docstring (anything autodoc renders) +- Preparing a PR that touches documentation, samples, or public-API docstrings + +## Authoritative project references + +Read these for ground truth before deviating from any rule below: + +- `docs/conf.py` — Sphinx configuration (extensions, autodoc options, custom handlers) +- `docs/guides/DOCUMENTATION_SETUP_GUIDE.md` — build pipeline & directory structure +- `docs/guides/FORMATTING_GUIDE.md` — Python/C++ formatting (also affects docstring rendering) +- `docs/spelling_wordlist.txt` — accepted technical-term whitelist +- `docs/_ext/` — local Sphinx extensions (`note_literalinclude`, `module_docstring`, `markdown_note_admonitions`) + +## Hard rules + +### Rule 1 — API references: use Sphinx roles, never bare backticks + +For any `accvlab.*` symbol mentioned in narrative text, use the appropriate role so it cross-links in the rendered HTML. + +| Symbol kind | MyST role (`.md`) | RST role (`.rst`) | +|---|---|---| +| Class | `` {py:class}`~accvlab..` `` | `` :class:`~accvlab..` `` | +| Method | `` {py:meth}`~accvlab...` `` | `` :meth:`~accvlab...` `` | +| Module function | `` {py:func}`~accvlab..` `` | `` :func:`~accvlab..` `` | +| Attribute | `` {py:attr}`~accvlab...` `` | `` :attr:`~accvlab...` `` | + +``` +Bad: + See `lookup()` and `put()` for details. + +Good: + See {py:meth}`~accvlab.on_demand_video_decoder.SharedGopStore.lookup` + and {py:meth}`~accvlab.on_demand_video_decoder.SharedGopStore.put` + for details. 
+``` + +**Exclusions** — keep bare backticks for: +- Stdlib types (`RuntimeWarning`, `NamedTuple`, `multiprocessing.Lock`) — this project does not cross-ref stdlib in user docs +- Parameter names, field names, prose terms (`access_tick`, `flock`, `spawn`) +- API names appearing inside fenced code blocks (` ```python ` … ` ``` `) — only narrative prose gets roles + +### Rule 2 — `Returns:` block formatting gotcha + +In Google/NumPy-style docstrings, the **first line** after `Returns:` is silently parsed as a return-type annotation if it ends with `:`, even when a real type annotation is on the signature. This produces malformed return docs that look fine in the source but break in the rendered API table. + +``` +Bad: + Returns: + Tuple of three things: + - first + - second + - third + +Good: + Returns: + Tuple containing + + - first + - second + - third +``` + +Lead with prose that does **not** end in `:`, then a blank line, then the bullets. + +### Rule 3 — Public API must be exported + +A new public class or function will not appear in the auto-generated `api.rst` unless **both** of these hold: + +1. It is imported in `packages//accvlab//__init__.py` +2. It is listed in that file's `__all__` + +Internal helpers belong under `_internal/` and are not exported. + +### Rule 4 — Type annotations on public APIs + +Every public function parameter and return value must have a type annotation. `sphinx_autodoc_typehints` renders them into the docs; missing annotations produce gaps in the rendered API table. + +```python +Good: + def get_batch(self, refs: List[GopRef]) -> List[np.ndarray]: + ... +``` + +### Rule 5 — Annotation must match docstring + +When changing a function's signature (parameter types, return type, parameter names), update the corresponding `Args:` / `Returns:` lines in the docstring **in the same edit**. Stale docstrings vs. live signatures are caught in review. 
+ +### Rule 6 — No implementation details in user-facing docs + +User-facing docs (`docs/`, `packages/*/docs/`, public-class docstrings) describe **what the user does**, not **how the framework is implemented**. + +``` +Bad (jargon / impl detail leaked to user): + - put() acquires an flock for atomicity (double-check after acquiring the lock) + - Returns the original decoder + - Uses C++ GetGOP under the hood + +Good: + - put() acquires an flock for atomicity + - Returns the underlying PyNvGopDecoder + - Returns cached data without re-demuxing +``` + +If a phrase would prompt the question *"is there something the user should do?"*, rewrite it. Implementation notes belong in source-level comments or developer-facing docstrings under `_internal/`, not user docs. + +### Rule 7 — Doc build must be warning-free + +`./scripts/build_docs.sh` warnings and errors are **blocking**. Before requesting review: + +```bash +./scripts/build_docs.sh 2>&1 | tee /tmp/docs_build.log +grep -iE 'warning|error' /tmp/docs_build.log +``` + +Resolve every new warning. Common sources: bad role syntax, missing `__all__` exports, malformed `Returns:` blocks, broken cross-refs, unknown spelling. + +### Rule 8 — Admonitions: blockquote form for dual-readable files + +Files that must render correctly in **both** GitHub/IDE preview **and** Sphinx HTML use the blockquote admonition pattern: + +```md +> **ℹ️ Note**: Short tip for the reader. + +> **⚠️ Important**: Crucial warning users must not miss. +``` + +The local `markdown_note_admonitions` extension converts these to Sphinx admonitions at build time. Multi-line notes are supported as long as every line starts with `>`. + +Use fenced admonitions ```` ```{note} ```` / ```` ```{important} ```` **only** in files that are exclusively part of the built docs and never opened in GitHub/IDE. + +### Rule 9 — Edit source, not mirror + +Source-of-truth lives at `packages//docs/`. 
Files under `docs/contained_package_docs_mirror//docs/` are symlinks regenerated by `mirror_referenced_dirs.py` at build time. Editing the mirror is at best a no-op and at worst destructive (overwritten on next build). + +### Rule 10 — Relative paths in include/image/literalinclude + +Paths inside `.md` / `.rst` directives (`include`, `image`, `literalinclude`, etc.) must be **relative to the current document**. This keeps docs portable: links resolve correctly both in the original package directory and after mirroring into `docs/contained_package_docs_mirror/`. + +``` +Bad: + .. literalinclude:: /home/user/project/packages/foo/examples/demo.py + +Good: + .. literalinclude:: ../examples/demo.py +``` + +Inside Python docstrings, paths are relative to the file that includes the docstring (the autodoc directive's location), so absolute paths are acceptable there. + +### Rule 11 — Sample docs: explain real-use-case provenance and cross-link + +When a sample uses hard-coded values that would normally come from runtime sources (parser output, demuxer results, model outputs, etc.), explicitly document where those values come from in production. Cross-link to a related sample that demonstrates the real flow. + +```python +Good: + # Each task tuple: (video_path, target_frame_id, gop_first_frame, gop_len). + # + # In a real pipeline, gop_first_frame and gop_len would come from a + # demuxer (e.g. GetGOPList returning first_frame_ids / gop_lens). + # See samples/SampleSeparationAccessGOPListAPI.py for an end-to-end + # example. Hard-coded values here keep the demo dependency-free. + tasks = [...] 
+``` + +## Pre-PR compliance checklist + +Run through these before requesting review on any PR that touches docs or docstrings: + +- [ ] All `accvlab.*` API references in narrative use `{py:meth}` / `{py:func}` / `{py:class}` roles +- [ ] No bare backticks for `accvlab.*` names except in code blocks +- [ ] Admonitions in dual-readable files use the blockquote pattern +- [ ] Edits land in `packages//docs/`, not in the mirror +- [ ] Paths in directives are relative +- [ ] New technical terms added to `docs/spelling_wordlist.txt` +- [ ] `./scripts/build_docs.sh` runs with no new warnings or errors +- [ ] `./scripts/build_docs.sh --spelling` reviewed; report at `docs/_build/spelling/output.txt` +- [ ] Sample docs reference real-use-case origin and cross-link to related samples + +## Verification commands + +Quick scans to surface common violations before review: + +```bash +# 1. Bare backtick API references that should be sphinx roles. +# Customise the regex with the symbols touched by your PR. +grep -rnE '`(SharedGopStore|GopRef|CachedGopDecoder|PyNvGopDecoder|CreateGopDecoder)[A-Za-z_]*\(?\)?`' \ + docs/ packages/*/docs/ 2>/dev/null | grep -v '```' + +# 2. Returns: block immediately followed by a line that ends in ':' (Rule 2 violation). +grep -rEn -A1 'Returns:$' packages/*/accvlab/ | grep -E ':\s*$' + +# 3. Public symbols not exported in root __init__.py (manual diff). +# After adding `class Foo` or `def bar`, confirm: +# - `from . import Foo` (or `bar`) appears in __init__.py +# - `'Foo'` (or `'bar'`) appears in __all__ +grep -E '^(class|def) [A-Z]' packages//accvlab//.py +grep -E "(|__all__)" packages//accvlab//__init__.py + +# 4. Accidental edits inside the mirror directory (Rule 9 violation). +git diff --name-only | grep contained_package_docs_mirror + +# 5. Full doc build with warning surface. +./scripts/build_docs.sh 2>&1 | grep -iE 'warning|error' | grep -v -i 'INFO' + +# 6. Spelling check. 
+./scripts/build_docs.sh --spelling +cat docs/_build/spelling/output.txt 2>/dev/null +``` \ No newline at end of file diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 515f3ef..4662b77 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -209,3 +209,4 @@ unlinking atomicity picklable ABI +aggregator diff --git a/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/__init__.py b/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/__init__.py index 0f8e3b5..0ce2961 100644 --- a/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/__init__.py +++ b/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/__init__.py @@ -83,10 +83,12 @@ def _preload_local_ffmpeg() -> None: # C++ core interfaces 'PyNvGopDecoder', 'PyNvSampleReader', + 'PyNvBatchAsyncStreamReader', 'FastStreamInfo', 'DecodedFrameExt', 'RGBFrame', 'CreateSampleReader', + 'CreateBatchAsyncStreamReader', 'GetFastInitInfo', 'SavePacketsToFile', # Python decoder with caching diff --git a/packages/on_demand_video_decoder/docs/sample.md b/packages/on_demand_video_decoder/docs/sample.md index ee0648d..1d0b3e4 100644 --- a/packages/on_demand_video_decoder/docs/sample.md +++ b/packages/on_demand_video_decoder/docs/sample.md @@ -25,6 +25,7 @@ section helps you quickly locate the sample code that matches your requirements. 
| [SampleDecodeFromGopFilesToListAPI.py](../samples/SampleDecodeFromGopFilesToListAPI.py) | Selective GOP loading | {py:meth}`~accvlab.on_demand_video_decoder.PyNvGopDecoder.LoadGopsToList`, {py:meth}`~accvlab.on_demand_video_decoder.PyNvGopDecoder.DecodeFromGOPListRGB` | | [SampleDecodeFromGopList.py](../samples/SampleDecodeFromGopList.py) | Batch decode from multiple demux results (N demux → 1 decode) | {py:meth}`~accvlab.on_demand_video_decoder.PyNvGopDecoder.DecodeFromGOPListRGB` | | [SampleStreamAsyncAccess.py](../samples/SampleStreamAsyncAccess.py) | Async stream decoding with prefetching | {py:func}`~accvlab.on_demand_video_decoder.CreateSampleReader`, {py:meth}`~accvlab.on_demand_video_decoder.PyNvSampleReader.DecodeN12ToRGBAsync`, {py:meth}`~accvlab.on_demand_video_decoder.PyNvSampleReader.DecodeN12ToRGBAsyncGetBuffer` | +| [SampleBatchAsyncStreamAccess.py](../samples/SampleBatchAsyncStreamAccess.py) | 2D async stream decoding — multiple frames per video per call, with prefetching | {py:func}`~accvlab.on_demand_video_decoder.CreateBatchAsyncStreamReader`, {py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.Decode`, {py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.GetBuffer` | | [SampleSharedGopStore.py](../samples/SampleSharedGopStore.py) | Cross-process shared GOP cache for DataLoader | {py:class}`~accvlab.on_demand_video_decoder.SharedGopStore`, {py:class}`~accvlab.on_demand_video_decoder.GopRef` | For details on the **Key APIs**, please refer to the API documentation of the corresponding functions and classes. 
@@ -43,7 +44,9 @@ If you need random frame access: → Use SampleRandomAccess If you need sequential frame decoding: - If you need async decoding with prefetching for lower latency: + If you need multiple frames per video per call (2D batch): + → Use SampleBatchAsyncStreamAccess + Else if you need async decoding with prefetching for lower latency: → Use SampleStreamAsyncAccess Otherwise: → Use SampleStreamAccess @@ -554,6 +557,134 @@ cd packages/on_demand_video_decoder/samples python SampleStreamAsyncAccess.py ``` +#### 3.2.4 Sample: Batch Async Stream Access (2D) + +**File:** `packages/on_demand_video_decoder/samples/SampleBatchAsyncStreamAccess.py` + +**When to Use** + +The 2D batch async API is preferred over basic async stream access when: +- Each iteration consumes **multiple frames per video** (e.g. multi-sweep + StreamPETR-like training where one batch needs F sweeps × V cameras) +- You want a single in-flight submission to cover V × F frames instead of + V frames +- You want the output as a 2D structure ``out[v][f]`` rather than re-batching + V results F times in Python + +The 1D async API ({py:meth}`~accvlab.on_demand_video_decoder.PyNvSampleReader.DecodeN12ToRGBAsync`) +remains the right choice when you only need one frame per video per +iteration. 
+ +**Key Differences from 1D Async Stream Access** + +| Feature | 1D Async ({py:class}`~accvlab.on_demand_video_decoder.PyNvSampleReader`) | 2D Batch Async ({py:class}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader`) | +|---------|---------|---------| +| Frame ids shape | ``List[int]`` (len V) | ``List[List[int]]`` (V × F) | +| Returned structure | ``List[RGBFrame]`` (len V) | ``List[List[RGBFrame]]`` (V × F) | +| Frames decoded per call | V | V × F | +| Result buffer | 1 result, V frames | 1 result, V × F frames | +| Pool sized at construction by | (n/a — per-reader) | ``max_frames_per_decode_call`` | + +**Core APIs** + +- {py:func}`~accvlab.on_demand_video_decoder.CreateBatchAsyncStreamReader`: Construct a 2D batch async reader +- {py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.Decode`: Submit an async 2D decode (returns immediately) +- {py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.GetBuffer`: Block until decode is done and return decoded frames + +**Code Walkthrough** + +Construct the reader. ``max_frames_per_decode_call`` is the F upper bound +(per ``Decode()`` call, not per video file): + +```python +import accvlab.on_demand_video_decoder as nvc + +reader = nvc.CreateBatchAsyncStreamReader( + num_of_set=1, + num_of_file=6, # V upper bound + max_frames_per_decode_call=4, # F upper bound (per Decode() call) + iGpu=0, +) +``` + +Build a 2D frame_ids and submit: + +```python +V = len(file_path_list) +F = 4 +# frame_ids[v][f] = f-th frame requested for video v. +# All inner lists must be the same length (jagged inner lengths are rejected). +frame_ids = [[0, 7, 14, 21]] * V + +reader.Decode(file_path_list, frame_ids, as_bgr=False) +# Returns immediately; decoding happens on a background worker thread. +``` + +Retrieve the result: + +```python +out = reader.GetBuffer(file_path_list, frame_ids, as_bgr=False) +# out is List[List[RGBFrame]] indexed [v][f]. 
+# out[v][f].shape == (H, W, 3), dtype uint8, GPU memory. +``` + +**Two Contracts to Remember** + +> **ℹ️ Note**: When +> {py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.GetBuffer` +> returns, all GPU work (decode + internal copies) is already complete. You +> can read the returned frames on any CUDA stream — including PyTorch's +> default stream — without additional synchronization. + +> **⚠️ Important**: The returned +> {py:class}`~accvlab.on_demand_video_decoder.RGBFrame` objects are zero-copy +> views into the reader's internal aggregator pool. Submitting the next +> {py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.Decode` +> reuses that memory. You **must** clone every frame you want to keep +> **before** the next ``Decode()`` call. Skipping the clone is silent data +> corruption. + +**Canonical Prefetch Pattern** + +```python +# Iteration 0: prime the pipeline +reader.Decode(files, frame_ids_0, as_bgr=False) +out = reader.GetBuffer(files, frame_ids_0, as_bgr=False) + +# Clone before submitting the next batch +tensors_0 = [ + [torch.as_tensor(out[v][f], device="cuda").clone() for f in range(F)] + for v in range(V) +] + +# Prefetch iteration 1 in parallel with processing iteration 0 +reader.Decode(files, frame_ids_1, as_bgr=False) +# ... process tensors_0 here (model forward, etc.) ... + +# Iteration 1: GetBuffer is usually already-ready because of the prefetch +out = reader.GetBuffer(files, frame_ids_1, as_bgr=False) +tensors_1 = [ + [torch.as_tensor(out[v][f], device="cuda").clone() for f in range(F)] + for v in range(V) +] +reader.Decode(files, frame_ids_2, as_bgr=False) +# ... process tensors_1 ... +``` + +**Resolution Uniformity Requirement** + +All V videos in a single ``Decode()`` call must share the same resolution. +This is checked at decode time; a mismatch raises through +{py:meth}`~accvlab.on_demand_video_decoder.PyNvBatchAsyncStreamReader.GetBuffer`. +Multi-camera setups typically already satisfy this. 
+ +**Running the Sample** + +```bash +cd packages/on_demand_video_decoder/samples +python SampleBatchAsyncStreamAccess.py +``` + ### 3.3 Separation Access Decoding Separation Access mode decouples demuxing and decoding into two separate stages. This provides fine-grained control over the video processing pipeline and enables advanced optimization strategies. diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/CMakeLists.txt b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/CMakeLists.txt index 6b50631..f795060 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/CMakeLists.txt +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/CMakeLists.txt @@ -55,6 +55,7 @@ set(PY_SOURCES src/PyNvGopDecoder_separate_decoder.cpp src/PyNvVideoReader.cpp src/PyNvSampleReader.cpp + src/PyNvBatchAsyncStreamReader.cpp src/PyNvGopDemuxer.cpp src/PyRGBFrame.cpp src/GPUMemoryPool.cpp diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvBatchAsyncStreamReader.hpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvBatchAsyncStreamReader.hpp new file mode 100644 index 0000000..69bc1e2 --- /dev/null +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvBatchAsyncStreamReader.hpp @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "FixedSizeVideoReaderMap.hpp" +#include "GPUMemoryPool.hpp" +#include "NvCodecUtils.h" +#include "PyNvVideoReader.hpp" +#include "PyRGBFrame.hpp" +#include "ThreadPool.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#ifdef IS_DEBUG_BUILD +class __attribute__((visibility("default"))) PyNvBatchAsyncStreamReader { +#else +class PyNvBatchAsyncStreamReader { +#endif + public: + /** + * Construct an async 2D batch stream decoder. + * + * Args: + * num_of_set: number of decoder slots per file (same meaning as in PyNvSampleReader) + * num_of_file: maximum number of videos per decode call (V upper bound) + * max_frames_per_decode_call: maximum number of frames per video per decode call (F upper bound) + * iGpu: target GPU device id + * bSuppressNoColorRangeWarning: suppress warning if no color range can be extracted + */ + PyNvBatchAsyncStreamReader(int num_of_set, int num_of_file, int max_frames_per_decode_call, int iGpu, + bool bSuppressNoColorRangeWarning = false); + + ~PyNvBatchAsyncStreamReader(); + + /** + * Clear all underlying video readers (also waits for any pending async task). + */ + void clearAllReaders(); + + /** + * Release per-reader memory pools AND the 2D aggregator pool. + * Decoder state is preserved for efficient forward decoding. + */ + void ReleaseMemPools(); + + /** + * Release all decoder instances. After this, readers will be lazily re-created + * on the next decode call. + */ + void ReleaseDecoder(); + + /** + * Submit an async 2D decode task. Returns immediately. + * + * Contract: at most one in-flight task. Submitting while a previous task is + * pending causes a warning, joins the previous task, and discards its result. + * + * Args: + * filepaths: list of video file paths (len == V_call, V_call <= num_of_file) + * frame_ids_2d: 2D frame id list (outer V_call, inner F_call). 
All inner + * lengths must be equal. F_call <= max_frames_per_decode_call. + * frame_ids_2d[v][f] = the f-th frame requested for video v. + * as_bgr: output BGR if true, RGB if false + */ + void Decode(const std::vector& filepaths, + const std::vector>& frame_ids_2d, bool as_bgr); + + /** + * Block until the pending async task completes and return its decoded frames. + * + * Contract: + * - GPU side: all decode + D2D copies complete (worker did cuStreamSynchronize). + * Downstream torch / CUDA ops can read without further sync. + * - Buffer: the returned RGBFrame objects reference an internal aggregator + * pool. They are invalidated on the next Decode() call. Users + * must consume / clone the data before calling Decode() again. + * + * Args: must match the request previously submitted via Decode(). + * Returns: List[List[RGBFrame]], outer V_call, inner F_call, indexed [v][f] + * to match the input frame_ids_2d shape. + */ + std::vector> GetBuffer(const std::vector& filepaths, + const std::vector>& frame_ids_2d, + bool as_bgr); + + /** + * Wait for any pending async decode task to complete. No-op if no pending task. + */ + void waitForPendingAsyncTask(); + + /** + * Clear any cached async result (does NOT clear in-flight task). + */ + void clearDecodeResultBuffer(); + + private: + struct DecodeResult2D { + std::vector file_path_list; + std::vector> frame_id_list_2d; + bool as_bgr; + std::vector> decoded_frames; + std::exception_ptr exception; + bool is_ready; + + DecodeResult2D() : as_bgr(false), is_ready(false) {} + }; + + // Build a key string for diagnostic reporting on request mismatch. + std::string generate_request_key(const std::vector& filepaths, + const std::vector>& frame_ids_2d, bool as_bgr); + + // Field-wise compare a buffered result against an incoming request. 
+ bool validate_request(const DecodeResult2D& result, const std::vector& filepaths, + const std::vector>& frame_ids_2d, bool as_bgr); + + // Throw if Decode() input violates declared invariants. + void validate_decode_input(const std::vector& filepaths, + const std::vector>& frame_ids_2d); + + // Sync 1D decode over the owned VideoReaderMap. The 2D worker calls this + // F times. Mirrors PyNvSampleReader::run_rgb_out but operates on this + // class's reader pool so the two classes don't share decoder state. + std::vector run_rgb_out_1d(const std::vector& filepaths, + const std::vector& frame_ids, bool as_bgr); + + private: + bool suppress_no_color_range_given_warning = false; + bool destroy_context = false; + CUcontext cu_context = nullptr; + CUstream cu_stream = nullptr; + int gpu_id = 0; + int num_of_file = 0; + int num_of_set = 0; + int max_frames_per_decode_call = 0; + + std::vector VideoReaderMap; + + // 2D-specific aggregator pool. Contiguous V*F*frame_bytes block of GPU memory. + // Only accessed by the single decode worker; no thread-safety required. + GPUMemoryPool agg_pool; + + // Async machinery (mirrors PyNvSampleReader) + ConcurrentQueue decode_result_queue; // capacity = 1 + ThreadRunner decode_worker; + std::mutex async_mutex; + bool has_pending_task = false; +}; diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvBatchAsyncStreamReader.cpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvBatchAsyncStreamReader.cpp new file mode 100644 index 0000000..e3b373e --- /dev/null +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvBatchAsyncStreamReader.cpp @@ -0,0 +1,717 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PyNvBatchAsyncStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "nvtx3/nvtx3.hpp" + +namespace py = pybind11; + +namespace { +// Parallel per-file fanout, mirroring PyNvSampleReader.cpp's local helper. +// Each filepath/frame_id pair is processed in its own thread; first exception +// captured rethrows after join. +template +std::vector process_frames_in_parallel(const std::vector& filepaths, + const std::vector& frame_ids, + const std::vector& video_readers, + Func process_frame) { + nvtxRangePushA("Process Frames in Parallel (2D worker)"); + std::vector res(filepaths.size()); + std::exception_ptr eptr = nullptr; + std::mutex mutex; + + std::vector threads; + threads.reserve(filepaths.size()); + + for (size_t i = 0; i < filepaths.size(); ++i) { + threads.emplace_back([&, i]() { + try { + res[i] = process_frame(video_readers[i], frame_ids[i]); + } catch (const std::exception&) { + std::lock_guard lock(mutex); + if (!eptr) eptr = std::current_exception(); + } + }); + } + for (auto& t : threads) t.join(); + + if (eptr) { + nvtxRangePop(); + std::rethrow_exception(eptr); + } + nvtxRangePop(); + return res; +} +} // namespace + +PyNvBatchAsyncStreamReader::PyNvBatchAsyncStreamReader(int num_of_set, int num_of_file, + int max_frames_per_decode_call, int iGpu, + bool bSuppressNoColorRangeWarning) + : suppress_no_color_range_given_warning(bSuppressNoColorRangeWarning), + gpu_id(iGpu), + num_of_file(num_of_file), + 
num_of_set(num_of_set), + max_frames_per_decode_call(max_frames_per_decode_call), + decode_result_queue(1), // Buffer size = 1 + has_pending_task(false) { + if (num_of_set <= 0) { + throw std::invalid_argument("num_of_set must be > 0, got " + std::to_string(num_of_set)); + } + if (num_of_file <= 0) { + throw std::invalid_argument("num_of_file must be > 0, got " + std::to_string(num_of_file)); + } + if (max_frames_per_decode_call <= 0) { + throw std::invalid_argument("max_frames_per_decode_call must be > 0, got " + + std::to_string(max_frames_per_decode_call)); + } + +#ifdef IS_DEBUG_BUILD + std::cout << "New PyNvBatchAsyncStreamReader object" << std::endl; +#endif + + ck(cuInit(0)); + int nGpu = 0; + ck(cuDeviceGetCount(&nGpu)); + if (iGpu < 0 || iGpu >= nGpu) { + std::cout << "GPU ordinal out of range. Should be within [0, " << nGpu - 1 << "]" << std::endl; + } + + this->destroy_context = false; + this->cu_context = nullptr; + + CUdevice cuDevice = 0; + ck(cuDeviceGet(&cuDevice, this->gpu_id)); + ck(cuDevicePrimaryCtxRetain(&this->cu_context, cuDevice)); + this->destroy_context = true; + + if (!this->cu_context) { + throw std::domain_error( + "[ERROR] Failed to create a cuda context. Create a " + "cudacontext and pass it as named argument 'cudacontext = app_ctx'"); + } + + // Push context temporarily for stream creation; pop immediately so the + // destructor can run on any thread without context-leak issues. 
+ ck(cuCtxPushCurrent(this->cu_context)); + ck(cuStreamCreate(&this->cu_stream, CU_STREAM_DEFAULT)); + ck(cuCtxPopCurrent(NULL)); + + VideoReaderMap.reserve(this->num_of_file); + for (int i = 0; i < this->num_of_file; i++) { + VideoReaderMap.emplace_back(this->num_of_set); + } +} + +PyNvBatchAsyncStreamReader::~PyNvBatchAsyncStreamReader() { +#ifdef IS_DEBUG_BUILD + std::cout << "Delete PyNvBatchAsyncStreamReader object" << std::endl; +#endif + + bool need_join = false; + { + std::lock_guard lock(async_mutex); + need_join = has_pending_task; + } + if (need_join) { + decode_worker.join(); + } + + decode_result_queue.clear(); + + this->clearAllReaders(); + + agg_pool.HardRelease(); + + if (this->cu_stream) { + ck(cuCtxPushCurrent(this->cu_context)); + ck(cuStreamDestroy(this->cu_stream)); + ck(cuCtxPopCurrent(NULL)); + } + if (this->destroy_context) { + ck(cuDevicePrimaryCtxRelease(this->gpu_id)); + } +} + +void PyNvBatchAsyncStreamReader::waitForPendingAsyncTask() { + bool need_join = false; + { + std::lock_guard lock(async_mutex); + need_join = has_pending_task; + } + if (need_join) { + decode_worker.join(); + } +} + +void PyNvBatchAsyncStreamReader::clearDecodeResultBuffer() { + while (!decode_result_queue.empty()) { + decode_result_queue.pop_front(); + } +} + +void PyNvBatchAsyncStreamReader::clearAllReaders() { + waitForPendingAsyncTask(); + for (auto& reader_map : VideoReaderMap) { + reader_map.clearAllReaders(); + } +} + +void PyNvBatchAsyncStreamReader::ReleaseMemPools() { + waitForPendingAsyncTask(); + for (auto& reader_map : VideoReaderMap) { + reader_map.releaseAllMemPools(); + } + agg_pool.HardRelease(); +} + +void PyNvBatchAsyncStreamReader::ReleaseDecoder() { + waitForPendingAsyncTask(); + clearAllReaders(); +} + +std::string PyNvBatchAsyncStreamReader::generate_request_key( + const std::vector& filepaths, const std::vector>& frame_ids_2d, + bool as_bgr) { + std::ostringstream oss; + oss << as_bgr << ":"; + for (size_t v = 0; v < filepaths.size(); ++v) 
{ + oss << filepaths[v] << "["; + if (v < frame_ids_2d.size()) { + for (size_t f = 0; f < frame_ids_2d[v].size(); ++f) { + oss << frame_ids_2d[v][f]; + if (f + 1 < frame_ids_2d[v].size()) oss << ","; + } + } + oss << "]"; + if (v + 1 < filepaths.size()) oss << ";"; + } + return oss.str(); +} + +bool PyNvBatchAsyncStreamReader::validate_request(const DecodeResult2D& result, + const std::vector& filepaths, + const std::vector>& frame_ids_2d, + bool as_bgr) { + if (result.as_bgr != as_bgr) return false; + if (result.file_path_list.size() != filepaths.size()) return false; + if (result.frame_id_list_2d.size() != frame_ids_2d.size()) return false; + for (size_t v = 0; v < filepaths.size(); ++v) { + if (result.file_path_list[v] != filepaths[v]) return false; + if (result.frame_id_list_2d[v].size() != frame_ids_2d[v].size()) return false; + for (size_t f = 0; f < frame_ids_2d[v].size(); ++f) { + if (result.frame_id_list_2d[v][f] != frame_ids_2d[v][f]) return false; + } + } + return true; +} + +void PyNvBatchAsyncStreamReader::validate_decode_input( + const std::vector& filepaths, const std::vector>& frame_ids_2d) { + if (filepaths.size() != frame_ids_2d.size()) { + throw std::invalid_argument("filepaths.size() (" + std::to_string(filepaths.size()) + + ") must equal frame_ids_2d.size() (" + + std::to_string(frame_ids_2d.size()) + ")"); + } + if (filepaths.empty()) { + throw std::invalid_argument("filepaths must not be empty"); + } + if (filepaths.size() > static_cast(this->num_of_file)) { + throw std::invalid_argument("Number of files (" + std::to_string(filepaths.size()) + + ") exceeds num_of_file (" + std::to_string(this->num_of_file) + + ") specified at construction."); + } + + const size_t expected_F = frame_ids_2d[0].size(); + if (expected_F == 0) { + throw std::invalid_argument("frame_ids_2d[0] must not be empty"); + } + if (expected_F > static_cast(this->max_frames_per_decode_call)) { + throw std::invalid_argument( + "frames per video (" + std::to_string(expected_F) 
+ ") exceeds max_frames_per_decode_call (" + + std::to_string(this->max_frames_per_decode_call) + ") specified at construction."); + } + for (size_t v = 0; v < frame_ids_2d.size(); ++v) { + if (frame_ids_2d[v].size() != expected_F) { + throw std::invalid_argument( + "frame_ids_2d[" + std::to_string(v) + "].size() (" + + std::to_string(frame_ids_2d[v].size()) + ") must equal frame_ids_2d[0].size() (" + + std::to_string(expected_F) + "); jagged inner lengths are not supported"); + } + } +} + +std::vector PyNvBatchAsyncStreamReader::run_rgb_out_1d( + const std::vector& filepaths, const std::vector& frame_ids, bool as_bgr) { + // Caller (the worker) has already validated outer/inner sizes via + // validate_decode_input. Here we only resolve readers and dispatch in parallel. + std::vector video_readers(filepaths.size()); + + nvtxRangePushA("Get Video Readers (2D worker)"); + for (size_t i = 0; i < filepaths.size(); ++i) { + FixedSizeVideoReaderMap& reader_map = this->VideoReaderMap[i]; + PyNvVideoReader* video_reader = nullptr; + // Only allocate a new reader when there's room AND the file isn't already + // cached, matching PyNvSampleReader::run_rgb_out's memory-leak guard. + if (reader_map.notFull() && !reader_map.contains(filepaths[i])) { + video_reader = + new PyNvVideoReader(filepaths[i], this->gpu_id, this->cu_context, this->cu_stream); + } + auto cur_video_reader = reader_map.find(filepaths[i], video_reader); + video_readers[i] = cur_video_reader; + } + nvtxRangePop(); + + return process_frames_in_parallel(filepaths, frame_ids, video_readers, + [as_bgr](PyNvVideoReader* reader, int frame_id) { + return reader->run_single_rgb_out(frame_id, as_bgr); + }); +} + +void PyNvBatchAsyncStreamReader::Decode(const std::vector& filepaths, + const std::vector>& frame_ids_2d, + bool as_bgr) { + validate_decode_input(filepaths, frame_ids_2d); + + std::unique_lock lock(async_mutex); + + // Drain any prior in-flight task before starting a new one. 
Matches the + // 1D async behavior (warn + join + discard) but additionally clears stale + // results unconditionally, so the queue can never carry over a previous + // submission's frames into a fresh Decode() call. + if (has_pending_task) { + std::cerr << "[WARNING] PyNvBatchAsyncStreamReader::Decode: A previous async decode task is " + "still running. Waiting for it to complete before starting the new task." + << std::endl; + lock.unlock(); + decode_worker.join(); + lock.lock(); + has_pending_task = false; + } + while (!decode_result_queue.empty()) { + decode_result_queue.pop_front(); + } + + // Snapshot inputs for the worker closure. + auto filepaths_cap = filepaths; + auto frame_ids_cap = frame_ids_2d; + bool as_bgr_cap = as_bgr; + + has_pending_task = true; + decode_worker.start([this, filepaths_cap, frame_ids_cap, as_bgr_cap]() { + DecodeResult2D result; + result.file_path_list = filepaths_cap; + result.frame_id_list_2d = frame_ids_cap; + result.as_bgr = as_bgr_cap; + result.is_ready = false; + + // Worker runs on a fresh std::thread that has no current CUDA context. + // Push our context for the duration of the worker so all cuMemcpyDtoDAsync / + // cuStreamSynchronize calls below have something on the stack. The inner + // PyNvVideoReader path also pushes/pops its own copy of cu_context, which + // nests cleanly with this outer push. + bool ctx_pushed = false; + try { + CUDA_DRVAPI_CALL(cuCtxPushCurrent(this->cu_context)); + ctx_pushed = true; + } catch (...) { + // If we can't even push the context, surface the error to GetBuffer. 
+ result.exception = std::current_exception(); + result.is_ready = true; + decode_result_queue.push_back(result); + std::lock_guard lk(async_mutex); + has_pending_task = false; + return; + } + + try { + nvtxRangePushA("Batch 2D Decode Worker"); + const int V = static_cast(filepaths_cap.size()); + const int F = static_cast(frame_ids_cap[0].size()); + + result.decoded_frames.assign(V, std::vector{}); + for (int v = 0; v < V; ++v) result.decoded_frames[v].reserve(F); + + size_t frame_bytes = 0; + std::tuple ref_shape{}; + std::tuple ref_stride{}; + std::string ref_typestr; + + for (int f = 0; f < F; ++f) { + std::vector fids_at_f(V); + for (int v = 0; v < V; ++v) fids_at_f[v] = frame_ids_cap[v][f]; + + auto frames = this->run_rgb_out_1d(filepaths_cap, fids_at_f, as_bgr_cap); + + if (f == 0) { + // Resolution / format snapshot from the first decoded frame. + ref_shape = frames[0].shape; + ref_stride = frames[0].stride; + ref_typestr = frames[0].typestr; + const size_t H = std::get<0>(ref_shape); + const size_t W = std::get<1>(ref_shape); + frame_bytes = H * W * 3; + + // Per the spec: a single 2D call requires uniform resolution + // across videos, since they all share one aggregator pool. + for (int v = 0; v < V; ++v) { + if (frames[v].shape != ref_shape) { + std::ostringstream oss; + oss << "PyNvBatchAsyncStreamReader: video resolution mismatch in batch — " + << "expected " << H << "x" << W << " (from video 0) but video " << v + << " produced " + << std::get<0>(frames[v].shape) << "x" + << std::get<1>(frames[v].shape) << "."; + throw std::runtime_error(oss.str()); + } + } + + // Pre-allocate the full V*F*frame_bytes block in one shot so + // subsequent AddElement calls hand out sequential offsets. 
+ agg_pool.EnsureSizeAndSoftReset(static_cast(V) * F * frame_bytes, false); + } + + for (int v = 0; v < V; ++v) { + void* dst = agg_pool.AddElement(frame_bytes); + CUDA_DRVAPI_CALL(cuMemcpyDtoDAsync(reinterpret_cast(dst), + frames[v].data, frame_bytes, cu_stream)); + + const std::vector shape_vec = {std::get<0>(ref_shape), + std::get<1>(ref_shape), 3}; + const std::vector stride_vec = {std::get<0>(ref_stride), + std::get<1>(ref_stride), + std::get<2>(ref_stride)}; + result.decoded_frames[v].emplace_back( + shape_vec, stride_vec, ref_typestr, + reinterpret_cast(cu_stream), + reinterpret_cast(dst), + /*readOnly=*/false, /*isBGR=*/as_bgr_cap); + } + } + + // Single terminal sync: all decode kernels + V*F D2D copies on cu_stream + // are FIFO; one sync drains the whole pipeline so the result becomes + // GPU-visible to any consumer stream by the time GetBuffer returns. + CUDA_DRVAPI_CALL(cuStreamSynchronize(cu_stream)); + + result.is_ready = true; + decode_result_queue.push_back(result); + + nvtxRangePop(); + } catch (...) { + // On failure, soft-reset the pool so the next Decode reuses the same + // allocation. The buffered RGBFrame views become invalid — clear them + // and stash the exception for GetBuffer to rethrow. + agg_pool.SoftRelease(); + result.decoded_frames.clear(); + result.exception = std::current_exception(); + result.is_ready = true; + decode_result_queue.push_back(result); + } + + if (ctx_pushed) { + // Best-effort pop. If this throws, the worker is exiting anyway — + // swallow so we always release async_mutex below. 
+ CUcontext popped = nullptr; + cuCtxPopCurrent(&popped); + } + + { + std::lock_guard lk(async_mutex); + has_pending_task = false; + } + }); +} + +std::vector> PyNvBatchAsyncStreamReader::GetBuffer( + const std::vector& filepaths, const std::vector>& frame_ids_2d, + bool as_bgr) { + { + std::lock_guard lock(async_mutex); + if (!has_pending_task && decode_result_queue.empty()) { + throw std::runtime_error( + "PyNvBatchAsyncStreamReader::GetBuffer: No pending decode task and buffer is empty. " + "Call Decode first before calling GetBuffer."); + } + } + + // Blocks until worker pushes (worker may still be running). + DecodeResult2D result = decode_result_queue.pop_front(); + + if (!result.is_ready) { + throw std::runtime_error( + "PyNvBatchAsyncStreamReader::GetBuffer: Internal error — result not ready when popped."); + } + if (result.exception) { + std::rethrow_exception(result.exception); + } + if (!validate_request(result, filepaths, frame_ids_2d, as_bgr)) { + std::ostringstream oss; + oss << "PyNvBatchAsyncStreamReader::GetBuffer: Request parameters do not match buffered " + "result. Expected: " + << generate_request_key(filepaths, frame_ids_2d, as_bgr) + << ", Got: " + << generate_request_key(result.file_path_list, result.frame_id_list_2d, result.as_bgr); + throw std::runtime_error(oss.str()); + } + + return result.decoded_frames; +} + +void Init_PyNvBatchAsyncStreamReader(py::module& m) { + m.def( + "CreateBatchAsyncStreamReader", + [](int num_of_set, int num_of_file, int max_frames_per_decode_call, int iGpu, + bool suppressNoColorRangeWarning) { + return std::make_shared( + num_of_set, num_of_file, max_frames_per_decode_call, iGpu, suppressNoColorRangeWarning); + }, + py::arg("num_of_set"), py::arg("num_of_file"), py::arg("max_frames_per_decode_call"), + py::arg("iGpu") = 0, py::arg("suppressNoColorRangeWarning") = false, + R"pbdoc( + Create a PyNvBatchAsyncStreamReader for 2D async stream decoding. 

        This reader is **async-only** and **2D-only**: it accepts a list of
        video files and a 2D list of frame ids (one list per video), submits
        the decode in the background, and returns the decoded frames as
        ``List[List[RGBFrame]]`` indexed ``[v][f]``.

        Args:
            num_of_set: Number of decoder slots per file.
            num_of_file: Maximum number of videos per decode call (V upper bound).
            max_frames_per_decode_call: Maximum number of frames per video per decode
                call (F upper bound). The internal aggregator pool is sized for
                this peak.
            iGpu: GPU device id.
            suppressNoColorRangeWarning: Suppress warning when no color range
                can be extracted (limited / MPEG range is assumed).

        Returns:
            PyNvBatchAsyncStreamReader instance.

        Example:
            >>> reader = CreateBatchAsyncStreamReader(
            ...     num_of_set=1, num_of_file=6, max_frames_per_decode_call=4)
            >>> reader.Decode(filepaths, frame_ids_2d, as_bgr=False)
            >>> out = reader.GetBuffer(filepaths, frame_ids_2d, as_bgr=False)
            >>> # out[v][f] is an RGBFrame; clone before next Decode() call
        )pbdoc");

    py::class_<PyNvBatchAsyncStreamReader, std::shared_ptr<PyNvBatchAsyncStreamReader>>(
        m, "PyNvBatchAsyncStreamReader", py::module_local(),
        R"pbdoc(
        NVIDIA GPU-accelerated 2D async stream video decoder.

        This class submits a 2D decode request (V videos × F frames per video)
        to a background C++ worker thread and returns the decoded frames as
        ``List[List[RGBFrame]]`` indexed ``[v][f]``. It is async-only (no sync
        ``Decode`` method) and 2D-only. The 1D ``PyNvSampleReader`` class is
        unchanged and serves the 1-frame-per-video case.

        Async model
        ~~~~~~~~~~~

        At most one in-flight task at a time; the internal result buffer holds
        a single result. ``Decode()`` returns immediately; ``GetBuffer()``
        blocks until the worker pushes its result.

        Calling ``Decode()`` while a previous task is still pending will:
        1. Print a warning to stderr.
        2. Join the previous worker.
        3. Discard the previous result (whether already pushed or not).
        4. Start the new task.

        Calling ``Decode()`` after a previous task has completed but its result
        has not been retrieved will also discard the previous result and start
        a new task. Always pair every ``Decode()`` with a matching
        ``GetBuffer()`` for the results you want to keep.

        Contracts
        ~~~~~~~~~

        These two contracts are non-negotiable. Read both before doing
        anything with the returned frames.

        **Contract 1 — GetBuffer() returns when GPU work is complete.**
        The worker performs ``cuStreamSynchronize`` on its internal stream
        before pushing the result. By the time ``GetBuffer()`` returns to
        Python, all decoder kernels and device-to-device copies for the
        returned frames have finished. Downstream torch / CUDA ops can read
        the frame data on any stream without further user-level
        synchronization.

        **Contract 2 — RGBFrames are invalidated on the next Decode() call.**
        The returned ``RGBFrame`` objects are zero-copy views into an internal
        aggregator pool. Submitting the next ``Decode()`` reuses the same
        pool memory for the new batch's frames. You MUST consume or clone
        every frame you want to keep BEFORE the next ``Decode()`` call.
        Typical idiom::

            reader.Decode(files, frame_ids_a, as_bgr=False)
            out = reader.GetBuffer(files, frame_ids_a, as_bgr=False)
            tensors = [[torch.as_tensor(out[v][f], device="cuda").clone()
                        for f in range(F)] for v in range(V)]
            # Safe to call Decode() again — tensors own their own memory.
            reader.Decode(files, frame_ids_b, as_bgr=False)

        Skipping the clone leads to silent data corruption: PyTorch will not
        know its tensor's backing memory got overwritten by the next decode.

        Memory sizing
        ~~~~~~~~~~~~~

        The internal aggregator pool is sized at the first ``Decode()`` call
        to ``V * max_frames_per_decode_call * H * W * 3`` bytes, where H × W is
        derived from the first decoded frame. All videos in a single
        ``Decode()`` call must have the same resolution; this is checked at
        decode time and a resolution mismatch raises through ``GetBuffer()``.

        See also
        ~~~~~~~~

        - ``samples/SampleBatchAsyncStreamAccess.py`` for the canonical
          prefetch loop.
        - ``PyNvSampleReader`` for the 1-frame-per-video API.
        )pbdoc")
        .def(py::init<int, int, int, int, bool>(), py::arg("num_of_set"), py::arg("num_of_file"),
             py::arg("max_frames_per_decode_call"), py::arg("iGpu") = 0,
             py::arg("suppressNoColorRangeWarning") = false)
        .def(
            "Decode",
            // Lambda wrapper re-throws every C++ exception as std::runtime_error so
            // Python callers see a plain RuntimeError regardless of the C++ type.
            [](std::shared_ptr<PyNvBatchAsyncStreamReader>& reader,
               const std::vector<std::string>& filepaths,
               const std::vector<std::vector<int>>& frame_ids_2d, bool as_bgr) {
                try {
                    reader->Decode(filepaths, frame_ids_2d, as_bgr);
                } catch (const std::exception& e) {
                    throw std::runtime_error(e.what());
                }
            },
            py::arg("filepaths"), py::arg("frame_ids"), py::arg("as_bgr") = false,
            // NOTE(review): call_guard template argument was lost in extraction;
            // reconstructed as gil_scoped_release (Decode must not hold the GIL
            // while joining a worker that may need it) — confirm against upstream.
            py::call_guard<py::gil_scoped_release>(),
            R"pbdoc(
        Submit an async 2D decode task. Returns immediately.

        Args:
            filepaths: List of video file paths. ``len(filepaths) <= num_of_file``.
            frame_ids: 2D list of frame ids. ``len(frame_ids) == len(filepaths)``;
                each inner list must be the same length (no jagged inner dims)
                and ``<= max_frames_per_decode_call``. ``frame_ids[v][f]`` is the f-th
                frame requested for video v.
            as_bgr: Output BGR (True) or RGB (False).

        Raises:
            RuntimeError: invalid input dimensions, exceeded construction limits,
                jagged inner lengths, or non-positive sizes.

        Discards prior result:
            Calling Decode() unconditionally invalidates any prior buffered
            result. If a previous task is still running, it is joined first
            (with a warning to stderr) and its result discarded. Always pair
            every Decode() with a matching GetBuffer() for results you want
            to keep.

        Lifetime contract:
            Frames previously returned by GetBuffer() become invalid as
            soon as you call Decode() again. Clone everything you need to
            keep BEFORE this call. See class docstring for details.
        )pbdoc")
        .def(
            "GetBuffer",
            // Same exception-flattening wrapper as Decode above.
            [](std::shared_ptr<PyNvBatchAsyncStreamReader>& reader,
               const std::vector<std::string>& filepaths,
               const std::vector<std::vector<int>>& frame_ids_2d, bool as_bgr) {
                try {
                    return reader->GetBuffer(filepaths, frame_ids_2d, as_bgr);
                } catch (const std::exception& e) {
                    throw std::runtime_error(e.what());
                }
            },
            py::arg("filepaths"), py::arg("frame_ids"), py::arg("as_bgr") = false,
            // NOTE(review): reconstructed template argument — see Decode above.
            py::call_guard<py::gil_scoped_release>(),
            R"pbdoc(
        Block until the pending async task completes; return decoded frames.

        Args:
            filepaths, frame_ids, as_bgr: MUST exactly match the request that
                was passed to the previous Decode() call. Used to validate
                you are retrieving the result you submitted.

        Returns:
            ``List[List[RGBFrame]]`` indexed ``[v][f]``, mirroring the shape
            of the input ``frame_ids``. Each ``RGBFrame.shape == (H, W, 3)``,
            ``dtype == uint8``, lives in GPU memory, and is a zero-copy view
            into the reader's internal aggregator pool.

        Contract 1 — GPU-ready on return:
            The worker performs ``cuStreamSynchronize`` before pushing the
            result, so by the time this call returns, all decoder kernels
            and D2D copies are complete on the GPU. Downstream torch /
            CUDA ops can read the frames on any stream without further
            user-level synchronization.

        Contract 2 — Invalidated on next Decode():
            The returned RGBFrame objects share memory with the reader's
            internal pool. Submitting the next Decode() reuses that memory
            for the new batch. You MUST clone (e.g.
            ``torch.as_tensor(frame, device="cuda").clone()``) every frame
            you want to keep BEFORE calling Decode() again. Skipping the
            clone is silent data corruption — PyTorch tensors will not
            know their backing memory was overwritten.

        Raises:
            RuntimeError: no pending task and empty buffer; or request
                parameters do not match the buffered result (the result
                is then consumed and unrecoverable — same semantics as
                the 1D async API). Worker-side exceptions (file not
                found, invalid frame id, resolution mismatch across V)
                are propagated unchanged.
        )pbdoc")
        .def(
            "clearAllReaders",
            [](std::shared_ptr<PyNvBatchAsyncStreamReader>& reader) { reader->clearAllReaders(); },
            R"pbdoc(
        Clear all underlying video readers. Waits for pending async task first.
        )pbdoc")
        .def(
            "release_device_memory",
            [](std::shared_ptr<PyNvBatchAsyncStreamReader>& reader) { reader->ReleaseMemPools(); },
            R"pbdoc(
        Release per-reader memory pools and the 2D aggregator pool.
        Decoder state is preserved for efficient forward decoding.
        )pbdoc")
        .def(
            "release_decoder",
            [](std::shared_ptr<PyNvBatchAsyncStreamReader>& reader) { reader->ReleaseDecoder(); },
            R"pbdoc(
        Release all decoder instances. Readers are re-created lazily on next decode.
        )pbdoc");
}
diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvOnDemandDecoder.cpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvOnDemandDecoder.cpp
index 03fdf88..cc21bd4 100644
--- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvOnDemandDecoder.cpp
+++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvOnDemandDecoder.cpp
@@ -53,10 +53,12 @@ static auto ThrowOnCudaError = [](CUresult res, int lineNum = -1) {
void Init_PyNvGopDecoder(py::module& m);
void Init_PyNvVideoReader(py::module& m);
void Init_PyNvSampleReader(py::module& m);
void Init_PyNvBatchAsyncStreamReader(py::module& m);

PYBIND11_MODULE(_PyNvOnDemandDecoder, m) {
    Init_PyNvVideoReader(m);
    Init_PyNvGopDecoder(m);
    Init_PyNvSampleReader(m);
    Init_PyNvBatchAsyncStreamReader(m);

    m.doc() = R"pbdoc(
        accvlab.on_demand_video_decoder
diff --git a/packages/on_demand_video_decoder/samples/SampleBatchAsyncStreamAccess.py b/packages/on_demand_video_decoder/samples/SampleBatchAsyncStreamAccess.py
new file mode 100644
index 0000000..8caee01
--- /dev/null
+++ b/packages/on_demand_video_decoder/samples/SampleBatchAsyncStreamAccess.py
@@ -0,0 +1,186 @@
# Copyright (c) 2025,
NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +``accvlab.on_demand_video_decoder`` - 2D Batch Async Stream Access Sample + +Demonstrates ``PyNvBatchAsyncStreamReader``: an async-only, 2D-indexed video +decoder that returns multiple frames per video per submission, designed for +StreamPETR-like workloads where each iteration consumes a *batch of sweeps* +(V cameras × F frames). + +Key differences from ``SampleStreamAsyncAccess.py`` (the 1D async sample): +- ``Decode`` accepts ``List[List[int]]`` frame_ids instead of ``List[int]`` +- ``GetBuffer`` returns ``List[List[RGBFrame]]`` indexed ``[v][f]`` +- A single in-flight task batches V*F decodes (vs V in the 1D version) +- The internal aggregator pool is sized at construction via + ``max_frames_per_decode_call`` so it can hold all V*F frames simultaneously +""" + +import os +import torch +import accvlab.on_demand_video_decoder as nvc + + +def SampleBatchAsyncStreamAccess(): + """ + Show the canonical prefetch pattern with the 2D async stream reader. + + Per iteration: + iter 0: Decode(batch_0) -> GetBuffer(batch_0) -> clone -> process + + Decode(batch_1) to prefetch the next iteration + iter i: GetBuffer(batch_i) -> clone -> process + + Decode(batch_{i+1}) to prefetch + + Each "batch" is V videos × F frames (a 2D request). 
+ """ + + # ── Configuration ───────────────────────────────────────────────────── + max_num_files_to_use = 6 + max_frames_per_decode_call = 4 + + print("Initializing NVIDIA GPU video decoder for 2D async batch stream access...") + reader = nvc.CreateBatchAsyncStreamReader( + num_of_set=1, + num_of_file=max_num_files_to_use, + max_frames_per_decode_call=max_frames_per_decode_call, + iGpu=0, + ) + print( + f"Decoder initialized on GPU 0 — V <= {max_num_files_to_use} videos, " + f"F <= {max_frames_per_decode_call} frames per video per call" + ) + + # ── Resolve sample clip paths ──────────────────────────────────────── + # + # In a real pipeline (e.g. StreamPETR-like training) the V file paths + # would come from a dataset's per-sample multi-camera record, where each + # entry maps a camera position (CAM_FRONT, CAM_BACK_LEFT, ...) to its + # video file. See ``samples/SampleStreamAsyncAccess.py`` for the 1D + # variant of this pattern. + # + # The hard-coded "moving shape" clips below ship with this package so + # the sample runs out-of-the-box without external data. + base_dir = os.path.dirname(__file__) + sample_clip_dir = os.path.join(base_dir, "..", "data", "sample_clip") + file_path_list = [ + os.path.join(sample_clip_dir, "moving_shape_circle_h265.mp4"), + os.path.join(sample_clip_dir, "moving_shape_ellipse_h265.mp4"), + os.path.join(sample_clip_dir, "moving_shape_hexagon_h265.mp4"), + os.path.join(sample_clip_dir, "moving_shape_rect_h265.mp4"), + os.path.join(sample_clip_dir, "moving_shape_triangle_h265.mp4"), + ] + V = len(file_path_list) + F = max_frames_per_decode_call + print(f"Processing {V} videos × {F} frames per iteration") + + # ── Build the 2D frame_id schedule ─────────────────────────────────── + # In production this schedule would come from your dataset sampler: + # for each training step, it yields a (V × F) array of frame ids that + # together form one batch (e.g. F sweeps × V cameras). 
The fixed stride + # below is just for demonstration — replace with whatever your sampler + # produces. + num_iterations = 4 + step = 7 # frame stride within a batch + batch_offset = F * step # frame_ids[i+1] starts where frame_ids[i] ends + + def make_batch(iter_idx): + # frame_ids_2d[v][f] = iter_idx * batch_offset + f * step + # All V videos request the same F frames in this sample; real workloads + # can stagger per-video frame ids (the API supports it — see tests). + return [ + [iter_idx * batch_offset + f * step for f in range(F)] + for _ in range(V) + ] + + print(f"\nStarting {num_iterations} prefetched 2D-batch iterations") + print("Pattern: GetBuffer(N) -> process(N) -> Decode(N+1) overlaps with processing") + + # ── Main loop ──────────────────────────────────────────────────────── + for idx in range(num_iterations): + frame_ids_2d = make_batch(idx) + print(f"\n--- Iteration {idx + 1}/{num_iterations} ---") + print(f"Frame ids (V×F = {V}×{F}): {frame_ids_2d[0]}") + + try: + if idx == 0: + # First iteration: synchronous-feeling start (Decode + immediate Get). + print("[Async] Submitting initial decode for batch 0") + reader.Decode(file_path_list, frame_ids_2d, as_bgr=False) + print("[Async] Retrieving batch 0 from buffer") + decoded = reader.GetBuffer(file_path_list, frame_ids_2d, as_bgr=False) + else: + # Subsequent iterations: result was prefetched at end of previous + # iteration; this GetBuffer call may already have data ready. + print(f"[Async] Retrieving prefetched batch {idx} from buffer") + decoded = reader.GetBuffer(file_path_list, frame_ids_2d, as_bgr=False) + + assert len(decoded) == V, f"unexpected V: {len(decoded)}" + for v in range(V): + assert len(decoded[v]) == F, f"unexpected F at v={v}: {len(decoded[v])}" + + # ── CRITICAL: deep-copy before the next Decode() submission ─ + # + # ``decoded[v][f]`` is a zero-copy RGBFrame referencing the reader's + # internal aggregator pool. 
The pool is overwritten on the next + # Decode() call, so we MUST clone every frame we want to keep + # *before* that next Decode happens. + print("Cloning V×F frames to PyTorch tensors (safe to keep across iterations)") + tensor_grid = [ + [torch.as_tensor(decoded[v][f], device="cuda").clone() + for f in range(F)] + for v in range(V) + ] + + # Now stack however your model wants. Example: [V, F, H, W, 3]. + batch = torch.stack([torch.stack(row, dim=0) for row in tensor_grid], dim=0) + + print( + f"Batch shape: {tuple(batch.shape)}, dtype: {batch.dtype}, " + f"device: {batch.device}" + ) + print(f"Value range: [{batch.min().item()}, {batch.max().item()}]") + + # ── Prefetch the next batch ───────────────────────────────── + # This Decode() returns immediately. The worker decodes batch N+1 + # in the background while we "process" batch N below. + if idx < num_iterations - 1: + next_frame_ids_2d = make_batch(idx + 1) + print(f"[Async] Prefetching batch {idx + 1} ({next_frame_ids_2d[0]})") + reader.Decode(file_path_list, next_frame_ids_2d, as_bgr=False) + + # ── Simulated "process" stage (model forward, etc.) ───────── + # This is where you'd run inference. The prefetched decode is + # happening on the worker thread concurrently. 
+ print("[Processing] (simulated) — prefetched decode is running in parallel") + + except Exception as e: + print(f"Decode failed in iteration {idx + 1}: {type(e).__name__}: {e}") + print("Common causes:") + print(" - frame id past last keyframe / beyond video length") + print(" - filepaths/frame_ids mismatch between Decode and GetBuffer") + print(" - non-existent video file") + print(" - insufficient GPU memory for V*F*H*W*3 aggregator pool") + continue + + print("\n" + "=" * 60) + print("2D async batch stream decoding completed successfully!") + print("=" * 60) + + +if __name__ == "__main__": + print("NVIDIA accvlab.on_demand_video_decoder — 2D Batch Async Stream Sample") + print("=" * 70) + print() + SampleBatchAsyncStreamAccess() diff --git a/packages/on_demand_video_decoder/tests/test_batch_async_stream_decoder.py b/packages/on_demand_video_decoder/tests/test_batch_async_stream_decoder.py new file mode 100644 index 0000000..edbee8d --- /dev/null +++ b/packages/on_demand_video_decoder/tests/test_batch_async_stream_decoder.py @@ -0,0 +1,456 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for ``PyNvBatchAsyncStreamReader`` (2D async stream decoder). 
+ +Layout: + Section A — construction / module exports + Section B — Decode() input validation (independent of decode impl) + Section C — maintenance methods + Section D — functional 2D decode (requires PR2 Decode/GetBuffer impl) + Section E — precision: 2D output must bit-match sequential 1D calls + Section F — async behavior: in-flight slot, request validation, error paths + +Sections D / E / F are marked with ``requires_decode_impl`` so they auto-skip +on PR1 (skeleton-only) and auto-enable once PR2 lands the real worker body. +""" + +import pytest +import torch + +import utils +import accvlab.on_demand_video_decoder as nvc + + +# --------------------------------------------------------------------------- +# Stage detection: is Decode actually implemented? +# --------------------------------------------------------------------------- + +def _probe_decode_implemented(): + """Return True iff PyNvBatchAsyncStreamReader.Decode is implemented. + + During PR1 the C++ Decode body raises ``"not yet implemented (PR2)"``; + PR2 makes this probe return True naturally because the call path either + succeeds or raises a different real error (file not found, etc.). + """ + r = nvc.CreateBatchAsyncStreamReader(num_of_set=1, num_of_file=1, max_frames_per_decode_call=1) + try: + r.Decode(["__probe__.mp4"], [[0]], False) + except RuntimeError as e: + if "not yet implemented" in str(e).lower(): + return False + except Exception: + pass + return True + + +_DECODE_IMPLEMENTED = _probe_decode_implemented() +requires_decode_impl = pytest.mark.skipif( + not _DECODE_IMPLEMENTED, + reason="Decode/GetBuffer not yet implemented (PR1 skeleton). Auto-enables in PR2.", +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _select_sample_videos(): + """Return a list of sample video file paths from the project's test data. 
+ + No fallback — the data dir is guaranteed to exist by the project layout. + """ + files = utils.select_random_clip(utils.get_data_dir()) + assert files is not None and len(files) > 0, "test data missing" + return files + + +def _reference_decode_via_1d(files, frame_ids_2d, as_bgr): + """Compute the ground-truth 2D decode result by calling the synchronous + 1D API once per frame index, accumulating cloned tensors. + + Returns out[v][f] as a torch.Tensor (already cloned, safe to keep). + """ + V = len(files) + F = len(frame_ids_2d[0]) + reader_1d = nvc.CreateSampleReader(num_of_set=1, num_of_file=V, iGpu=0) + + out = [[None] * F for _ in range(V)] + for f in range(F): + fids_at_f = [frame_ids_2d[v][f] for v in range(V)] + frames_1d = reader_1d.DecodeN12ToRGB(files, fids_at_f, as_bgr) + # The per-reader pool inside PyNvSampleReader is overwritten on the next + # call, so we MUST clone each frame before the loop advances. + for v in range(V): + out[v][f] = torch.as_tensor(frames_1d[v], device="cuda").clone() + return out + + +def _make_reader(V=4, F=8): + return nvc.CreateBatchAsyncStreamReader( + num_of_set=1, num_of_file=V, max_frames_per_decode_call=F, iGpu=0 + ) + + +# =========================================================================== +# Section A — construction / module exports +# =========================================================================== + +def test_module_exports(): + """Factory and class are re-exported at the package top level.""" + assert hasattr(nvc, "CreateBatchAsyncStreamReader") + assert hasattr(nvc, "PyNvBatchAsyncStreamReader") + + +def test_construct_valid(): + """Construction with valid args succeeds and exposes the expected methods.""" + r = _make_reader() + methods = {m for m in dir(r) if not m.startswith("_")} + expected = {"Decode", "GetBuffer", "clearAllReaders", + "release_device_memory", "release_decoder"} + assert expected.issubset(methods), f"missing methods: {expected - methods}" + + +def test_destructor_clean(): 
+ """Reader destructs cleanly with no pending task.""" + r = _make_reader() + del r + + +@pytest.mark.parametrize( + "kwargs", + [ + dict(num_of_set=0, num_of_file=1, max_frames_per_decode_call=1), + dict(num_of_set=-1, num_of_file=1, max_frames_per_decode_call=1), + dict(num_of_set=1, num_of_file=0, max_frames_per_decode_call=1), + dict(num_of_set=1, num_of_file=-1, max_frames_per_decode_call=1), + dict(num_of_set=1, num_of_file=1, max_frames_per_decode_call=0), + dict(num_of_set=1, num_of_file=1, max_frames_per_decode_call=-1), + ], +) +def test_construct_rejects_invalid_args(kwargs): + """Non-positive sizing arguments are rejected at construction.""" + with pytest.raises((ValueError, RuntimeError)): + nvc.CreateBatchAsyncStreamReader(**kwargs) + + +# =========================================================================== +# Section B — Decode() input validation +# =========================================================================== + +def test_validate_size_mismatch(): + """filepaths.size() != frame_ids_2d.size() rejected at entry.""" + files = _select_sample_videos() + r = _make_reader(V=len(files)) + bad_frame_ids = [[0]] # length 1, but files has more entries + with pytest.raises(RuntimeError, match=r"filepaths\.size\(\).*frame_ids_2d\.size\(\)"): + r.Decode(files, bad_frame_ids, False) + + +def test_validate_empty_filepaths(): + """Empty filepaths list is rejected.""" + r = _make_reader() + with pytest.raises(RuntimeError, match=r"filepaths must not be empty"): + r.Decode([], [], False) + + +def test_validate_too_many_files(): + """Exceeding num_of_file is rejected at entry.""" + files = _select_sample_videos() + r = _make_reader(V=1) # num_of_file=1, but we'll pass len(files) > 1 + if len(files) <= 1: + pytest.skip("need at least 2 sample videos for this test") + with pytest.raises(RuntimeError, match=r"exceeds num_of_file"): + r.Decode(files, [[0]] * len(files), False) + + +def test_validate_too_many_frames(): + """Exceeding 
max_frames_per_decode_call is rejected at entry."""
    files = _select_sample_videos()
    r = _make_reader(V=len(files), F=4)
    # 100 requested frames per video >> F=4, so Decode must reject at entry.
    with pytest.raises(RuntimeError, match=r"exceeds max_frames_per_decode_call"):
        r.Decode(files, [list(range(100))] * len(files), False)


def test_validate_jagged_inner_lengths():
    """Per-video inner lists must be the same length across V."""
    files = _select_sample_videos()
    if len(files) < 2:
        pytest.skip("need at least 2 sample videos for this test")
    r = _make_reader(V=len(files))
    # First video asks for 3 frames, second asks for 2.
    jagged = [[0, 7, 14], [0, 7]] + [[0, 7, 14]] * (len(files) - 2)
    with pytest.raises(RuntimeError, match=r"jagged inner lengths are not supported"):
        r.Decode(files, jagged, False)


def test_validate_empty_inner_list():
    """Inner frame_ids list must be non-empty."""
    files = _select_sample_videos()
    r = _make_reader(V=len(files))
    with pytest.raises(RuntimeError, match=r"frame_ids_2d\[0\] must not be empty"):
        r.Decode(files, [[]] * len(files), False)


# ===========================================================================
# Section C — maintenance methods
# ===========================================================================

def test_maintenance_idle_callable():
    """Maintenance methods are safe no-ops when no task is pending."""
    r = _make_reader()
    r.clearAllReaders()
    r.release_device_memory()
    r.release_decoder()
    # Order-independent and idempotent.
    r.release_decoder()
    r.clearAllReaders()
    r.release_device_memory()


# ===========================================================================
# Section D — functional 2D decode (requires PR2)
# ===========================================================================

@requires_decode_impl
def test_decode_basic_2d_shape():
    """Decode returns List[List[RGBFrame]] with the expected outer/inner shape."""
    files = _select_sample_videos()
    V = len(files)
    F = 4
    frame_ids_2d = [[0, 7, 14, 21]] * V

    # Async submit, then synchronous retrieval with the same request params.
    r = _make_reader(V=V, F=F)
    r.Decode(files, frame_ids_2d, as_bgr=False)
    out = r.GetBuffer(files, frame_ids_2d, as_bgr=False)

    assert len(out) == V, f"outer len should be V={V}, got {len(out)}"
    for v in range(V):
        assert len(out[v]) == F, f"out[{v}] inner len should be F={F}, got {len(out[v])}"


@requires_decode_impl
def test_decode_basic_2d_dtype_and_device():
    """Each frame is a uint8, 3-channel tensor on CUDA."""
    files = _select_sample_videos()
    V = len(files)
    F = 2
    frame_ids_2d = [[0, 7]] * V

    r = _make_reader(V=V, F=F)
    r.Decode(files, frame_ids_2d, as_bgr=False)
    out = r.GetBuffer(files, frame_ids_2d, as_bgr=False)

    for v in range(V):
        for f in range(F):
            # Frames are wrapped into torch tensors here; channel-last layout
            # is asserted via shape[-1] == 3.
            t = torch.as_tensor(out[v][f], device="cuda")
            assert t.dtype == torch.uint8, f"out[{v}][{f}].dtype = {t.dtype}"
            assert t.ndim == 3, f"out[{v}][{f}].ndim = {t.ndim}"
            assert t.shape[-1] == 3, f"out[{v}][{f}].shape = {tuple(t.shape)}"
            assert t.device.type == "cuda"


@requires_decode_impl
def test_decode_single_frame_per_video():
    """F=1 is supported (degenerates to behavior similar to 1D API)."""
    files = _select_sample_videos()
    V = len(files)
    frame_ids_2d = [[0]] * V

    r = _make_reader(V=V, F=1)
    r.Decode(files, frame_ids_2d, as_bgr=False)
    out = r.GetBuffer(files, frame_ids_2d, as_bgr=False)

    assert len(out) == V
    for v in range(V):
        assert len(out[v]) == 1


@requires_decode_impl
def test_decode_single_video_multi_frame():
    """V=1 with multiple frames is supported."""
    files = _select_sample_videos()
    single = [files[0]]
    F = 4
    frame_ids_2d = [[0, 7, 14, 21]]

    r = _make_reader(V=1, F=F)
    r.Decode(single, frame_ids_2d, as_bgr=False)
    out = r.GetBuffer(single, frame_ids_2d, as_bgr=False)

    assert len(out) == 1
    assert len(out[0]) == F


# ===========================================================================
# Section E — precision: 2D output must bit-match sequential 1D calls
# ===========================================================================

@requires_decode_impl
@pytest.mark.parametrize("as_bgr", [False, True])
def test_precision_matches_1d_reference(as_bgr):
    """2D output is bit-identical to F sequential 1D DecodeN12ToRGB calls.

    The 2D worker internally loops the same sync 1D path F times, so the
    pixel data must match exactly (uint8, atol=0, rtol=0).
    """
    files = _select_sample_videos()
    V = len(files)
    F = 4
    frame_ids_2d = [[0, 7, 14, 21]] * V

    # Ground truth via sequential 1D + clone
    ref = _reference_decode_via_1d(files, frame_ids_2d, as_bgr)

    # Under test
    r2d = _make_reader(V=V, F=F)
    r2d.Decode(files, frame_ids_2d, as_bgr=as_bgr)
    out_2d = r2d.GetBuffer(files, frame_ids_2d, as_bgr=as_bgr)

    for v in range(V):
        for f in range(F):
            actual = torch.as_tensor(out_2d[v][f], device="cuda")
            # atol=0/rtol=0 forces exact equality; the lambda binds v/f at
            # definition time (vv=v, ff=f) so the message names the failing cell.
            torch.testing.assert_close(
                actual, ref[v][f], atol=0, rtol=0,
                msg=lambda m, vv=v, ff=f: (
                    f"pixel mismatch at v={vv}, f={ff}, as_bgr={as_bgr}: {m}"
                ),
            )


@requires_decode_impl
def test_precision_matches_1d_with_different_frame_sets_per_video():
    """Each video can request a different set of frame ids; result still matches 1D."""
    files = _select_sample_videos()
    V = len(files)
    F = 3

    # Stagger frame ids per video
    frame_ids_2d = [[v * 2 + i * 5 for i in range(F)] for v in range(V)]

    ref = _reference_decode_via_1d(files, frame_ids_2d, as_bgr=False)

    r2d = _make_reader(V=V, F=F)
    r2d.Decode(files, 
frame_ids_2d, as_bgr=False)
    out_2d = r2d.GetBuffer(files, frame_ids_2d, as_bgr=False)

    for v in range(V):
        for f in range(F):
            actual = torch.as_tensor(out_2d[v][f], device="cuda")
            torch.testing.assert_close(actual, ref[v][f], atol=0, rtol=0)


# ===========================================================================
# Section F — async behavior: in-flight slot, request validation, error paths
# ===========================================================================

@requires_decode_impl
def test_get_buffer_without_decode_raises():
    """GetBuffer with no pending task raises RuntimeError."""
    files = _select_sample_videos()
    r = _make_reader(V=len(files))
    with pytest.raises(RuntimeError):
        r.GetBuffer(files, [[0]] * len(files), False)


@requires_decode_impl
def test_get_buffer_request_mismatch_files_raises():
    """GetBuffer with different filepaths than Decode raises."""
    files = _select_sample_videos()
    V = len(files)
    if V < 2:
        pytest.skip("need at least 2 sample videos")
    frame_ids_2d = [[0]] * V

    r = _make_reader(V=V)
    r.Decode(files, frame_ids_2d, False)

    # Same set of paths, different order — must still be rejected.
    swapped = files[:]
    swapped[0], swapped[1] = swapped[1], swapped[0]
    with pytest.raises(RuntimeError):
        r.GetBuffer(swapped, frame_ids_2d, False)


@requires_decode_impl
def test_get_buffer_request_mismatch_frames_raises():
    """GetBuffer with different frame_ids than Decode raises."""
    files = _select_sample_videos()
    V = len(files)
    frame_ids_2d = [[0]] * V

    r = _make_reader(V=V)
    r.Decode(files, frame_ids_2d, False)

    different = [[7]] * V
    with pytest.raises(RuntimeError):
        r.GetBuffer(files, different, False)


@requires_decode_impl
def test_get_buffer_request_mismatch_as_bgr_raises():
    """GetBuffer with different as_bgr flag than Decode raises."""
    files = _select_sample_videos()
    V = len(files)
    frame_ids_2d = [[0]] * V

    r = _make_reader(V=V)
    r.Decode(files, frame_ids_2d, as_bgr=False)
    with pytest.raises(RuntimeError):
r.GetBuffer(files, frame_ids_2d, as_bgr=True) + + +@requires_decode_impl +def test_resubmit_keeps_only_latest_result(): + """Two Decode() calls back-to-back: only the second result is retrievable. + + GetBuffer() pops then validates — on a mismatch the result is consumed and + cannot be retrieved a second time (same semantics as the 1D async API). + So the strongest check we can make is: after re-Decoding, only the *new* + parameters retrieve a result; the *old* parameters error. + """ + files = _select_sample_videos() + V = len(files) + F = 2 + frame_ids_a = [[0, 7]] * V + frame_ids_b = [[14, 21]] * V + + # Variant 1: re-Decode then Get with new params — should succeed. + r = _make_reader(V=V, F=F) + r.Decode(files, frame_ids_a, False) + r.Decode(files, frame_ids_b, False) + out_b = r.GetBuffer(files, frame_ids_b, False) + assert len(out_b) == V and len(out_b[0]) == F + + # Variant 2: re-Decode then Get with old params — should mismatch. + r2 = _make_reader(V=V, F=F) + r2.Decode(files, frame_ids_a, False) + r2.Decode(files, frame_ids_b, False) + with pytest.raises(RuntimeError, match=r"do not match buffered result"): + r2.GetBuffer(files, frame_ids_a, False) + + +@requires_decode_impl +def test_invalid_file_propagates_exception(): + """A decode error in the worker (e.g. nonexistent file) is rethrown at Get.""" + files = _select_sample_videos() + V = len(files) + bad_files = list(files) + bad_files[0] = "/__definitely_not_a_real_file__.mp4" + frame_ids_2d = [[0]] * V + + r = _make_reader(V=V) + r.Decode(bad_files, frame_ids_2d, False) + with pytest.raises(RuntimeError): + r.GetBuffer(bad_files, frame_ids_2d, False)