Unify producer batching across all read blocks (#400)#401
Merged
Conversation
…ayoga-io/datayoga into batch_size_in_std_read_block
Brainstormed design for unifying batch handling across all 7 producer blocks (std/read, files/read_csv, relational/read, parquet/read, redis/read_stream, azure/read_event_hub, http/receiver). Closes the gap behind #294, #295, #296, #377 by making the Producer base class own batching via a new produce_chunks() hook. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task-by-task TDD plan covering: schema fragment loader, Producer base class, and per-producer migrations (std/read, files/read_csv, parquet/read, relational/read, redis/read_stream, http/receiver, azure/read_event_hub), plus autogen + docs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Explicitly aclose() the producer async generator on consumer exit so the underlying pump task and aiohttp server are torn down cleanly. Removes a "Task was destroyed but it is pending!" warning at test teardown. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…-unification # Conflicts: # core/src/datayoga_core/blocks/std/read/block.py # core/src/datayoga_core/blocks/std/read/block.schema.json # docs/reference/blocks/std_read.md # schemas/job.schema.json
One-line docstrings on every method, helper, and inner function added or modified by this PR: Producer base class, all 7 migrated producers, and the per-block test helpers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pt (#400) - Run isort/autopep8 on test files; collapse the blank line between third-party imports (pytest, etc.) and datayoga_core imports that isort flagged. - Rewrite the $inherit resolution in scripts/generate-docs.sh to use only the Python standard library, so the docs CI job (which installs only node) no longer hits ModuleNotFoundError on prometheus_client when importing datayoga_core. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
unit-tests CI does `pip install .[test]`. The azure/read_event_hub test module imports the block, which imports azure.eventhub at module load. Without azure-eventhub in the test extras, pytest's collection fails on ModuleNotFoundError. Other producer test modules (parquet, redis, http, relational) already work because their backing deps are in [test]. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
formatting-check runs prettier --check on all .md including these. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR centralizes producer batching in the Producer base class so read blocks expose consistent batch_size behavior, adds shared schema fragments for batchable/streamable producers, and updates docs/tests around the migration.
Changes:
- Adds
Producer.produce_chunks()and base re-chunking/flush behavior. - Migrates read producers to the new hook and adds
fetch_size/max_batch_sizewhere needed. - Adds
$inheritschema resolution plus regenerated schemas/reference docs.
Reviewed changes
Copilot reviewed 46 out of 54 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
core/src/datayoga_core/producer.py |
Implements base producer batching and flush logic. |
core/src/datayoga_core/schema_utils.py |
Adds $inherit schema-fragment resolver. |
core/src/datayoga_core/block.py |
Resolves inherited schema fragments for block validation. |
core/src/datayoga_core/job.py |
Resolves inherited fragments in generated job schema. |
core/src/datayoga_core/resources/schemas/batchable.schema.json |
Defines shared batch_size property. |
core/src/datayoga_core/resources/schemas/streamable.schema.json |
Defines shared streaming batch_size/flush_ms properties. |
core/src/datayoga_core/blocks/*/read*/block.py |
Migrates producer implementations to produce_chunks(). |
core/src/datayoga_core/blocks/*/block.schema.json |
Uses shared batching schema fragments. |
core/src/datayoga_core/tests/* and block tests |
Adds coverage for schema inheritance and producer batching. |
core/pyproject.toml |
Adds Azure test dependencies. |
scripts/generate-docs.sh |
Resolves $inherit before reference-doc generation. |
schemas/job.schema.json |
Regenerated aggregate schema with inherited properties. |
docs/reference/** |
Regenerated reference docs for producer batching properties. |
docs/processing-strategies.md |
Documents producer batching, streaming flush, and Azure migration. |
docs/superpowers/specs/2026-05-28-producer-batching-unification-design.md |
Adds design spec. |
docs/superpowers/plans/2026-05-28-producer-batching-unification.md |
Adds implementation plan. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
#400) Two correctness fixes flagged by the Copilot PR reviewer: 1. Source errors no longer become silent EOS. The pump now captures non-cancellation exceptions and re-raises them on the consumer side after flushing the partial buffer, so a Redis disconnect, broken CSV, or DB read error fails the job loudly instead of being treated as end-of-stream against truncated input. 2. The internal queue is now bounded (maxsize=1), restoring the backpressure the old yield-driven model had. Without this, large bounded sources (parquet, relational, csv) could pre-load the entire table/file into memory while downstream was processing batch 1. The pump's `finally: put(EOS)` is skipped on cancellation to avoid deadlocking against a full queue. Also: corrected processing-strategies docs to say "up to batch_size" instead of "exactly batch_size", since partial batches fire on EOS and flush_ms timeout. Three new tests: - test_source_errors_propagate_instead_of_silent_eos - test_source_error_flushes_buffer_before_raising - test_pump_does_not_outrun_consumer_unboundedly Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
One-liner docstrings on every test_* function: producer batching, schema inherit, and per-block read tests. Matches the docstring coverage applied to production code earlier in this PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Audit pass: the test literally asserted
block.properties.get("max_batch_size", 300) == 300
which is testing the standard library, not Block code. Removed.
Remaining 4 azure tests cover validation, schema shape, accepted properties,
and the documented breaking-change behavior.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The 2225-line plan was execution scaffolding for the work that now ships. The spec remains as the architectural record. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per review discussion: drop the custom \$inherit extension and use
JSON Schema's standard allOf + \$ref composition instead. The on-disk
schemas are now idiomatic JSON Schema, understood by any standards-
compliant tool.
Changes:
- Each producer block schema gains "\$schema": draft/2019-09 and uses
allOf: [{"\$ref": "../../../resources/schemas/<fragment>.schema.json"}]
to inherit batch_size (and flush_ms for streaming producers).
- additionalProperties: false -> unevaluatedProperties: false, which is
composition-aware (the additionalProperties + allOf interaction is a
known JSON Schema gotcha that rejects allOf-contributed properties).
- schema_utils.resolve_inherits -> resolve_refs: walks the schema,
inlines local-file \$refs recursively, detects cycles. The validation
code path (Block.validate, Job.get_json_schema) stays unchanged —
resolved schemas are flat.
- Tests in test_schema_inherit.py rewritten for the new mechanics:
inlining, transitive resolution, cycle detection, non-local refs
passthrough, default base-dir fallback.
- generate-docs.sh: walks standard \$ref instead of \$inherit, and also
flattens allOf properties for jsonschema2mk's benefit (docs-only).
- Aggregate schemas/job.schema.json regenerated.
External \$ref-aware tools (IDE plugins, OpenAPI exporters) can now
follow the schemas without our custom resolver. jsonschema2mk is the
one tool that doesn't grok \$ref, so the docs generator keeps its
pre-resolution step.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four corrections after PR review and migration to standard JSON Schema: - Status: Design -> Implemented in PR #401. - flush_ms code snippet now shows the bounded queue (maxsize=1) and source-error propagation that Copilot review surfaced, plus the cancelled-flag dance that the bounded queue requires. - Producer ABC risk note: produce_chunks is the new override hook but not formally @AbstractMethod, so legacy produce() overrides keep working (correcting an earlier overstatement). - Event Hub schema risk note: we use unevaluatedProperties: false, not additionalProperties: false (composition-aware). - Drop CHANGELOG mention (no CHANGELOG in this repo; PR description carries the breaking change note). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Section said schema gains additionalProperties: false; actually it gains unevaluatedProperties: false (composition-aware). Also reframed the "rejects batch_size: 300 loudly" claim, which was always wrong — that literal property name still validates after the rename, just with new semantics. Typos are what additionalProperties/unevaluatedProperties catches. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The file now tests schema_utils.resolve_refs (standard JSON Schema \$ref resolution), not the old custom \$inherit extension. Filename was stale. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot re-review caught a real regression: my PR set count=batch_size on all xreadgroup calls, including the pending-entry-list reads (id="0"). Combined with the unconditional flip to read_pending=False after the first call, this meant: only the first batch_size pending entries got processed per job session; anything beyond was stranded until restart. The "stay in pending mode until empty" attempt I tried first didn't work: XREADGROUP id="0" always returns from the start of the PEL (since the producer doesn't ack inside produce_chunks), so a smaller count just makes us re-read the same first page forever. Fix: revert to the pre-PR semantic for the pending read — count=None drains the entire PEL in one call. Keep count=batch_size for the new-message read (id=">") so the #377 batching contract still applies to live streams. The producer's yield-as-a-chunk behavior (the actual fix for #377) is unchanged. Updated tests: - test_redis_new_message_read_uses_count_equal_to_batch_size: pending call uses count=None, new-message call uses count=batch_size - test_redis_drains_full_pel_in_one_call_even_when_larger_than_batch_size: 20 pending entries drain in a single call; base class re-chunks to batch_size=5 -> four batches of 5 Real-Redis smoke against Redis 7 with PEL=25, batch_size=5: produces batches=[5,5,5,5,5], all PEL delivered, no re-reads. The other Copilot comment (sync redis-py with block=0 freezes the asyncio event loop) is a real architectural concern but pre-existing — same behavior in pre-PR code. Deferred to a follow-up issue if needed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three final verification additions: 1. Property-based rechunker tests (hypothesis): probe ~1000 generated chunk-size/batch-size combinations to verify the rechunker's invariants — record conservation, order preservation, all batches well-formed, no empty emissions, partial-only-at-end. Adds hypothesis to test extras. Catches the class of bug Copilot flagged where my existing tests only covered specific inputs, not the contract. 2. test_external_task_cancellation_cleans_up_pump: simulates the Job.shutdown / Job.run cancellation path (cancelling the outer task that iterates produce()) and verifies no producer pump task is orphaned afterward. The spec claims this works; now there's a test. 3. mypy fix: Producer.DEFAULT_FLUSH_MS was inferred as None-only, making subclass overrides with int fail strict type-checking. Now typed as Optional[int]. mypy clean on all 9 changed source files. 90 tests pass (was 84: +5 property tests, +1 external-cancel test). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This was referenced May 31, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #400. Side-effect closes #293, #294, #295, #296, #377.
Summary
Move batching out of individual producer blocks into the
Producerbase class so every read block has consistentbatch_sizebehavior, and three buggy producers stop yielding single records.Producer.produce_chunks()override hook; baseproduce()re-chunks subclass output into batches of up tobatch_size(smaller on EOS orflush_ms), with bounded backpressure (asyncio.Queue(maxsize=1)) and source-error propagation.batchable.schema.json,streamable.schema.json) + new$inheritconvention resolved at load time byschema_utils.resolve_inherits(wired intoBlock.get_json_schemaandJob.get_json_schema).std/read,files/read_csv,parquet/read,relational/read,redis/read_stream,http/receiver,azure/read_event_hub.relational/readgains a separatefetch_size(default 10000) so DB round-trip size stays independent of pipelinebatch_size.generate-docs.shresolves$inheritbeforejsonschema2mk(stdlib-only Python, no datayoga_core import) so per-block reference docs render inherited properties.Spec:
docs/superpowers/specs/2026-05-28-producer-batching-unification-design.mdBreaking change
azure/read_event_hub.batch_sizeis renamed tomax_batch_size. The namebatch_sizenow means pipeline batch size on this block, consistent with every other producer.Users with
batch_size: <N>in YAML forazure/read_event_hub(where it used to control the SDK callback size) must rename it tomax_batch_size: <N>to preserve previous behavior. The literalbatch_size: <N>still validates but with the new pipeline-level meaning. Schema now also hasadditionalProperties: falseso typos fail loudly.Other behavior notes (non-breaking, worth a callout)
redis/read_streamsnapshot mode now drains all available pending+new records before exiting, rather than exiting after the first non-pending read. This is a strict improvement but a subtle semantic shift.batch_size(default 1000). Streaming producers (redis/read_stream,http/receiver,azure/read_event_hub) also exposeflush_ms(default 1000) — partial batches flush after that many ms of inactivity instead of being held indefinitely.Test plan
redis_to_relationalpasses end-to-end against real Redis (closesredis.read_streamproducer sends messages one by one, should be in batches based on batch_size arg #377)files/read_csv → std/writeruns from the scaffold and produces expected output; smoke-tested std/read, files/read_csv, parquet/read end-to-end with batchingredis/read_streamsnapshot mode with batch_size=10 on 25 messages produces batches of[10, 10, 5], acks correctlyFileNotFoundErrorpropagates (not silent EOS)max_batch_sizeinstead ofbatch_sizeif SDK callback size was being configuredReview status
🤖 Generated with Claude Code