Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
124bb93
Use sa.sql.text for SQL statement execution in Block class
spicy-sauce Apr 1, 2025
4773f9a
Add batch processing capability to Block class and update schema for …
spicy-sauce Apr 1, 2025
c02ab04
update json schemas
invalid-email-address Apr 1, 2025
3a43bc1
update autogenerated docs
invalid-email-address Apr 1, 2025
0b51d63
Fix log message to remove unnecessary exclamation mark in batch size …
spicy-sauce Apr 1, 2025
4bf88eb
Merge branch 'batch_size_in_std_read_block' of https://github.com/dat…
spicy-sauce Apr 1, 2025
13fc0a7
Merge main into feature/when-block-in-redis-lookup
spicy-sauce Dec 28, 2025
8f4f992
update json schemas
spicy-sauce Dec 28, 2025
807d61a
Increase timeout for integration tests from 10 to 15 minutes
spicy-sauce Dec 28, 2025
633d9bf
Add design spec for producer batching unification (#400)
spicy-sauce May 28, 2026
0696f9c
Add implementation plan for producer batching unification (#400)
spicy-sauce May 28, 2026
5c178b6
Add $inherit schema fragment resolver (#400)
spicy-sauce May 28, 2026
cb126c6
Tighten $inherit resolver: reject non-list, guard against nested (#400)
spicy-sauce May 28, 2026
0931918
Remove unused List import in schema_utils (#400)
spicy-sauce May 28, 2026
c9dbe92
Producer base class re-chunks via produce_chunks (#400)
spicy-sauce May 28, 2026
f1311d8
Migrate std/read to produce_chunks (#400, #296)
spicy-sauce May 28, 2026
12c13fb
Migrate files/read_csv to produce_chunks (#400)
spicy-sauce May 28, 2026
1af0c66
Migrate parquet/read to produce_chunks, fix one-by-one yield (#400, #…
spicy-sauce May 28, 2026
85ac26a
Migrate relational/read to produce_chunks, add fetch_size (#400, #295)
spicy-sauce May 28, 2026
3b72998
Migrate http/receiver to produce_chunks (#400)
spicy-sauce May 28, 2026
0b774ac
Migrate redis/read_stream to batched xreadgroup (#400, #377)
spicy-sauce May 28, 2026
38cf4ec
Migrate azure/read_event_hub; rename batch_size -> max_batch_size (#4…
spicy-sauce May 28, 2026
b67bc4a
Regenerate JSON schemas and reference docs after producer batching (#…
spicy-sauce May 28, 2026
6069465
Resolve $inherit before jsonschema2mk so block docs include batch_siz…
spicy-sauce May 28, 2026
9dbc5d3
Document producer batching model in processing-strategies (#400)
spicy-sauce May 28, 2026
5c2eab4
Clean up http/receiver test teardown (#400)
spicy-sauce May 28, 2026
50102aa
Merge remote-tracking branch 'origin/main' into 400-producer-batching…
spicy-sauce May 28, 2026
05f4b01
Add docstrings to all methods touched in this PR (#400)
spicy-sauce May 28, 2026
0b8d8f7
Fix CI: isort formatting + stdlib-only $inherit resolver in docs scri…
spicy-sauce May 28, 2026
4e3b3fc
Add azure-eventhub deps to test extras for CI (#400)
spicy-sauce May 28, 2026
a3d9275
Format superpowers spec and plan with prettier (#400)
spicy-sauce May 28, 2026
5734e03
Address Copilot review: propagate source errors + bounded backpressur…
spicy-sauce May 28, 2026
0dda422
Add docstrings to all test functions added in this PR (#400)
spicy-sauce May 28, 2026
c1e2e71
Drop meaningless test_max_batch_size_defaults_to_300_when_omitted (#400)
spicy-sauce May 28, 2026
056f8cf
Drop retrospective implementation plan; keep design spec (#400)
spicy-sauce May 28, 2026
78ef675
Switch from custom \$inherit to standard JSON Schema composition (#400)
spicy-sauce May 28, 2026
eae77e3
Update spec to reflect what shipped (#400)
spicy-sauce May 28, 2026
2d2bcc4
Spec: correct Event Hub migration note (#400)
spicy-sauce May 28, 2026
26a271b
Rename test_schema_inherit.py -> test_schema_refs.py (#400)
spicy-sauce May 31, 2026
a583362
Fix redis/read_stream PEL pagination regression (#400)
spicy-sauce May 31, 2026
9c2c59b
Add property-based tests, external-cancel test, mypy fix (#400)
spicy-sauce May 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ python = "^3.8"
PyYAML = "^6.0"
sqlglot = "^10.4.3"

hypothesis = { version = "^6.0", optional = true }
mock = { version = "^4.0.3", optional = true }
pytest = { version = "^7.1.2", optional = true }
pytest-aioresponses = { version = "^0.2.0", optional = true }
Expand Down Expand Up @@ -66,8 +67,11 @@ sqlserver = ["pymssql", "SQLAlchemy"]

test = [
"aiohttp",
"azure-eventhub",
"azure-eventhub-checkpointstoreblob-aio",
"cassandra-driver",
"fastparquet",
"hypothesis",
"ibm_db_sa",
"mock",
"oracledb",
Expand Down
4 changes: 3 additions & 1 deletion core/src/datayoga_core/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def get_json_schema(self) -> Dict[str, Any]:
os.path.dirname(os.path.realpath(sys.modules[self.__module__].__file__)),
"block.schema.json")
logger.debug(f"loading schema from {json_schema_file}")
return utils.read_json(json_schema_file)
# Lazy import: schema_utils -> utils -> block creates a circular import at module load.
from datayoga_core.schema_utils import resolve_refs
return resolve_refs(utils.read_json(json_schema_file), schema_path=json_schema_file)

@abstractmethod
def init(self, context: Optional[Context] = None):
Expand Down
72 changes: 21 additions & 51 deletions core/src/datayoga_core/blocks/azure/read_event_hub/block.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import asyncio
import logging
from typing import AsyncGenerator, List, Optional
from typing import Any, AsyncGenerator, Dict, List, Optional

import orjson
from azure.eventhub import EventData, PartitionContext
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import \
BlobCheckpointStore
from datayoga_core.context import Context
from datayoga_core.producer import Message
from datayoga_core.producer import Producer as DyProducer

logger = logging.getLogger("dy")
Expand All @@ -17,67 +16,47 @@
class Block(DyProducer):
"""Azure Event Hub block for reading events."""

def init(self, context: Optional[Context] = None):
"""Initializes the block.
DEFAULT_FLUSH_MS = 1000

Args:
context (Context, optional): The block context. Defaults to None.
"""
def init(self, context: Optional[Context] = None):
"""Constructs the Event Hub consumer client and the internal message queue."""
logger.debug(f"Initializing {self.get_block_name()}")

self.batch_size = self.properties.get("batch_size", 300)

self.max_batch_size = int(self.properties.get("max_batch_size", 300))
self.consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=self.properties["event_hub_connection_string"],
consumer_group=self.properties["event_hub_consumer_group_name"],
eventhub_name=self.properties["event_hub_name"],
checkpoint_store=BlobCheckpointStore.from_connection_string(
self.properties["checkpoint_store_connection_string"],
self.properties["checkpoint_store_container_name"])
self.properties["checkpoint_store_container_name"]),
)
self.events: Dict[Any, Any] = {}
self.messages: asyncio.Queue = asyncio.Queue()

self.events = {} # Retrieved events by sequence number, used for acknowledging them once processed
self.messages = asyncio.Queue()

async def produce(self) -> AsyncGenerator[List[Message], None]:
"""Starts the event receiving process and yield batches of messages.

Yields:
AsyncGenerator[List[Message], None]: A generator of message batches.
"""
async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]:
"""Starts the receive loop and yields one chunk per drained-queue snapshot."""
logger.debug(f"Running {self.get_block_name()}")

logger.debug("Starting event receiving process")
asyncio.create_task(self.receive_batch())

while True:
if not self.messages.empty():
batch = []
while not self.messages.empty():
message = await self.messages.get()
batch.append(message)

yield batch

await asyncio.sleep(0.1)
first = await self.messages.get()
chunk = [first]
while not self.messages.empty():
chunk.append(self.messages.get_nowait())
yield chunk

async def receive_batch(self):
"""Receives events in batches from the Event Hub."""
"""Runs the Azure SDK receive loop, dispatching each batch to `on_event_batch`."""
await self.consumer_client.receive_batch(
on_event_batch=self.on_event_batch,
max_batch_size=self.batch_size,
starting_position="-1", # read from the beginning of the partition.
max_batch_size=self.max_batch_size,
starting_position="-1",
)

async def on_event_batch(self, partition_context: PartitionContext, events: List[EventData]):
"""Processes each batch of events received from the Event Hub.

Args:
partition_context (PartitionContext): The partition context.
events (List[EventData]): The list of events in the batch.
"""
"""SDK callback: parses each event body as JSON and enqueues it for delivery."""
logger.debug(f"Received batch of events from partition: {partition_context.partition_id}")

for event in events:
try:
payload = orjson.loads(event.body_as_str(encoding="UTF-8"))
Expand All @@ -89,24 +68,15 @@ async def on_event_batch(self, partition_context: PartitionContext, events: List
logger.error(e)

async def complete_events(self, msg_ids: List[str]):
"""Completes the events and update the checkpoint.

Args:
msg_ids (List[str]): The list of message IDs to complete.
"""
"""Updates the partition checkpoint for each previously-delivered message id."""
for msg_id in msg_ids:
logger.debug(f"Acking {msg_id} event")
event, partition_context = self.events.pop(msg_id, (None, None))

if event is not None:
await partition_context.update_checkpoint(event)
else:
logger.warning(f"Couldn't find event {msg_id} for acknowledging")

def ack(self, msg_ids: List[str]):
"""Acknowledges the completion of events.

Args:
msg_ids (List[str]): The list of message IDs to acknowledge.
"""
"""Schedules checkpoint updates for the given message ids."""
asyncio.create_task(self.complete_events(msg_ids))
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"title": "azure.read_event_hub",
"description": "Read from Azure Event Hub",
"type": "object",
"allOf": [{ "$ref": "../../../resources/schemas/streamable.schema.json" }],
"properties": {
"event_hub_connection_string": {
"type": "string",
Expand All @@ -23,12 +25,14 @@
"type": "string",
"description": "The name of the container within the checkpoint store to store the checkpoints."
},
"batch_size": {
"max_batch_size": {
"type": "integer",
"description": "The maximum number of events to receive in each batch.",
"minimum": 1,
"description": "Maximum number of events to receive in each SDK callback. Replaces the former batch_size property, which previously had this meaning. Defaults to 300.",
"default": 300
}
},
"unevaluatedProperties": false,
"required": [
"event_hub_connection_string",
"event_hub_consumer_group_name",
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import pytest
from datayoga_core.blocks.azure.read_event_hub.block import Block
from jsonschema import ValidationError


def _minimal_props(extra=None):
"""Returns a minimal set of properties accepted by the Event Hub block schema."""
base = {
"event_hub_connection_string": "Endpoint=sb://x/;SharedAccessKeyName=k;SharedAccessKey=v;EntityPath=eh",
"event_hub_consumer_group_name": "$Default",
"event_hub_name": "eh",
"checkpoint_store_connection_string": "DefaultEndpointsProtocol=https;AccountName=a;AccountKey=k==",
"checkpoint_store_container_name": "chk",
}
if extra:
base.update(extra)
return base


def test_unknown_property_rejected_by_validation():
    """A misspelled property such as 'batch_sz' must be rejected at
    construction time (the schema sets unevaluatedProperties to false)."""
    props_with_typo = _minimal_props({"batch_sz": 300})
    with pytest.raises(ValidationError):
        Block(props_with_typo)


def test_max_batch_size_accepted():
    """max_batch_size (the renamed SDK-level property) and batch_size may be
    supplied together, and both are kept on the block as given."""
    overrides = {"max_batch_size": 500, "batch_size": 100}
    props = Block(_minimal_props(overrides)).properties
    assert props["max_batch_size"] == 500
    assert props["batch_size"] == 100


def test_renamed_schema_uses_unevaluated_properties_with_streamable():
    """Checks the shape of the resolved schema after the rename.

    max_batch_size is declared locally, batch_size and flush_ms arrive through
    the streamable fragment referenced from allOf, and
    unevaluatedProperties=false rejects everything else.
    """
    schema = Block(_minimal_props()).get_json_schema()
    assert schema.get("unevaluatedProperties") is False
    assert "max_batch_size" in schema["properties"]
    # The first allOf entry is the inlined streamable fragment.
    streamable_props = schema["allOf"][0]["properties"]
    for inherited in ("batch_size", "flush_ms"):
        assert inherited in streamable_props


def test_batch_size_300_is_silently_repurposed():
    """Pins the documented breaking change around batch_size.

    YAML written before the rename with batch_size: 300 (then the SDK callback
    size) still validates, but batch_size now means the pipeline batch size and
    no max_batch_size is added to the properties.
    """
    props = Block(_minimal_props({"batch_size": 300})).properties
    assert props["batch_size"] == 300
    assert "max_batch_size" not in props
35 changes: 15 additions & 20 deletions core/src/datayoga_core/blocks/files/read_csv/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,49 @@
from contextlib import suppress
from csv import DictReader
from itertools import count, islice
from typing import AsyncGenerator, List, Optional
from typing import Any, AsyncGenerator, Dict, List, Optional

from datayoga_core.context import Context
from datayoga_core.producer import Message
from datayoga_core.producer import Producer as DyProducer

logger = logging.getLogger("dy")


class Block(DyProducer, metaclass=ABCMeta):
"""Producer block that reads records from a CSV file."""

def init(self, context: Optional[Context] = None):
"""Initializes the block: resolves the CSV file path and reader options."""
logger.debug(f"Initializing {self.get_block_name()}")
csv_file = self.properties["file"]

if os.path.isabs(csv_file) or context is None:
self.file = csv_file
else:
self.file = os.path.join(context.properties.get("data_path"), csv_file)

logger.debug(f"file: {self.file}")

self.encoding = self.properties.get("encoding", "utf-8")
self.batch_size = self.properties.get("batch_size", 1000)
self.fields = self.properties.get("fields")
self.skip = self.properties.get("skip", 0)
self.delimiter = self.properties.get("delimiter", ",")
self.quotechar = self.properties.get("quotechar", "\"")

async def produce(self) -> AsyncGenerator[List[Message], None]:
async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]:
"""Yields successive `batch_size`-sized chunks of CSV rows."""
logger.debug("Reading CSV")
batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE))

with open(self.file, "r", encoding=self.encoding) as read_obj:
reader = DictReader(read_obj, fieldnames=self.fields, delimiter=self.delimiter, quotechar=self.quotechar)
counter = iter(count())

reader = DictReader(read_obj, fieldnames=self.fields,
delimiter=self.delimiter, quotechar=self.quotechar)
for _ in range(self.skip):
with suppress(StopIteration):
next(reader)

counter = iter(count())
while True:
sliced = islice(reader, self.batch_size)
records = [{self.MSG_ID_FIELD: f"{next(counter)}", **record} for record in sliced]

if not records:
logger.debug(f"Done reading {self.file}")
chunk = [
{self.MSG_ID_FIELD: f"{next(counter)}", **record}
for record in islice(reader, batch_size)
]
if not chunk:
return

logger.debug(f"Producing {len(records)} records")

yield records
yield chunk
10 changes: 3 additions & 7 deletions core/src/datayoga_core/blocks/files/read_csv/block.schema.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"title": "files.read_csv",
"description": "Read data from CSV",
"type": "object",
"allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }],
"properties": {
"file": {
"description": "Filename. Can contain a regexp or glob expression",
Expand Down Expand Up @@ -39,12 +41,6 @@
"maxLength": 1,
"default": ","
},
"batch_size": {
"description": "Number of records to read per batch",
"type": "number",
"minimum": 1,
"default": 1000
},
"quotechar": {
"description": "A one-character string used to quote fields containing special characters, such as the delimiter or quotechar, or which contain new-line characters. It defaults to \"",
"type": "string",
Expand All @@ -53,7 +49,7 @@
"default": "\""
}
},
"additionalProperties": false,
"unevaluatedProperties": false,
"required": ["file"],
"examples": [
{
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pathlib import Path

import pytest
from datayoga_core.blocks.files.read_csv.block import Block


async def _drain(producer):
"""Collects all batches emitted by a producer until end-of-stream."""
out = []
async for batch in producer.produce():
out.append(batch)
return out


@pytest.fixture
def csv_path(tmp_path) -> Path:
    """Creates data.csv under tmp_path: one header line followed by 2500 data rows."""
    lines = ["fname,lname"]
    lines.extend(f"first{i},last{i}" for i in range(2500))
    target = tmp_path / "data.csv"
    # A trailing newline terminates the last row.
    target.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return target


@pytest.mark.asyncio
async def test_csv_batches_to_batch_size(csv_path):
    """With batch_size=1000, the 2500-row CSV is emitted as batches of 1000, 1000 and 500."""
    reader = Block({"file": str(csv_path), "batch_size": 1000})
    reader.init()
    batches = await _drain(reader)
    sizes = [len(batch) for batch in batches]
    assert sizes == [1000, 1000, 500]
    # Every emitted record must carry the message-id field.
    for batch in batches:
        for row in batch:
            assert Block.MSG_ID_FIELD in row
    assert batches[0][0]["fname"] == "first0"


@pytest.mark.asyncio
async def test_csv_default_batch_size(csv_path):
    """When batch_size is omitted, the default of 1000 applies, so 2500 rows split into 1000, 1000 and 500."""
    reader = Block({"file": str(csv_path)})
    reader.init()
    sizes = [len(batch) for batch in await _drain(reader)]
    assert sizes == [1000, 1000, 500]
Loading
Loading