diff --git a/core/pyproject.toml b/core/pyproject.toml index 9939fd53..2aeca188 100644 --- a/core/pyproject.toml +++ b/core/pyproject.toml @@ -28,6 +28,7 @@ python = "^3.8" PyYAML = "^6.0" sqlglot = "^10.4.3" +hypothesis = { version = "^6.0", optional = true } mock = { version = "^4.0.3", optional = true } pytest = { version = "^7.1.2", optional = true } pytest-aioresponses = { version = "^0.2.0", optional = true } @@ -66,8 +67,11 @@ sqlserver = ["pymssql", "SQLAlchemy"] test = [ "aiohttp", + "azure-eventhub", + "azure-eventhub-checkpointstoreblob-aio", "cassandra-driver", "fastparquet", + "hypothesis", "ibm_db_sa", "mock", "oracledb", diff --git a/core/src/datayoga_core/block.py b/core/src/datayoga_core/block.py index 29363953..2dd6300b 100644 --- a/core/src/datayoga_core/block.py +++ b/core/src/datayoga_core/block.py @@ -56,7 +56,9 @@ def get_json_schema(self) -> Dict[str, Any]: os.path.dirname(os.path.realpath(sys.modules[self.__module__].__file__)), "block.schema.json") logger.debug(f"loading schema from {json_schema_file}") - return utils.read_json(json_schema_file) + # Lazy import: schema_utils -> utils -> block creates a circular import at module load. + from datayoga_core.schema_utils import resolve_refs + return resolve_refs(utils.read_json(json_schema_file), schema_path=json_schema_file) @abstractmethod def init(self, context: Optional[Context] = None): diff --git a/core/src/datayoga_core/blocks/azure/read_event_hub/block.py b/core/src/datayoga_core/blocks/azure/read_event_hub/block.py index d91497ed..ba4173ba 100644 --- a/core/src/datayoga_core/blocks/azure/read_event_hub/block.py +++ b/core/src/datayoga_core/blocks/azure/read_event_hub/block.py @@ -1,6 +1,6 @@ import asyncio import logging -from typing import AsyncGenerator, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional import orjson from azure.eventhub import EventData, PartitionContext @@ -8,7 +8,6 @@ from azure.eventhub.extensions.checkpointstoreblobaio import \ BlobCheckpointStore from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer logger = logging.getLogger("dy") @@ -17,67 +16,47 @@ class Block(DyProducer): """Azure Event Hub block for reading events.""" - def init(self, context: Optional[Context] = None): - """Initializes the block. + DEFAULT_FLUSH_MS = 1000 - Args: - context (Context, optional): The block context. Defaults to None. - """ + def init(self, context: Optional[Context] = None): + """Constructs the Event Hub consumer client and the internal message queue.""" logger.debug(f"Initializing {self.get_block_name()}") - - self.batch_size = self.properties.get("batch_size", 300) - + self.max_batch_size = int(self.properties.get("max_batch_size", 300)) self.consumer_client = EventHubConsumerClient.from_connection_string( conn_str=self.properties["event_hub_connection_string"], consumer_group=self.properties["event_hub_consumer_group_name"], eventhub_name=self.properties["event_hub_name"], checkpoint_store=BlobCheckpointStore.from_connection_string( self.properties["checkpoint_store_connection_string"], - self.properties["checkpoint_store_container_name"]) + self.properties["checkpoint_store_container_name"]), ) + self.events: Dict[Any, Any] = {} + self.messages: asyncio.Queue = asyncio.Queue() - self.events = {} # Retrieved events by sequence number, used for acknowledging them once processed - self.messages = asyncio.Queue() - - async def produce(self) -> AsyncGenerator[List[Message], None]: - """Starts the event receiving process and yield batches of messages. - - Yields: - AsyncGenerator[List[Message], None]: A generator of message batches. - """ + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Starts the receive loop and yields one chunk per drained-queue snapshot.""" logger.debug(f"Running {self.get_block_name()}") - logger.debug("Starting event receiving process") asyncio.create_task(self.receive_batch()) while True: - if not self.messages.empty(): - batch = [] - while not self.messages.empty(): - message = await self.messages.get() - batch.append(message) - - yield batch - - await asyncio.sleep(0.1) + first = await self.messages.get() + chunk = [first] + while not self.messages.empty(): + chunk.append(self.messages.get_nowait()) + yield chunk async def receive_batch(self): - """Receives events in batches from the Event Hub.""" + """Runs the Azure SDK receive loop, dispatching each batch to `on_event_batch`.""" await self.consumer_client.receive_batch( on_event_batch=self.on_event_batch, - max_batch_size=self.batch_size, - starting_position="-1", # read from the beginning of the partition. + max_batch_size=self.max_batch_size, + starting_position="-1", ) async def on_event_batch(self, partition_context: PartitionContext, events: List[EventData]): - """Processes each batch of events received from the Event Hub. - - Args: - partition_context (PartitionContext): The partition context. - events (List[EventData]): The list of events in the batch. - """ + """SDK callback: parses each event body as JSON and enqueues it for delivery.""" logger.debug(f"Received batch of events from partition: {partition_context.partition_id}") - for event in events: try: payload = orjson.loads(event.body_as_str(encoding="UTF-8")) @@ -89,24 +68,15 @@ async def on_event_batch(self, partition_context: PartitionContext, events: List logger.error(e) async def complete_events(self, msg_ids: List[str]): - """Completes the events and update the checkpoint. - - Args: - msg_ids (List[str]): The list of message IDs to complete. - """ + """Updates the partition checkpoint for each previously-delivered message id.""" for msg_id in msg_ids: logger.debug(f"Acking {msg_id} event") event, partition_context = self.events.pop(msg_id, (None, None)) - if event is not None: await partition_context.update_checkpoint(event) else: logger.warning(f"Couldn't find event {msg_id} for acknowledging") def ack(self, msg_ids: List[str]): - """Acknowledges the completion of events. - - Args: - msg_ids (List[str]): The list of message IDs to acknowledge. - """ + """Schedules checkpoint updates for the given message ids.""" asyncio.create_task(self.complete_events(msg_ids)) diff --git a/core/src/datayoga_core/blocks/azure/read_event_hub/block.schema.json b/core/src/datayoga_core/blocks/azure/read_event_hub/block.schema.json index 908c211c..f014b63f 100644 --- a/core/src/datayoga_core/blocks/azure/read_event_hub/block.schema.json +++ b/core/src/datayoga_core/blocks/azure/read_event_hub/block.schema.json @@ -1,7 +1,9 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "azure.read_event_hub", "description": "Read from Azure Event Hub", "type": "object", + "allOf": [{ "$ref": "../../../resources/schemas/streamable.schema.json" }], "properties": { "event_hub_connection_string": { "type": "string", @@ -23,12 +25,14 @@ "type": "string", "description": "The name of the container within the checkpoint store to store the checkpoints." }, - "batch_size": { + "max_batch_size": { "type": "integer", - "description": "The maximum number of events to receive in each batch.", + "minimum": 1, + "description": "Maximum number of events to receive in each SDK callback. Renamed from the previous batch_size which used to mean this. Defaults to 300.", "default": 300 } }, + "unevaluatedProperties": false, "required": [ "event_hub_connection_string", "event_hub_consumer_group_name", diff --git a/core/src/datayoga_core/blocks/azure/read_event_hub/tests/__init__.py b/core/src/datayoga_core/blocks/azure/read_event_hub/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/azure/read_event_hub/tests/test_event_hub.py b/core/src/datayoga_core/blocks/azure/read_event_hub/tests/test_event_hub.py new file mode 100644 index 00000000..b18fca3b --- /dev/null +++ b/core/src/datayoga_core/blocks/azure/read_event_hub/tests/test_event_hub.py @@ -0,0 +1,53 @@ +import pytest +from datayoga_core.blocks.azure.read_event_hub.block import Block +from jsonschema import ValidationError + + +def _minimal_props(extra=None): + """Returns a minimal set of properties accepted by the Event Hub block schema.""" + base = { + "event_hub_connection_string": "Endpoint=sb://x/;SharedAccessKeyName=k;SharedAccessKey=v;EntityPath=eh", + "event_hub_consumer_group_name": "$Default", + "event_hub_name": "eh", + "checkpoint_store_connection_string": "DefaultEndpointsProtocol=https;AccountName=a;AccountKey=k==", + "checkpoint_store_container_name": "chk", + } + if extra: + base.update(extra) + return base + + +def test_unknown_property_rejected_by_validation(): + """unevaluatedProperties: false catches typos like 'batch_sz'.""" + with pytest.raises(ValidationError): + Block(_minimal_props({"batch_sz": 300})) + + +def test_max_batch_size_accepted(): + """The renamed SDK-level property is now max_batch_size.""" + block = Block(_minimal_props({"max_batch_size": 500, "batch_size": 100})) + assert block.properties["max_batch_size"] == 500 + assert block.properties["batch_size"] == 100 + + +def test_renamed_schema_uses_unevaluated_properties_with_streamable(): + """Schema after rename: max_batch_size locally, streamable contributes + batch_size + flush_ms via allOf $ref, and unevaluatedProperties=false + rejects anything else.""" + block = Block(_minimal_props()) + schema = block.get_json_schema() + assert schema.get("unevaluatedProperties") is False + assert "max_batch_size" in schema["properties"] + # batch_size and flush_ms come from the inlined streamable fragment via allOf + fragment_props = schema["allOf"][0]["properties"] + assert "batch_size" in fragment_props + assert "flush_ms" in fragment_props + + +def test_batch_size_300_is_silently_repurposed(): + """A user upgrading from a pre-rename version with batch_size: 300 (which + used to mean SDK callback size) will see their YAML still validate, but + batch_size now means pipeline batch size. Documented as breaking change.""" + block = Block(_minimal_props({"batch_size": 300})) + assert block.properties["batch_size"] == 300 + assert "max_batch_size" not in block.properties diff --git a/core/src/datayoga_core/blocks/files/read_csv/block.py b/core/src/datayoga_core/blocks/files/read_csv/block.py index c4bca6f6..8e94f1f7 100644 --- a/core/src/datayoga_core/blocks/files/read_csv/block.py +++ b/core/src/datayoga_core/blocks/files/read_csv/block.py @@ -4,54 +4,49 @@ from contextlib import suppress from csv import DictReader from itertools import count, islice -from typing import AsyncGenerator, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer logger = logging.getLogger("dy") class Block(DyProducer, metaclass=ABCMeta): + """Producer block that reads records from a CSV file.""" def init(self, context: Optional[Context] = None): + """Initializes the block: resolves the CSV file path and reader options.""" logger.debug(f"Initializing {self.get_block_name()}") csv_file = self.properties["file"] - if os.path.isabs(csv_file) or context is None: self.file = csv_file else: self.file = os.path.join(context.properties.get("data_path"), csv_file) - logger.debug(f"file: {self.file}") - self.encoding = self.properties.get("encoding", "utf-8") - self.batch_size = self.properties.get("batch_size", 1000) self.fields = self.properties.get("fields") self.skip = self.properties.get("skip", 0) self.delimiter = self.properties.get("delimiter", ",") self.quotechar = self.properties.get("quotechar", "\"") - async def produce(self) -> AsyncGenerator[List[Message], None]: + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Yields successive `batch_size`-sized chunks of CSV rows.""" logger.debug("Reading CSV") + batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)) with open(self.file, "r", encoding=self.encoding) as read_obj: - reader = DictReader(read_obj, fieldnames=self.fields, delimiter=self.delimiter, quotechar=self.quotechar) - counter = iter(count()) - + reader = DictReader(read_obj, fieldnames=self.fields, + delimiter=self.delimiter, quotechar=self.quotechar) for _ in range(self.skip): with suppress(StopIteration): next(reader) - + counter = iter(count()) while True: - sliced = islice(reader, self.batch_size) - records = [{self.MSG_ID_FIELD: f"{next(counter)}", **record} for record in sliced] - - if not records: - logger.debug(f"Done reading {self.file}") + chunk = [ + {self.MSG_ID_FIELD: f"{next(counter)}", **record} + for record in islice(reader, batch_size) + ] + if not chunk: return - - logger.debug(f"Producing {len(records)} records") - - yield records + yield chunk diff --git a/core/src/datayoga_core/blocks/files/read_csv/block.schema.json b/core/src/datayoga_core/blocks/files/read_csv/block.schema.json index 39e7118a..dc837561 100644 --- a/core/src/datayoga_core/blocks/files/read_csv/block.schema.json +++ b/core/src/datayoga_core/blocks/files/read_csv/block.schema.json @@ -1,7 +1,9 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "files.read_csv", "description": "Read data from CSV", "type": "object", + "allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }], "properties": { "file": { "description": "Filename. Can contain a regexp or glob expression", @@ -39,12 +41,6 @@ "maxLength": 1, "default": "," }, - "batch_size": { - "description": "Number of records to read per batch", - "type": "number", - "minimum": 1, - "default": 1000 - }, "quotechar": { "description": "A one-character string used to quote fields containing special characters, such as the delimiter or quotechar, or which contain new-line characters. It defaults to '", "type": "string", @@ -53,7 +49,7 @@ "default": "\"" } }, - "additionalProperties": false, + "unevaluatedProperties": false, "required": ["file"], "examples": [ { diff --git a/core/src/datayoga_core/blocks/files/read_csv/tests/__init__.py b/core/src/datayoga_core/blocks/files/read_csv/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/files/read_csv/tests/test_read_csv.py b/core/src/datayoga_core/blocks/files/read_csv/tests/test_read_csv.py new file mode 100644 index 00000000..a479910a --- /dev/null +++ b/core/src/datayoga_core/blocks/files/read_csv/tests/test_read_csv.py @@ -0,0 +1,41 @@ +from pathlib import Path + +import pytest +from datayoga_core.blocks.files.read_csv.block import Block + + +async def _drain(producer): + """Collects all batches emitted by a producer until end-of-stream.""" + out = [] + async for batch in producer.produce(): + out.append(batch) + return out + + +@pytest.fixture +def csv_path(tmp_path) -> Path: + """Writes a 2500-row CSV with a single header row to a temp path.""" + p = tmp_path / "data.csv" + rows = ["fname,lname"] + [f"first{i},last{i}" for i in range(2500)] + p.write_text("\n".join(rows) + "\n", encoding="utf-8") + return p + + +@pytest.mark.asyncio +async def test_csv_batches_to_batch_size(csv_path): + """2500 CSV rows with batch_size=1000 yields batches of [1000, 1000, 500].""" + block = Block({"file": str(csv_path), "batch_size": 1000}) + block.init() + batches = await _drain(block) + assert [len(b) for b in batches] == [1000, 1000, 500] + assert all(Block.MSG_ID_FIELD in r for b in batches for r in b) + assert batches[0][0]["fname"] == "first0" + + +@pytest.mark.asyncio +async def test_csv_default_batch_size(csv_path): + """Without batch_size in properties, the default 1000 is applied.""" + block = Block({"file": str(csv_path)}) + block.init() + batches = await _drain(block) + assert [len(b) for b in batches] == [1000, 1000, 500] diff --git a/core/src/datayoga_core/blocks/http/receiver/block.py b/core/src/datayoga_core/blocks/http/receiver/block.py index f325e56b..ab0fa60a 100644 --- a/core/src/datayoga_core/blocks/http/receiver/block.py +++ b/core/src/datayoga_core/blocks/http/receiver/block.py @@ -3,35 +3,40 @@ from asyncio import Queue from contextlib import suppress from itertools import count -from typing import AsyncGenerator, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional import orjson from aiohttp.web import (BaseRequest, HTTPInternalServerError, HTTPOk, Response, Server, ServerRunner, TCPSite) from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer logger = logging.getLogger("dy") class Block(DyProducer, metaclass=ABCMeta): + """Producer block that exposes an HTTP endpoint and emits POSTed JSON bodies.""" + port: int host: str + DEFAULT_FLUSH_MS = 1000 def init(self, context: Optional[Context] = None): + """Reads host/port from properties; the HTTP server is started in produce_chunks.""" logger.debug(f"Initializing {self.get_block_name()}") self.port = int(self.properties.get("port", 8080)) self.host = self.properties.get("host", "0.0.0.0") - async def produce(self) -> AsyncGenerator[List[Message], None]: - queue = Queue(maxsize=1000) + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Starts the HTTP server, then yields one chunk per drained queue snapshot.""" + queue: Queue = Queue(maxsize=1000) async def handler(request: BaseRequest) -> Response: + """Parses the incoming HTTP body as JSON and enqueues it for delivery.""" try: queue.put_nowait(orjson.loads(await request.read())) return HTTPOk() - except Exception: # noqa + except Exception: logger.exception("Got exception while parsing request:") return HTTPInternalServerError() @@ -43,11 +48,13 @@ async def handler(request: BaseRequest) -> Response: try: counter = iter(count()) - while True: - data = await queue.get() - yield [{self.MSG_ID_FIELD: f"{next(counter)}", **data}] - + first = await queue.get() + chunk = [{self.MSG_ID_FIELD: f"{next(counter)}", **first}] + while not queue.empty(): + record = queue.get_nowait() + chunk.append({self.MSG_ID_FIELD: f"{next(counter)}", **record}) + yield chunk finally: with suppress(Exception): await srv.stop() diff --git a/core/src/datayoga_core/blocks/http/receiver/block.schema.json b/core/src/datayoga_core/blocks/http/receiver/block.schema.json index c5189b5f..1f93ccd5 100644 --- a/core/src/datayoga_core/blocks/http/receiver/block.schema.json +++ b/core/src/datayoga_core/blocks/http/receiver/block.schema.json @@ -1,7 +1,9 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "http.receiver", "description": "Receives HTTP requests and process the data.", "type": "object", + "allOf": [{ "$ref": "../../../resources/schemas/streamable.schema.json" }], "properties": { "host": { "description": "Host to listen", @@ -14,7 +16,7 @@ "default": 8080 } }, - "additionalProperties": false, + "unevaluatedProperties": false, "examples": [ { "host": "localhost", diff --git a/core/src/datayoga_core/blocks/http/receiver/tests/__init__.py b/core/src/datayoga_core/blocks/http/receiver/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/http/receiver/tests/test_http_receiver.py b/core/src/datayoga_core/blocks/http/receiver/tests/test_http_receiver.py new file mode 100644 index 00000000..ee187f71 --- /dev/null +++ b/core/src/datayoga_core/blocks/http/receiver/tests/test_http_receiver.py @@ -0,0 +1,50 @@ +import asyncio + +import aiohttp +import pytest +from datayoga_core.blocks.http.receiver.block import Block + + +def _free_port(): + """Returns an unused TCP port on localhost.""" + import socket + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +@pytest.mark.asyncio +async def test_http_receiver_batches_incoming_requests(): + """60 POSTs with batch_size=50 + flush_ms=200 yield at least one full batch of 50.""" + port = _free_port() + block = Block({"host": "127.0.0.1", "port": port, + "batch_size": 50, "flush_ms": 200}) + block.init() + + received = [] + gen = block.produce() + + async def consumer(): + """Drains the producer until 60 records have arrived, then closes the generator.""" + try: + async for batch in gen: + received.append(batch) + if sum(len(b) for b in received) >= 60: + return + finally: + await gen.aclose() + + consumer_task = asyncio.create_task(consumer()) + await asyncio.sleep(0.2) # let server start + + async with aiohttp.ClientSession() as session: + for i in range(60): + async with session.post(f"http://127.0.0.1:{port}", json={"i": i}) as r: + assert r.status == 200 + + await asyncio.wait_for(consumer_task, timeout=5) + + flat = [r for b in received for r in b] + assert len(flat) == 60 + assert any(len(b) == 50 for b in received) + assert all(Block.MSG_ID_FIELD in r for r in flat) diff --git a/core/src/datayoga_core/blocks/parquet/read/block.py b/core/src/datayoga_core/blocks/parquet/read/block.py index f72e6490..f82604ee 100644 --- a/core/src/datayoga_core/blocks/parquet/read/block.py +++ b/core/src/datayoga_core/blocks/parquet/read/block.py @@ -1,10 +1,10 @@ import logging import os from abc import ABCMeta -from typing import AsyncGenerator, List, Optional +from itertools import count +from typing import Any, AsyncGenerator, Dict, List, Optional from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer from fastparquet import ParquetFile @@ -12,25 +12,25 @@ class Block(DyProducer, metaclass=ABCMeta): + """Producer block that reads records from a Parquet file.""" def init(self, context: Optional[Context] = None): + """Initializes the block: resolves the Parquet file path.""" logger.debug(f"Initializing {self.get_block_name()}") parquet_file = self.properties["file"] - if os.path.isabs(parquet_file) or context is None: self.file = parquet_file else: self.file = os.path.join(context.properties.get("data_path"), parquet_file) - logger.debug(f"file: {self.file}") - async def produce(self) -> AsyncGenerator[List[Message], None]: + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Yields one chunk per Parquet row group; the base class re-chunks to `batch_size`.""" logger.debug("Reading parquet") - pf = ParquetFile(self.file) - - count = 0 + counter = iter(count()) for df in pf.iter_row_groups(): - for _, data in df.iterrows(): - yield [{self.MSG_ID_FIELD: str(count), **data.to_dict()}] - count += 1 + yield [ + {self.MSG_ID_FIELD: str(next(counter)), **row.to_dict()} + for _, row in df.iterrows() + ] diff --git a/core/src/datayoga_core/blocks/parquet/read/block.schema.json b/core/src/datayoga_core/blocks/parquet/read/block.schema.json index 13bcec76..777c23c4 100644 --- a/core/src/datayoga_core/blocks/parquet/read/block.schema.json +++ b/core/src/datayoga_core/blocks/parquet/read/block.schema.json @@ -1,14 +1,16 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "parquet.read", "description": "Read data from parquet", "type": "object", + "allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }], "properties": { "file": { "description": "Filename. Can contain a regexp or glob expression", "type": "string" } }, - "additionalProperties": false, + "unevaluatedProperties": false, "required": ["file"], "examples": [ { diff --git a/core/src/datayoga_core/blocks/parquet/read/tests/__init__.py b/core/src/datayoga_core/blocks/parquet/read/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/parquet/read/tests/test_parquet_read.py b/core/src/datayoga_core/blocks/parquet/read/tests/test_parquet_read.py new file mode 100644 index 00000000..546f77d9 --- /dev/null +++ b/core/src/datayoga_core/blocks/parquet/read/tests/test_parquet_read.py @@ -0,0 +1,46 @@ +from pathlib import Path + +import pandas as pd +import pytest +from datayoga_core.blocks.parquet.read.block import Block + + +async def _drain(producer): + """Collects all batches emitted by a producer until end-of-stream.""" + out = [] + async for batch in producer.produce(): + out.append(batch) + return out + + +@pytest.fixture +def parquet_path(tmp_path) -> Path: + """Writes a 2500-row Parquet file with three row groups (1000, 1000, 500).""" + p = tmp_path / "data.parquet" + df = pd.DataFrame({"i": list(range(2500))}) + from fastparquet import write as fp_write + fp_write(str(p), df, row_group_offsets=1000) + return p + + +@pytest.mark.asyncio +async def test_parquet_batches_to_batch_size(parquet_path): + """2500 rows across three row groups, batch_size=1000 -> [1000, 1000, 500].""" + block = Block({"file": str(parquet_path), "batch_size": 1000}) + block.init() + batches = await _drain(block) + assert [len(b) for b in batches] == [1000, 1000, 500] + flat = [r for b in batches for r in b] + assert flat[0]["i"] == 0 + assert all(Block.MSG_ID_FIELD in r for r in flat) + + +@pytest.mark.asyncio +async def test_parquet_rechunks_across_row_groups(parquet_path): + """Batches honor batch_size regardless of underlying row-group boundaries.""" + # row groups are [1000, 1000, 500]; batch_size=750 should give batches of + # [750, 750, 750, 250] regardless of row group boundaries. + block = Block({"file": str(parquet_path), "batch_size": 750}) + block.init() + batches = await _drain(block) + assert [len(b) for b in batches] == [750, 750, 750, 250] diff --git a/core/src/datayoga_core/blocks/redis/read_stream/block.py b/core/src/datayoga_core/blocks/redis/read_stream/block.py index 667ed02d..ad166a8c 100644 --- a/core/src/datayoga_core/blocks/redis/read_stream/block.py +++ b/core/src/datayoga_core/blocks/redis/read_stream/block.py @@ -1,23 +1,25 @@ import logging -from typing import AsyncGenerator, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional import datayoga_core.blocks.redis.utils as redis_utils import orjson from datayoga_core.connection import Connection from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer logger = logging.getLogger("dy") class Block(DyProducer): + """Producer block that reads messages from a Redis stream consumer group.""" + + DEFAULT_FLUSH_MS = 1000 + def init(self, context: Optional[Context] = None): + """Connects to Redis and ensures the consumer group exists on the target stream.""" logger.debug(f"Initializing {self.get_block_name()}") - connection_details = Connection.get_connection_details(self.properties["connection"], context) self.redis_client = redis_utils.get_client(connection_details) - self.stream = self.properties["stream_name"] self.snapshot = self.properties.get("snapshot", False) self.consumer_group = f'datayoga_job_{context.properties.get("job_name", "") if context else ""}' @@ -27,29 +29,50 @@ def init(self, context: Optional[Context] = None): logger.info(f"Creating a new {self.consumer_group} consumer group associated with the {self.stream}") self.redis_client.xgroup_create(self.stream, self.consumer_group, 0) - async def produce(self) -> AsyncGenerator[List[Message], None]: - logger.debug(f"Running {self.get_block_name()}") + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Reads pending then new stream messages via XREADGROUP, yielding each response as a chunk. + Pending entries (id="0") are drained in a single unbounded XREADGROUP + call (count=None) — this matches pre-PR behavior. Paginating PEL via + count is not safe with a non-acking producer because XREADGROUP id="0" + always returns from the start of PEL, so a smaller count would just + re-read the same first page forever. New-message reads (id=">") use + count=batch_size to bound the Redis network response size. + """ + logger.debug(f"Running {self.get_block_name()}") + batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)) read_pending = True + while True: - # Read pending messages (fetched by us before but not acknowledged) in the first time, then consume new messages - streams = self.redis_client.xreadgroup(self.consumer_group, self.requesting_consumer, { - self.stream: "0" if read_pending else ">"}, None, 100 if self.snapshot else 0) + streams = self.redis_client.xreadgroup( + self.consumer_group, self.requesting_consumer, + {self.stream: "0" if read_pending else ">"}, + count=None if read_pending else batch_size, + block=100 if self.snapshot else 0, + ) + yielded_any = False for stream in streams: logger.debug(f"Messages in {self.stream} stream (pending: {read_pending}):\n\t{stream}") + chunk: List[Dict[str, Any]] = [] for key, value in stream[1]: payload = orjson.loads(value[next(iter(value))]) payload[self.MSG_ID_FIELD] = key - yield [payload] + chunk.append(payload) + if chunk: + yielded_any = True + yield chunk - # Quit after consuming pending current messages in case of snapshot - if self.snapshot and not read_pending: - break + if self.snapshot and not read_pending and not yielded_any: + return + # Flip unconditionally after the first pending-read call: count=None + # drained the entire PEL in that single call, so there's no more + # pending work to do this session. read_pending = False def ack(self, msg_ids: List[str]): + """Acknowledges the given message ids with XACK on the stream consumer group.""" for msg_id in msg_ids: logger.info(f"Acking {msg_id} message in {self.stream} stream of {self.consumer_group} consumer group") self.redis_client.xack(self.stream, self.consumer_group, msg_id) diff --git a/core/src/datayoga_core/blocks/redis/read_stream/block.schema.json b/core/src/datayoga_core/blocks/redis/read_stream/block.schema.json index bc2d148c..4411149f 100644 --- a/core/src/datayoga_core/blocks/redis/read_stream/block.schema.json +++ b/core/src/datayoga_core/blocks/redis/read_stream/block.schema.json @@ -1,7 +1,9 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "redis.read_stream", "description": "Read from Redis stream", "type": "object", + "allOf": [{ "$ref": "../../../resources/schemas/streamable.schema.json" }], "properties": { "connection": { "description": "Connection name", "type": "string" }, "stream_name": { @@ -16,6 +18,6 @@ "default": false } }, - "additionalProperties": false, + "unevaluatedProperties": false, "required": ["connection", "stream_name"] } diff --git a/core/src/datayoga_core/blocks/redis/read_stream/tests/__init__.py b/core/src/datayoga_core/blocks/redis/read_stream/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/redis/read_stream/tests/test_redis_read_stream.py b/core/src/datayoga_core/blocks/redis/read_stream/tests/test_redis_read_stream.py new file mode 100644 index 00000000..de003da8 --- /dev/null +++ b/core/src/datayoga_core/blocks/redis/read_stream/tests/test_redis_read_stream.py @@ -0,0 +1,109 @@ +from unittest.mock import MagicMock + +import pytest +from datayoga_core.blocks.redis.read_stream.block import Block + + +def _mk_block(properties, redis_client): + """Builds a redis/read_stream Block bypassing its real init() (mocks the Redis client).""" + block = Block.__new__(Block) + block.properties = properties + block.redis_client = redis_client + block.stream = "mystream" + block.snapshot = properties.get("_snapshot", True) + block.consumer_group = "g" + block.requesting_consumer = "c" + return block + + +def _xreadgroup_count(call): + """Extract the count arg from an xreadgroup call regardless of kw/positional.""" + if "count" in call.kwargs: + return call.kwargs["count"] + if len(call.args) >= 4: + return call.args[3] + return None + + +def _xreadgroup_id(call): + """Extract the stream-id dict value from an xreadgroup call.""" + streams = call.kwargs.get("streams") or (call.args[2] if len(call.args) >= 3 else {}) + return next(iter(streams.values())) if streams else None + + +@pytest.mark.asyncio +async def test_redis_new_message_read_uses_count_equal_to_batch_size(): + """xreadgroup for new messages (id='>') uses count=batch_size (closes #377).""" + redis = MagicMock() + payload_a = (b"1-0", {b"data": b'{"i": 1}'}) + payload_b = (b"2-0", {b"data": b'{"i": 2}'}) + redis.xreadgroup.side_effect = [ + [(b"mystream", [payload_a, payload_b])], # pending (drained in one call, count=None) + [(b"mystream", [])], # new-read empty -> exit + ] + + block = _mk_block({"batch_size": 250, "_snapshot": True}, redis) + batches = [] + async for b in block.produce(): + batches.append(b) + + # First call is pending (id="0"); it uses count=None (drain). + pending_call = redis.xreadgroup.call_args_list[0] + assert _xreadgroup_id(pending_call) == "0" + assert _xreadgroup_count(pending_call) is None, \ + "pending read should use count=None to drain PEL in one call" + + # Subsequent new-message calls (id=">") use count=batch_size. + new_calls = [c for c in redis.xreadgroup.call_args_list if _xreadgroup_id(c) == ">"] + assert new_calls, "expected at least one new-message read" + for c in new_calls: + assert _xreadgroup_count(c) == 250, \ + f"new-message read should use count=batch_size, got count={_xreadgroup_count(c)}" + + +@pytest.mark.asyncio +async def test_redis_yields_records_as_a_batch_not_one_by_one(): + """A 5-record xreadgroup response yields one batch of 5, not five batches of 1.""" + redis = MagicMock() + pages = [(f"{i}-0".encode(), {b"data": f'{{"i": {i}}}'.encode()}) for i in range(5)] + redis.xreadgroup.side_effect = [ + [(b"mystream", pages)], # pending drained in one call + [(b"mystream", [])], # new-read empty -> exit + ] + + block = _mk_block({"batch_size": 100, "_snapshot": True}, redis) + batches = [] + async for b in block.produce(): + batches.append(b) + + assert [len(b) for b in batches] == [5] + assert batches[0][0]["i"] == 0 + + +@pytest.mark.asyncio +async def test_redis_drains_full_pel_in_one_call_even_when_larger_than_batch_size(): + """Pending reads use count=None so the entire PEL drains in a single call. + The base class re-chunks the result to batch_size. This avoids the + Copilot-flagged pagination bug where count=batch_size + XREADGROUP id='0' + would re-read the same first page forever (since the producer doesn't ack + inside produce_chunks).""" + redis = MagicMock() + # Simulate a PEL of 20 entries returned in one xreadgroup call. + pel = [(f"{i}-0".encode(), {b"data": f'{{"i": {i}}}'.encode()}) for i in range(20)] + redis.xreadgroup.side_effect = [ + [(b"mystream", pel)], # entire PEL in one call (count=None) + [(b"mystream", [])], # new-read empty -> exit + ] + + block = _mk_block({"batch_size": 5, "_snapshot": True}, redis) + batches = [] + async for b in block.produce(): + batches.append(b) + + # All 20 pending entries are delivered; the base class re-chunks them + # to batch_size=5 → four batches of 5. + assert [len(b) for b in batches] == [5, 5, 5, 5] + # Only ONE pending read was made (PEL drained in one shot). + pending_calls = [c for c in redis.xreadgroup.call_args_list if _xreadgroup_id(c) == "0"] + assert len(pending_calls) == 1, \ + f"expected exactly 1 pending read (count=None drains all), got {len(pending_calls)}" diff --git a/core/src/datayoga_core/blocks/relational/read/block.py b/core/src/datayoga_core/blocks/relational/read/block.py index 97d8dcdd..4dd8f026 100644 --- a/core/src/datayoga_core/blocks/relational/read/block.py +++ b/core/src/datayoga_core/blocks/relational/read/block.py @@ -1,23 +1,26 @@ import logging -from typing import AsyncGenerator, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional import sqlalchemy as sa from datayoga_core import utils from datayoga_core.blocks.relational import utils as relational_utils from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer logger = logging.getLogger("dy") class Block(DyProducer): + """Producer block that reads rows from a SQL-compatible relational database.""" + + DEFAULT_FETCH_SIZE = 10000 def init(self, context: Optional[Context] = None): + """Initializes the engine, autoloads the target table, and opens a connection.""" self.engine, self.db_type = relational_utils.get_engine( self.properties["connection"], context, - autocommit=False + autocommit=False, ) self.schema = self.properties.get("schema") @@ -32,16 +35,17 @@ def init(self, context: Optional[Context] = None): logger.debug(f"Connecting to {self.db_type}") self.connection = self.engine.connect() - async def produce(self) -> AsyncGenerator[List[Message], None]: + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Yields each `fetchmany(fetch_size)` result as a chunk; the base class re-chunks to `batch_size`.""" + fetch_size = int(self.properties.get("fetch_size", self.DEFAULT_FETCH_SIZE)) result = self.connection.execution_options(stream_results=True).execute(self.tbl.select()) - while True: - chunk = result.fetchmany(10000) - if not chunk: - break - for row in chunk: - yield [utils.add_uid(dict(row._asdict()))] + rows = result.fetchmany(fetch_size) + if not rows: + return + yield [utils.add_uid(dict(row._asdict())) for row in rows] def stop(self): + """Closes the database connection and disposes of the engine.""" self.connection.close() self.engine.dispose() diff --git a/core/src/datayoga_core/blocks/relational/read/block.schema.json b/core/src/datayoga_core/blocks/relational/read/block.schema.json index 4a65a8fc..29f5715a 100644 --- a/core/src/datayoga_core/blocks/relational/read/block.schema.json +++ b/core/src/datayoga_core/blocks/relational/read/block.schema.json @@ -1,8 +1,10 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "relational.read", "description": "Read a table from an SQL-compatible data store", "type": "object", - "additionalProperties": false, + "allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }], + "unevaluatedProperties": false, "examples": [ { "id": "read_snowflake", @@ -41,6 +43,12 @@ "title": "name of column" }, "examples": [["fname", { "lname": "last_name" }]] + }, + "fetch_size": { + "type": "integer", + "minimum": 1, + "description": "Driver-level rows fetched per round-trip. Defaults to 10000.", + "default": 10000 } }, "required": ["connection", "table"] diff --git a/core/src/datayoga_core/blocks/relational/read/tests/__init__.py b/core/src/datayoga_core/blocks/relational/read/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/relational/read/tests/test_relational_read.py b/core/src/datayoga_core/blocks/relational/read/tests/test_relational_read.py new file mode 100644 index 00000000..47528712 --- /dev/null +++ b/core/src/datayoga_core/blocks/relational/read/tests/test_relational_read.py @@ -0,0 +1,87 @@ +from unittest.mock import MagicMock + +import pytest +from datayoga_core.blocks.relational.read.block import Block + + +async def _drain(producer): + """Collects all batches emitted by a producer until end-of-stream.""" + out = [] + async for batch in producer.produce(): + out.append(batch) + return out + + +def _fake_result(rows): + """Builds a fake SQLAlchemy result that returns rows in fetchmany chunks.""" + state = {"i": 0} + + def fetchmany(n): + i = state["i"] + chunk = rows[i:i + n] + state["i"] += len(chunk) + return chunk + + res = MagicMock() + res.fetchmany.side_effect = fetchmany + res.execution_options.return_value = res + return res + + +class _Row: + """Stand-in for a SQLAlchemy Row exposing only `_asdict()`.""" + + def __init__(self, d): + """Stores the underlying dict that `_asdict()` will return.""" + self._d = d + + def _asdict(self): + """Returns the stored dict, matching SQLAlchemy Row's API.""" + return self._d + + +def _mk_block(properties, fake_result): + """Builds a relational/read Block without running its real init() (mocks engine/connection).""" + block = Block.__new__(Block) + block.properties = properties + block.connection = MagicMock() + block.tbl = MagicMock() + block.tbl.select.return_value = "SELECT *" + block.connection.execution_options.return_value.execute.return_value = fake_result + return block + + +@pytest.mark.asyncio +async def test_relational_read_yields_batches_not_rows(): + """2500 rows with batch_size=1000 yield [1000, 1000, 500], not 2500 single-row batches.""" + rows = [_Row({"i": i}) for i in range(2500)] + fake_result = _fake_result(rows) + block = _mk_block({"batch_size": 1000}, fake_result) + batches = await _drain(block) + assert [len(b) for b in batches] == [1000, 1000, 500] + + +@pytest.mark.asyncio +async def test_relational_read_fetch_size_independent_of_batch_size(): + """fetch_size controls driver round-trips; batch_size controls downstream batches; both are decoupled.""" + rows = [_Row({"i": i}) for i in range(5000)] + fake_result = _fake_result(rows) + block = _mk_block({"batch_size": 1000, "fetch_size": 2500}, fake_result) + batches = await _drain(block) + # Downstream batches are still batch_size=1000 + assert [len(b) for b in batches] == [1000, 1000, 1000, 1000, 1000] + # Driver fetched in fetch_size=2500 chunks (2500 + 2500 + 0) + fetch_sizes = [c.args[0] for c in fake_result.fetchmany.call_args_list] + assert fetch_sizes[0] == 2500 + assert fetch_sizes[1] == 2500 + + +@pytest.mark.asyncio +async def test_relational_read_default_fetch_size_is_10000(): + """When fetch_size is omitted, the driver-level fetchmany is called with 10000.""" + rows = [_Row({"i": i}) for i in range(500)] + fake_result = _fake_result(rows) + block = _mk_block({}, fake_result) + await _drain(block) + fetch_sizes = [c.args[0] for c in fake_result.fetchmany.call_args_list] + assert fetch_sizes[0] == 10000 diff --git a/core/src/datayoga_core/blocks/std/read/block.py b/core/src/datayoga_core/blocks/std/read/block.py index e0b60b13..8ff15811 100644 --- a/core/src/datayoga_core/blocks/std/read/block.py +++ b/core/src/datayoga_core/blocks/std/read/block.py @@ -6,59 +6,42 @@ import orjson from datayoga_core.context import Context -from datayoga_core.producer import Message from datayoga_core.producer import Producer as DyProducer logger = logging.getLogger("dy") class Block(DyProducer): + """Producer block that reads JSON records from standard input.""" def init(self, context: Optional[Context] = None): + """Initializes the block.""" logger.debug(f"Initializing {self.get_block_name()}") - self.batch_size = int(self.properties.get("batch_size", 1000)) - logger.info(f"Using batch size: {self.batch_size}") - async def process_batch(self, records: List[Dict[str, Any]]) -> AsyncGenerator[List[Message], None]: - """Process records and yield batches according to batch_size""" - batch = [] - for record in records: - batch.append(self.get_message(record)) + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Reads all stdin records and yields them as a single chunk. - # When batch is full, yield it - if len(batch) >= self.batch_size: - logger.info(f"Yielding batch of {len(batch)} records") - yield batch - batch = [] - - # Yield any remaining records - if batch: - logger.info(f"Yielding final batch of {len(batch)} records") - yield batch - - async def produce(self) -> AsyncGenerator[List[Message], None]: - if select.select([sys.stdin, ], [], [], 0.0)[0]: - # piped data exists - all_records = [] - for data in sys.stdin: - all_records.extend(self.get_records(data)) + The base class re-chunks the output to `batch_size` records per batch. + """ + if select.select([sys.stdin], [], [], 0.0)[0]: + all_records: List[Dict[str, Any]] = [] + for line in sys.stdin: + all_records.extend(self.get_records(line)) else: - # interactive mode print("Enter data to process:") - data = input() - all_records = self.get_records(data) + all_records = self.get_records(input()) - async for batch in self.process_batch(all_records): - yield batch + if all_records: + yield [self.get_message(record) for record in all_records] @staticmethod def get_records(data: str) -> List[Dict[str, Any]]: + """Parses a JSON string into a list of records (wraps single objects in a list).""" records = orjson.loads(data) - if isinstance(records, dict): records = [records] - return records - def get_message(self, record: Dict[str, Any]) -> Message: + def get_message(self, record: Dict[str, Any]) -> Dict[str, Any]: + """Returns the record with a generated message id field added.""" return {self.MSG_ID_FIELD: str(uuid.uuid4()), **record} diff --git a/core/src/datayoga_core/blocks/std/read/block.schema.json b/core/src/datayoga_core/blocks/std/read/block.schema.json index 38ad05af..5d825898 100644 --- a/core/src/datayoga_core/blocks/std/read/block.schema.json +++ b/core/src/datayoga_core/blocks/std/read/block.schema.json @@ -1,12 +1,9 @@ { + "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "std.read", "description": "Read from the standard input", "type": "object", - "properties": { - "batch_size": { - "type": "integer", - "description": "Number of records to process in a single batch", - "default": 1000 - } - } + "allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }], + "properties": {}, + "unevaluatedProperties": false } diff --git a/core/src/datayoga_core/blocks/std/read/tests/__init__.py b/core/src/datayoga_core/blocks/std/read/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/std/read/tests/test_std_read.py b/core/src/datayoga_core/blocks/std/read/tests/test_std_read.py new file mode 100644 index 00000000..6ec3d933 --- /dev/null +++ b/core/src/datayoga_core/blocks/std/read/tests/test_std_read.py @@ -0,0 +1,33 @@ +from unittest.mock import patch + +import orjson +import pytest +from datayoga_core.blocks.std.read.block import Block + + +async def _drain(producer): + """Collects all batches emitted by a producer until end-of-stream.""" + out = [] + async for batch in producer.produce(): + out.append(batch) + return out + + +@pytest.mark.asyncio +async def test_std_read_batches_to_batch_size(): + """2500 stdin records with batch_size=1000 yield batches of [1000, 1000, 500].""" + payload = [{"i": i} for i in range(2500)] + fake_stdin = [orjson.dumps(payload).decode()] + + block = Block({"batch_size": 1000}) + block.init() + + with patch("datayoga_core.blocks.std.read.block.select.select", + return_value=([object()], [], [])), \ + patch("datayoga_core.blocks.std.read.block.sys.stdin", fake_stdin): + batches = await _drain(block) + + assert [len(b) for b in batches] == [1000, 1000, 500] + flat = [r for b in batches for r in b] + assert flat[0]["i"] == 0 + assert all(Block.MSG_ID_FIELD in r for r in flat) diff --git a/core/src/datayoga_core/job.py b/core/src/datayoga_core/job.py index 082dde7c..710d84e6 100644 --- a/core/src/datayoga_core/job.py +++ b/core/src/datayoga_core/job.py @@ -237,10 +237,12 @@ def get_json_schema(whitelisted_blocks: Optional[List[str]] = None) -> Dict[str, # Now build the sorted lists block_types = [] block_schemas = [] + # Lazy import: schema_utils -> utils -> block creates a circular import at module load. + from datayoga_core.schema_utils import resolve_refs for block_type, schema_path in block_info: block_types.append(block_type) # load schema file - schema = utils.read_json(f"{schema_path}") + schema = resolve_refs(utils.read_json(f"{schema_path}"), schema_path=f"{schema_path}") # append to the array of allOf for the full schema # we use allOf for better error reporting block_schemas.append({ diff --git a/core/src/datayoga_core/producer.py b/core/src/datayoga_core/producer.py index e32b2e01..a199d446 100644 --- a/core/src/datayoga_core/producer.py +++ b/core/src/datayoga_core/producer.py @@ -1,30 +1,121 @@ -from abc import abstractmethod -from typing import Any, AsyncGenerator, Dict, List +import asyncio +import logging +from contextlib import suppress +from typing import Any, AsyncGenerator, Dict, List, Optional from .block import Block +logger = logging.getLogger("dy") + class Message: + """A message produced by a producer block.""" + def __init__(self, msg_id: str, value: Dict[str, Any]): + """Initializes a message with an id and a payload value.""" self.msg_id = msg_id self.value = value class Producer(Block): + """Base class for producer (read) blocks. + + Subclasses override `produce_chunks()` to yield chunks of any size from + the source. The default `produce()` re-chunks them to exactly `batch_size` + records per batch (smaller on flush_ms timeout or end-of-stream). + + Legacy subclasses may still override `produce()` directly. They bypass + the base-class batching and `produce_chunks` is not called. + """ - @abstractmethod - async def produce(self) -> AsyncGenerator[List[Message], None]: - """Produces data + DEFAULT_BATCH_SIZE: int = 1000 + DEFAULT_FLUSH_MS: Optional[int] = None # streaming subclasses override to enable timeout flush - Returns: - AsyncGenerator[List[Message], None]: A generator of message batches. + async def produce_chunks(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Yield natural-size chunks from the source. + + Subclasses should override this method. The base-class `produce()` + will re-chunk the output to exact `batch_size` slices. """ - raise NotImplementedError + raise NotImplementedError( + f"{type(self).__name__} must override produce_chunks() or produce()" + ) + # Make this an async generator for type-checking purposes. + yield # pragma: no cover - def ack(self, msg_ids: List[str]): - """Sends acknowledge for the message IDs of the records that have been processed + async def produce(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + """Re-chunks `produce_chunks()` output into batches of up to `batch_size`. + + Each batch is exactly `batch_size` except for the last batch on + end-of-stream and any partial batch flushed by `flush_ms` inactivity. + + Reads `batch_size` and `flush_ms` from properties lazily so subclasses + don't need to remember to call `super().init()`. - Args: - msg_ids (List[str]): Message IDs + Source errors raised by `produce_chunks()` propagate to the caller (the + job aborts) rather than being treated as a silent end-of-stream. The + background pump uses a bounded queue so source reads cannot outpace + downstream consumption — the existing backpressure is preserved. """ + batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)) + flush_ms = self.properties.get("flush_ms", self.DEFAULT_FLUSH_MS) + timeout = (flush_ms / 1000) if flush_ms else None + + # maxsize=1 keeps the pump exactly one chunk ahead of the consumer, + # which restores the natural backpressure the old yield-driven model had. + queue: asyncio.Queue = asyncio.Queue(maxsize=1) + EOS = object() + pump_error: List[BaseException] = [] # length 0 or 1 + + async def pump(): + """Drains produce_chunks() into the queue; signals EOS on exit and captures errors.""" + cancelled = False + try: + async for chunk in self.produce_chunks(): + if chunk: + await queue.put(chunk) + except asyncio.CancelledError: + cancelled = True + raise + except BaseException as exc: + pump_error.append(exc) + finally: + # Skip the EOS put when cancelled — the consumer's finally is + # awaiting us, the queue may be full (maxsize=1), and putting + # would deadlock. The consumer won't read EOS anyway. + if not cancelled: + await queue.put(EOS) + + pump_task = asyncio.create_task(pump()) + buffer: List[Dict[str, Any]] = [] + try: + while True: + try: + item = await asyncio.wait_for(queue.get(), timeout=timeout) + except asyncio.TimeoutError: + if buffer: + yield buffer + buffer = [] + continue + + if item is EOS: + if buffer: + yield buffer + if pump_error: + # Re-raise the source error so the job fails loudly + # instead of treating a truncated read as success. + raise pump_error[0] + return + + buffer.extend(item) + while len(buffer) >= batch_size: + yield buffer[:batch_size] + buffer = buffer[batch_size:] + finally: + pump_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await pump_task + + def ack(self, msg_ids: List[str]): + """Sends acknowledge for the message IDs of records that have been processed.""" pass diff --git a/core/src/datayoga_core/resources/schemas/batchable.schema.json b/core/src/datayoga_core/resources/schemas/batchable.schema.json new file mode 100644 index 00000000..c04fb8fa --- /dev/null +++ b/core/src/datayoga_core/resources/schemas/batchable.schema.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "batchable", + "description": "Producer batching mixin: declares batch_size for producers that yield records in batches.", + "type": "object", + "properties": { + "batch_size": { + "type": "integer", + "minimum": 1, + "description": "Maximum number of records yielded per downstream batch.", + "default": 1000 + } + } +} diff --git a/core/src/datayoga_core/resources/schemas/streamable.schema.json b/core/src/datayoga_core/resources/schemas/streamable.schema.json new file mode 100644 index 00000000..0bdba461 --- /dev/null +++ b/core/src/datayoga_core/resources/schemas/streamable.schema.json @@ -0,0 +1,20 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "streamable", + "description": "Streaming producer mixin: declares batch_size and flush_ms for producers reading from continuous sources.", + "type": "object", + "properties": { + "batch_size": { + "type": "integer", + "minimum": 1, + "description": "Maximum number of records yielded per downstream batch.", + "default": 1000 + }, + "flush_ms": { + "type": ["integer", "null"], + "minimum": 1, + "description": "If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.", + "default": 1000 + } + } +} diff --git a/core/src/datayoga_core/schema_utils.py b/core/src/datayoga_core/schema_utils.py new file mode 100644 index 00000000..c170b733 --- /dev/null +++ b/core/src/datayoga_core/schema_utils.py @@ -0,0 +1,88 @@ +"""Schema composition helpers. + +Producer block schemas use standard JSON Schema composition via `$ref` + +`allOf` (with `unevaluatedProperties: false` to allow inherited properties). +At validation time we want to keep the simple `jsonschema.validate(instance, +schema)` code path, so we resolve any local-file `$ref`s into the schema +ahead of time. The on-disk schemas remain standard JSON Schema; only the +in-memory form is flattened. + +Example: a block schema like + + {"allOf": [{"$ref": "../../../resources/schemas/batchable.schema.json"}], + "properties": {...}, + "unevaluatedProperties": false} + +becomes + + {"allOf": [], + "properties": {...}, + "unevaluatedProperties": false} + +after `resolve_refs(schema, schema_path)`. +""" +from __future__ import annotations + +import copy +from os import path +from typing import Any, Dict, Optional, Set + +from datayoga_core import utils + + +def resolve_refs(schema: Dict[str, Any], schema_path: Optional[str] = None) -> Dict[str, Any]: + """Return a copy of `schema` with local-file `$ref`s inlined recursively. + + Args: + schema: The schema to resolve. + schema_path: Filesystem path the schema was loaded from. Used to + resolve relative `$ref` paths. If None, refs are resolved against + the bundled/non-bundled resources/schemas directory. + + Returns: + A new schema with all local-file $refs replaced by the referenced + document's contents. Non-local refs (http://, #fragments) and + non-existent files pass through unchanged or raise depending on form. + + Raises: + FileNotFoundError: A local-file $ref points at a file that doesn't exist. + ValueError: A circular $ref chain is detected. + """ + if schema_path is not None: + base_dir = path.dirname(path.abspath(schema_path)) + else: + base_dir = utils.get_resource_path("schemas") + + return _resolve_node(schema, base_dir, visited=set()) + + +def _resolve_node(node: Any, base_dir: str, visited: Set[str]) -> Any: + if isinstance(node, dict): + ref = node.get("$ref") + if isinstance(ref, str) and _is_local_file_ref(ref): + target = path.normpath(path.join(base_dir, ref)) + if target in visited: + raise ValueError(f"Circular $ref detected resolving '{ref}' at {target}") + if not path.isfile(target): + raise FileNotFoundError( + f"$ref target not found: '{ref}' resolved to {target}" + ) + fragment = utils.read_json(target) + visited.add(target) + try: + resolved = _resolve_node(fragment, path.dirname(target), visited) + finally: + visited.discard(target) + return resolved + return {k: _resolve_node(v, base_dir, visited) for k, v in node.items()} + if isinstance(node, list): + return [_resolve_node(item, base_dir, visited) for item in node] + return copy.copy(node) + + +def _is_local_file_ref(ref: str) -> bool: + """A $ref is a local file ref if it looks like a path to a .json/.schema.json + file with no URI scheme and no in-document fragment.""" + if ref.startswith("#") or "://" in ref: + return False + return ref.endswith(".json") diff --git a/core/src/datayoga_core/tests/__init__.py b/core/src/datayoga_core/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/tests/test_producer_batching.py b/core/src/datayoga_core/tests/test_producer_batching.py new file mode 100644 index 00000000..c3fe94b0 --- /dev/null +++ b/core/src/datayoga_core/tests/test_producer_batching.py @@ -0,0 +1,296 @@ +import asyncio +from typing import AsyncGenerator, List, Optional + +import pytest +from datayoga_core.context import Context +from datayoga_core.producer import Message, Producer + + +def _msg(i: int) -> dict: + """Builds a record carrying the producer MSG_ID_FIELD and a numeric value.""" + return {Producer.MSG_ID_FIELD: str(i), "v": i} + + +class FakeProducer(Producer): + """Producer driven by a scripted list of chunks plus optional sleeps.""" + + def __init__(self, properties=None, *, chunks=None, sleep_before=None): + """Configures the scripted chunks and optional per-chunk sleep delays.""" + # schema for a FakeProducer; declare batch_size/flush_ms so validation passes + self._test_schema = { + "type": "object", + "properties": { + "batch_size": {"type": "integer", "minimum": 1}, + "flush_ms": {"type": ["integer", "null"], "minimum": 1}, + }, + } + self._chunks = chunks or [] + self._sleep_before = sleep_before or [] + super().__init__(properties or {}) + + def get_json_schema(self): + """Returns the in-memory test schema (avoids reading from disk).""" + return self._test_schema + + def init(self, context: Optional[Context] = None): + """No-op init; FakeProducer doesn't need any setup.""" + pass + + async def produce_chunks(self) -> AsyncGenerator[List[Message], None]: + """Yields the scripted chunks, optionally sleeping before each one.""" + for i, chunk in enumerate(self._chunks): + if i < len(self._sleep_before) and self._sleep_before[i]: + await asyncio.sleep(self._sleep_before[i]) + yield chunk + + +async def _drain(producer: Producer): + """Collects all batches emitted by a producer until end-of-stream.""" + out = [] + async for batch in producer.produce(): + out.append(batch) + return out + + +@pytest.mark.asyncio +async def test_rechunks_one_large_chunk(): + """One 5000-record chunk + batch_size=1000 yields five batches of 1000.""" + chunks = [[_msg(i) for i in range(5000)]] + p = FakeProducer({"batch_size": 1000}, chunks=chunks) + batches = await _drain(p) + assert [len(b) for b in batches] == [1000, 1000, 1000, 1000, 1000] + + +@pytest.mark.asyncio +async def test_accumulates_small_chunks_and_flushes_on_eos(): + """Small chunks (200+300+400=900) are accumulated; the partial batch flushes on EOS.""" + chunks = [[_msg(i) for i in range(200)], + [_msg(i) for i in range(200, 500)], + [_msg(i) for i in range(500, 900)]] + p = FakeProducer({"batch_size": 1000}, chunks=chunks) + batches = await _drain(p) + assert [len(b) for b in batches] == [900] + + +@pytest.mark.asyncio +async def test_partial_final_batch_on_eos(): + """1500 records + batch_size=1000 yields [1000, 500] — the trailing partial fires on EOS.""" + chunks = [[_msg(i) for i in range(1500)]] + p = FakeProducer({"batch_size": 1000}, chunks=chunks) + batches = await _drain(p) + assert [len(b) for b in batches] == [1000, 500] + + +@pytest.mark.asyncio +async def test_empty_chunks_are_ignored(): + """Empty chunks from produce_chunks() don't produce empty batches.""" + chunks = [[], [_msg(1), _msg(2)], [], [_msg(3)]] + p = FakeProducer({"batch_size": 10}, chunks=chunks) + batches = await _drain(p) + assert [len(b) for b in batches] == [3] + + +@pytest.mark.asyncio +async def test_flush_ms_emits_partial_on_inactivity(): + """With flush_ms set, a partial batch is emitted on source inactivity, not held to EOS.""" + # one chunk of 2 records, then a 300ms wait before EOS; flush_ms=100 should + # flush the partial batch of 2 well before EOS. + chunks = [[_msg(1), _msg(2)], [_msg(3)]] + sleeps = [0, 0.3] + p = FakeProducer({"batch_size": 100, "flush_ms": 100}, + chunks=chunks, sleep_before=sleeps) + + received = [] + started = asyncio.get_event_loop().time() + timings = [] + async for batch in p.produce(): + timings.append(asyncio.get_event_loop().time() - started) + received.append(batch) + + assert [len(b) for b in received] == [2, 1] + # first flush happens because of inactivity (~100ms), not waiting for chunk 2 + assert timings[0] < 0.25, f"expected first flush before 250ms, got {timings[0]}" + + +@pytest.mark.asyncio +async def test_no_flush_ms_holds_records_until_eos(): + """Without flush_ms, accumulated records stay buffered until batch_size or EOS.""" + chunks = [[_msg(1)], [_msg(2)]] + sleeps = [0, 0.1] + p = FakeProducer({"batch_size": 100}, chunks=chunks, sleep_before=sleeps) + batches = await _drain(p) + assert [len(b) for b in batches] == [2] # combined on EOS, never flushed mid-stream + + +@pytest.mark.asyncio +async def test_consumer_cancellation_cleans_up_pump(): + """Closing the producer generator cancels the pump cleanly (no orphaned task warnings).""" + chunks = [[_msg(i)] for i in range(1000)] + p = FakeProducer({"batch_size": 10, "flush_ms": 50}, chunks=chunks, + sleep_before=[0.05] * 1000) + + gen = p.produce() + first = await gen.__anext__() + assert len(first) >= 1 + await gen.aclose() + # If pump task wasn't cleaned up we'd see a "Task was destroyed but it is + # pending!" warning here. Sleep briefly so the loop has a chance to surface it. + await asyncio.sleep(0.1) + + +@pytest.mark.asyncio +async def test_external_task_cancellation_cleans_up_pump(): + """When the task iterating produce() is cancelled (e.g., Job.run is cancelled + by the runtime), the producer's pump task must clean up. This is the + Job-shutdown scenario: an external cancellation propagates through the + async-for loop into the producer generator's finally.""" + chunks = [[_msg(i)] for i in range(10_000)] + p = FakeProducer({"batch_size": 5, "flush_ms": 50}, chunks=chunks, + sleep_before=[0.01] * 10_000) + + async def consume(): + # Mirrors Job.run's iteration pattern. + async for batch in p.produce(): + pass # downstream processing would happen here + + consumer_task = asyncio.create_task(consume()) + await asyncio.sleep(0.05) # let the producer ramp up — some batches arrive + consumer_task.cancel() + with pytest.raises(asyncio.CancelledError): + await consumer_task + # Give the loop a moment to settle any pending finalizers. + await asyncio.sleep(0.1) + + # No producer pump task should remain after cancellation. We identify the + # pump specifically by Producer.produce..pump in its qualname, + # since the test's own name happens to contain "pump". + remaining = [t for t in asyncio.all_tasks() if not t.done()] + pump_tasks = [ + t for t in remaining + if "Producer.produce" in (t.get_coro().__qualname__ or "") + ] + assert not pump_tasks, \ + f"orphaned producer pump tasks after cancellation: " \ + f"{[t.get_coro().__qualname__ for t in pump_tasks]}" + + +class _BoomProducer(Producer): + """Producer whose produce_chunks() raises after emitting some chunks.""" + + def __init__(self, properties, *, before_error, error): + """Configures how many chunks to emit before raising.""" + self._test_schema = { + "type": "object", + "properties": {"batch_size": {"type": "integer", "minimum": 1}}, + } + self._before_error = before_error + self._error = error + super().__init__(properties) + + def get_json_schema(self): + """Returns the in-memory test schema (avoids reading from disk).""" + return self._test_schema + + def init(self, context: Optional[Context] = None): + """No-op init; _BoomProducer doesn't need any setup.""" + pass + + async def produce_chunks(self) -> AsyncGenerator[List[Message], None]: + """Emits the scripted lead-in chunks, then raises the configured exception.""" + for chunk in self._before_error: + yield chunk + raise self._error + + +@pytest.mark.asyncio +async def test_source_errors_propagate_instead_of_silent_eos(): + """A failing source must abort the consumer, not look like clean EOS.""" + p = _BoomProducer( + {"batch_size": 100}, + before_error=[[_msg(1), _msg(2)]], + error=RuntimeError("source connection lost"), + ) + with pytest.raises(RuntimeError, match="source connection lost"): + async for _ in p.produce(): + pass + + +@pytest.mark.asyncio +async def test_source_error_flushes_buffer_before_raising(): + """Partial buffer is yielded before the error propagates, so already-read + records aren't dropped on top of the error.""" + p = _BoomProducer( + {"batch_size": 1000}, + before_error=[[_msg(1), _msg(2), _msg(3)]], + error=RuntimeError("disk read failed"), + ) + received = [] + with pytest.raises(RuntimeError, match="disk read failed"): + async for batch in p.produce(): + received.append(batch) + assert [len(b) for b in received] == [3] + + +class _CountingProducer(Producer): + """Producer that records how many chunks it has been allowed to emit. + + Used to prove the base class applies backpressure (the pump stays no more + than one chunk ahead of the consumer when maxsize=1). + """ + + def __init__(self, properties, *, num_chunks, chunk_size, on_emit): + """Configures how many fixed-size chunks to emit and a per-emit hook.""" + self._test_schema = { + "type": "object", + "properties": {"batch_size": {"type": "integer", "minimum": 1}}, + } + self._num_chunks = num_chunks + self._chunk_size = chunk_size + self._on_emit = on_emit + super().__init__(properties) + + def get_json_schema(self): + """Returns the in-memory test schema (avoids reading from disk).""" + return self._test_schema + + def init(self, context: Optional[Context] = None): + """No-op init; _CountingProducer doesn't need any setup.""" + pass + + async def produce_chunks(self) -> AsyncGenerator[List[Message], None]: + """Yields num_chunks fixed-size chunks, calling on_emit after each yield.""" + for i in range(self._num_chunks): + yield [_msg(i * self._chunk_size + j) for j in range(self._chunk_size)] + self._on_emit(i + 1) + + +@pytest.mark.asyncio +async def test_pump_does_not_outrun_consumer_unboundedly(): + """With the default bounded queue, the pump stays close to the consumer. + + Without backpressure, the pump would emit all 1000 chunks before the + consumer reads any. With maxsize=1 the pump can be at most ~2 chunks + ahead at any moment (one being put, one queued). + """ + emitted_count = [0] + + def record_emit(n): + emitted_count[0] = n + + p = _CountingProducer( + {"batch_size": 100}, + num_chunks=1000, + chunk_size=100, + on_emit=record_emit, + ) + + gen = p.produce() + # Pull one batch and observe how far ahead the pump got. + await gen.__anext__() + # Yield once so the pump gets a chance to advance after the consumer + # took one chunk off the queue. + await asyncio.sleep(0) + ahead = emitted_count[0] + await gen.aclose() + # Pump should be at most a handful of chunks ahead, not all 1000. + assert ahead <= 5, f"pump emitted {ahead} chunks while consumer pulled 1" diff --git a/core/src/datayoga_core/tests/test_producer_batching_properties.py b/core/src/datayoga_core/tests/test_producer_batching_properties.py new file mode 100644 index 00000000..d288c3a1 --- /dev/null +++ b/core/src/datayoga_core/tests/test_producer_batching_properties.py @@ -0,0 +1,141 @@ +"""Property-based tests for the Producer base-class rechunker. + +Where `test_producer_batching.py` asserts specific outputs for specific inputs, +this file uses Hypothesis to generate arbitrary chunk-size sequences and probe +the rechunker's invariants. Catches the class of bug where the code works for +the inputs you tested but breaks somewhere in the wider input space. +""" +import asyncio +from typing import AsyncGenerator, Dict, List, Optional + +import pytest +from datayoga_core.context import Context +from datayoga_core.producer import Producer +from hypothesis import given, settings +from hypothesis import strategies as st + + +class _ScriptedProducer(Producer): + """Producer driven by a scripted list of chunk-sizes; each chunk has + sequential integer payloads.""" + + def __init__(self, properties, *, chunk_sizes): + """Wires the schema and chunk script.""" + self._test_schema = { + "type": "object", + "properties": {"batch_size": {"type": "integer", "minimum": 1}}, + } + self._chunk_sizes = chunk_sizes + super().__init__(properties) + + def get_json_schema(self): + """In-memory schema (no disk read).""" + return self._test_schema + + def init(self, context: Optional[Context] = None): + """No-op.""" + pass + + async def produce_chunks(self) -> AsyncGenerator[List[Dict], None]: + """Yield chunks of the scripted sizes, with sequential payload ids.""" + counter = 0 + for size in self._chunk_sizes: + chunk = [{Producer.MSG_ID_FIELD: str(counter + i), "v": counter + i} + for i in range(size)] + counter += size + yield chunk + + +async def _drain(producer: Producer): + out = [] + async for batch in producer.produce(): + out.append(batch) + return out + + +# Strategies +chunk_sizes_strategy = st.lists( + st.integers(min_value=0, max_value=200), + min_size=0, + max_size=20, +) +batch_size_strategy = st.integers(min_value=1, max_value=300) + + +@given(chunk_sizes=chunk_sizes_strategy, batch_size=batch_size_strategy) +@settings(max_examples=200, deadline=2000) +def test_property_record_conservation(chunk_sizes, batch_size): + """The total number of records yielded downstream equals the total number + yielded by produce_chunks. No records lost; none duplicated.""" + p = _ScriptedProducer({"batch_size": batch_size}, chunk_sizes=chunk_sizes) + batches = asyncio.run(_drain(p)) + expected_total = sum(chunk_sizes) + actual_total = sum(len(b) for b in batches) + assert actual_total == expected_total, \ + f"chunk_sizes={chunk_sizes}, batch_size={batch_size}: " \ + f"expected {expected_total} records, got {actual_total}" + + +@given(chunk_sizes=chunk_sizes_strategy, batch_size=batch_size_strategy) +@settings(max_examples=200, deadline=2000) +def test_property_record_order_preserved(chunk_sizes, batch_size): + """Records flow downstream in the same order produce_chunks emits them. + Re-chunking doesn't shuffle.""" + p = _ScriptedProducer({"batch_size": batch_size}, chunk_sizes=chunk_sizes) + batches = asyncio.run(_drain(p)) + flat = [r["v"] for b in batches for r in b] + expected = list(range(sum(chunk_sizes))) + assert flat == expected, \ + f"chunk_sizes={chunk_sizes}, batch_size={batch_size}: order mismatch" + + +@given(chunk_sizes=chunk_sizes_strategy, batch_size=batch_size_strategy) +@settings(max_examples=200, deadline=2000) +def test_property_batch_sizes_well_formed(chunk_sizes, batch_size): + """Every batch is non-empty AND has length ≤ batch_size. All batches except + possibly the last have length == batch_size (the last may be partial on EOS).""" + p = _ScriptedProducer({"batch_size": batch_size}, chunk_sizes=chunk_sizes) + batches = asyncio.run(_drain(p)) + for i, b in enumerate(batches): + assert len(b) > 0, f"batch {i} is empty: {batches}" + assert len(b) <= batch_size, f"batch {i} exceeds batch_size: {len(b)} > {batch_size}" + # All non-final batches should be exactly batch_size (no time-based flush + # here since flush_ms is not set). + for i, b in enumerate(batches[:-1]): + assert len(b) == batch_size, \ + f"batch {i} is partial mid-stream: len={len(b)}, batch_size={batch_size}" + + +@given(chunk_sizes=chunk_sizes_strategy, batch_size=batch_size_strategy) +@settings(max_examples=200, deadline=2000) +def test_property_no_empty_emissions(chunk_sizes, batch_size): + """If produce_chunks emits empty chunks, the base class doesn't propagate + them downstream.""" + # Inject empty chunks throughout the sequence. + chunks_with_empties = [] + for size in chunk_sizes: + chunks_with_empties.append(0) # empty + chunks_with_empties.append(size) + p = _ScriptedProducer({"batch_size": batch_size}, chunk_sizes=chunks_with_empties) + batches = asyncio.run(_drain(p)) + for i, b in enumerate(batches): + assert len(b) > 0, f"empty batch emitted at index {i}" + + +@given(num_records=st.integers(min_value=0, max_value=500), + batch_size=st.integers(min_value=1, max_value=100)) +@settings(max_examples=100, deadline=2000) +def test_property_partial_final_batch_only(num_records, batch_size): + """When all records come in one big chunk, the output is N full batches plus + optionally one partial batch — never a partial in the middle.""" + p = _ScriptedProducer({"batch_size": batch_size}, chunk_sizes=[num_records]) + batches = asyncio.run(_drain(p)) + if num_records == 0: + assert batches == [], "expected no batches for empty source" + return + expected_full, remainder = divmod(num_records, batch_size) + sizes = [len(b) for b in batches] + if remainder == 0: + assert sizes == [batch_size] * expected_full + else: + assert sizes == [batch_size] * expected_full + [remainder] diff --git a/core/src/datayoga_core/tests/test_schema_refs.py b/core/src/datayoga_core/tests/test_schema_refs.py new file mode 100644 index 00000000..dce5024d --- /dev/null +++ b/core/src/datayoga_core/tests/test_schema_refs.py @@ -0,0 +1,118 @@ +"""Tests for the $ref pre-resolver in `schema_utils.resolve_refs`. + +Block schemas use standard JSON Schema composition (`allOf` + `$ref` to +local fragment files). We pre-resolve those refs at load time so the +in-memory schema is self-contained. +""" +import json +from pathlib import Path + +import pytest +from datayoga_core.schema_utils import resolve_refs + +SCHEMAS_DIR = Path(__file__).resolve().parent.parent / "resources" / "schemas" +BATCHABLE = SCHEMAS_DIR / "batchable.schema.json" + + +def test_resolve_refs_inlines_local_ref(tmp_path): + """A {'$ref': 'localfile.json'} node is replaced inline with the file's contents.""" + fragment = {"type": "object", "properties": {"x": {"type": "integer"}}} + frag_path = tmp_path / "frag.schema.json" + frag_path.write_text(json.dumps(fragment)) + + schema = { + "type": "object", + "allOf": [{"$ref": "frag.schema.json"}], + "properties": {"y": {"type": "string"}}, + } + schema_path = tmp_path / "host.schema.json" + resolved = resolve_refs(schema, schema_path=str(schema_path)) + + assert resolved["allOf"][0] == fragment + assert "$ref" not in json.dumps(resolved) + + +def test_resolve_refs_no_ref_passthrough(tmp_path): + """Schemas with no `$ref` come out structurally equal.""" + schema = {"type": "object", "properties": {"x": {"type": "string"}}} + resolved = resolve_refs(schema, schema_path=str(tmp_path / "host.schema.json")) + assert resolved == schema + + +def test_resolve_refs_resolves_transitively(tmp_path): + """A fragment that itself contains `$ref` is resolved all the way.""" + leaf = {"type": "object", "properties": {"leaf_prop": {"type": "integer"}}} + (tmp_path / "leaf.schema.json").write_text(json.dumps(leaf)) + + middle = {"allOf": [{"$ref": "leaf.schema.json"}]} + (tmp_path / "middle.schema.json").write_text(json.dumps(middle)) + + schema = {"allOf": [{"$ref": "middle.schema.json"}]} + resolved = resolve_refs(schema, schema_path=str(tmp_path / "host.schema.json")) + + # middle's $ref to leaf was resolved as part of the resolution of host's $ref to middle + assert resolved == {"allOf": [{"allOf": [leaf]}]} + + +def test_resolve_refs_missing_file_raises(tmp_path): + """A `$ref` pointing at a missing local file raises FileNotFoundError.""" + schema = {"allOf": [{"$ref": "does_not_exist.schema.json"}]} + with pytest.raises(FileNotFoundError, match="does_not_exist.schema.json"): + resolve_refs(schema, schema_path=str(tmp_path / "host.schema.json")) + + +def test_resolve_refs_detects_circular(tmp_path): + """A → B → A cycle raises ValueError, not infinite recursion.""" + (tmp_path / "a.schema.json").write_text('{"allOf": [{"$ref": "b.schema.json"}]}') + (tmp_path / "b.schema.json").write_text('{"allOf": [{"$ref": "a.schema.json"}]}') + + schema = {"allOf": [{"$ref": "a.schema.json"}]} + with pytest.raises(ValueError, match="Circular"): + resolve_refs(schema, schema_path=str(tmp_path / "host.schema.json")) + + +def test_resolve_refs_ignores_non_local_refs(tmp_path): + """`$ref` values like '#/$defs/x' or 'http://...' are left untouched.""" + schema = { + "allOf": [ + {"$ref": "#/$defs/internal"}, + {"$ref": "https://json-schema.org/draft/2019-09/schema"}, + ], + "$defs": {"internal": {"type": "integer"}}, + } + resolved = resolve_refs(schema, schema_path=str(tmp_path / "host.schema.json")) + assert resolved == schema + + +def test_resolve_refs_against_real_fragment(): + """resolve_refs against the actual batchable fragment in the repo works.""" + # Simulate loading a block schema whose path is at depth blocks/X/Y/. + schema = { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "type": "object", + "allOf": [{"$ref": "../../../resources/schemas/batchable.schema.json"}], + "properties": {"connection": {"type": "string"}}, + "unevaluatedProperties": False, + } + # Pick any real block path so the relative $ref resolves. + block_path = ( + Path(__file__).resolve().parent.parent + / "blocks" / "std" / "read" / "block.schema.json" + ) + resolved = resolve_refs(schema, schema_path=str(block_path)) + # The batchable fragment is inlined inside allOf + assert resolved["allOf"][0]["properties"]["batch_size"]["default"] == 1000 + + +def test_resolve_refs_default_base_dir(): + """When schema_path is None, refs resolve against resources/schemas/.""" + schema = {"allOf": [{"$ref": "batchable.schema.json"}]} + resolved = resolve_refs(schema) + assert resolved["allOf"][0]["properties"]["batch_size"]["default"] == 1000 + + +def test_resolve_refs_default_base_dir_with_missing_file(): + """Without schema_path, refs pointing at unknown files in the resources dir raise.""" + schema = {"allOf": [{"$ref": "nope.schema.json"}]} + with pytest.raises(FileNotFoundError): + resolve_refs(schema) diff --git a/docs/processing-strategies.md b/docs/processing-strategies.md index 8e9e83be..2cf186b3 100644 --- a/docs/processing-strategies.md +++ b/docs/processing-strategies.md @@ -64,6 +64,44 @@ Rate limit allows to set guards for the frequency of processing in a given time The Rate limit strategy defines the number of requests per given time interval. For example, 5 requests a minute. When the limit is reached, processing for this Step will pause until the time period elapses to allow additional calls. +## Producer Batching + +Every producer block (any block that reads from a source — `std/read`, `files/read_csv`, `parquet/read`, `relational/read`, `redis/read_stream`, `azure/read_event_hub`, `http/receiver`) accepts a `batch_size` property. The producer base class re-chunks the source's output into batches of up to `batch_size` records, regardless of how the source delivers them (per row, per row group, per `fetchmany`, per network message). The last batch on end-of-stream and any partial batch flushed by `flush_ms` may be smaller. + +```yaml +input: + uses: files.read_csv + with: + file: people.csv + batch_size: 500 # downstream steps process 500 records per call +``` + +Default: `1000`. + +### Streaming producers and `flush_ms` + +Streaming producers (`redis/read_stream`, `azure/read_event_hub`, `http/receiver`) also accept `flush_ms`. If no new records arrive within that many milliseconds, any partial batch is flushed downstream instead of being held until `batch_size` is reached. + +```yaml +input: + uses: redis.read_stream + with: + connection: my_redis + stream_name: events + batch_size: 1000 + flush_ms: 500 # emit a partial batch after 500ms of inactivity +``` + +Default: `1000` ms. Set to `null` to disable time-based flushing (records are held until `batch_size` or end-of-stream). + +### `relational/read` and `fetch_size` + +`relational/read` exposes an extra `fetch_size` property that controls how many rows are pulled from the database driver per round-trip, independent of the pipeline `batch_size`. Default: `10000`. Tune lower for memory pressure with wide rows; tune higher if you want fewer DB round-trips and downstream processing is the bottleneck. + +### `azure/read_event_hub` migration note + +In earlier versions, `batch_size` on `azure/read_event_hub` controlled the SDK callback batch size, not the pipeline batch size. As of #400 it has been renamed to `max_batch_size` to match the SDK semantic, and `batch_size` now consistently means pipeline batch size as it does for every other producer. + ## Mix and Match The processing strategies can be mixed to fit the specific use case. For example, reading records from a Stream one by one, pushing into a parallel processor to perform a transformation, batched and fanned out to multiple processes to load into a relational database in bulk diff --git a/docs/reference/batchable.md b/docs/reference/batchable.md new file mode 100644 index 00000000..4c344fa8 --- /dev/null +++ b/docs/reference/batchable.md @@ -0,0 +1,24 @@ +--- +parent: Reference +nav_order: 1 +--- + +# batchable + +Producer batching mixin: declares batch_size for producers that yield records in batches. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|| + +**Example** + +```yaml +batch_size: 1000 + +``` + + diff --git a/docs/reference/blocks/azure_read_event_hub.md b/docs/reference/blocks/azure_read_event_hub.md index b247fb30..72bb4ef6 100644 --- a/docs/reference/blocks/azure_read_event_hub.md +++ b/docs/reference/blocks/azure_read_event_hub.md @@ -12,17 +12,21 @@ Read from Azure Event Hub |Name|Type|Description|Required| |----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|no| +|**flush\_ms**|`integer`, `null`|If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.
Default: `1000`
Minimum: `1`
|no| |**event\_hub\_connection\_string**|`string`|The connection string for the Azure Event Hub namespace.
|yes| |**event\_hub\_consumer\_group\_name**|`string`|The name of the consumer group to read events from.
|yes| |**event\_hub\_name**|`string`|The name of the Azure Event Hub.
|yes| |**checkpoint\_store\_connection\_string**|`string`|The connection string for the Azure Storage account used as the checkpoint store.
|yes| |**checkpoint\_store\_container\_name**|`string`|The name of the container within the checkpoint store to store the checkpoints.
|yes| -|**batch\_size**|`integer`|The maximum number of events to receive in each batch.
Default: `300`
|no| +|**max\_batch\_size**|`integer`|Maximum number of events to receive in each SDK callback. Renamed from the previous batch_size which used to mean this. Defaults to 300.
Default: `300`
Minimum: `1`
|no| **Example** ```yaml -batch_size: 300 +batch_size: 1000 +flush_ms: 1000 +max_batch_size: 300 ``` diff --git a/docs/reference/blocks/files_read_csv.md b/docs/reference/blocks/files_read_csv.md index 3f47237f..4a03458e 100644 --- a/docs/reference/blocks/files_read_csv.md +++ b/docs/reference/blocks/files_read_csv.md @@ -12,15 +12,14 @@ Read data from CSV |Name|Type|Description|Required| |----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|no| |**file**|`string`|Filename. Can contain a regexp or glob expression
|yes| |**encoding**|`string`|Encoding to use for reading the file
Default: `"utf-8"`
|no| |[**fields**](#fields)
(List of columns to use)|`string[]`|List of columns to use for extract
Minimal Length: `1`
|no| |**skip**|`number`|Number of lines to skip
Default: `0`
Minimum: `0`
|no| |**delimiter**|`string`|Delimiter to use for splitting the csv records
Default: `","`
Minimal Length: `1`
Maximal Length: `1`
|no| -|**batch\_size**|`number`|Number of records to read per batch
Default: `1000`
Minimum: `1`
|no| |**quotechar**|`string`|A one-character string used to quote fields containing special characters, such as the delimiter or quotechar, or which contain new-line characters. It defaults to '
Default: `"\""`
Minimal Length: `1`
Maximal Length: `1`
|no| -**Additional Properties:** not allowed **Example** ```yaml diff --git a/docs/reference/blocks/http_receiver.md b/docs/reference/blocks/http_receiver.md index 749cadb4..1cad6824 100644 --- a/docs/reference/blocks/http_receiver.md +++ b/docs/reference/blocks/http_receiver.md @@ -12,10 +12,11 @@ Receives HTTP requests and process the data. |Name|Type|Description|Required| |----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|| +|**flush\_ms**|`integer`, `null`|If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.
Default: `1000`
Minimum: `1`
|| |**host**|`string`|Host to listen
Default: `"0.0.0.0"`
|| |**port**|`integer`|Port to listen
Default: `8080`
|| -**Additional Properties:** not allowed **Example** ```yaml diff --git a/docs/reference/blocks/parquet_read.md b/docs/reference/blocks/parquet_read.md index ba08da29..19a1c1b3 100644 --- a/docs/reference/blocks/parquet_read.md +++ b/docs/reference/blocks/parquet_read.md @@ -12,9 +12,9 @@ Read data from parquet |Name|Type|Description|Required| |----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|no| |**file**|`string`|Filename. Can contain a regexp or glob expression
|yes| -**Additional Properties:** not allowed **Example** ```yaml diff --git a/docs/reference/blocks/redis_read_stream.md b/docs/reference/blocks/redis_read_stream.md index 3c3b6043..317e4497 100644 --- a/docs/reference/blocks/redis_read_stream.md +++ b/docs/reference/blocks/redis_read_stream.md @@ -12,14 +12,17 @@ Read from Redis stream |Name|Type|Description|Required| |----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|no| +|**flush\_ms**|`integer`, `null`|If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.
Default: `1000`
Minimum: `1`
|no| |**connection**|`string`|Connection name
|yes| |**stream\_name**
(Source stream name)|`string`|Source stream name
|yes| |**snapshot**
(Snapshot current entries and quit)|`boolean`|Snapshot current entries and quit
Default: `false`
|no| -**Additional Properties:** not allowed **Example** ```yaml +batch_size: 1000 +flush_ms: 1000 snapshot: false ``` diff --git a/docs/reference/blocks/relational_read.md b/docs/reference/blocks/relational_read.md index 1b11df44..409d6adb 100644 --- a/docs/reference/blocks/relational_read.md +++ b/docs/reference/blocks/relational_read.md @@ -12,12 +12,13 @@ Read a table from an SQL-compatible data store |Name|Type|Description|Required| |----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|no| |**connection**
(The connection to use for loading)|`string`|Logical connection name as defined in the connections.dy.yaml
|yes| |**schema**
(The table schema of the table)|`string`|If left blank, the default schema of this connection will be used as defined in the connections.dy.yaml
|no| |**table**
(The table name)|`string`|Table name
|yes| |[**columns**](#columns)
(Optional subset of columns to load)|`array`||no| +|**fetch\_size**|`integer`|Driver-level rows fetched per round-trip. Defaults to 10000.
Default: `10000`
Minimum: `1`
|no| -**Additional Properties:** not allowed **Example** ```yaml diff --git a/docs/reference/blocks/relational_write.md b/docs/reference/blocks/relational_write.md index 34e54fed..a8ebabfb 100644 --- a/docs/reference/blocks/relational_write.md +++ b/docs/reference/blocks/relational_write.md @@ -24,15 +24,6 @@ Write into a SQL-compatible data store |[**inactive\_record\_mapping**](#inactive_record_mapping)
(Used for \`TYPE2\` load\_strategy\. The columns mapping to use to close out an active record)|`array`|A list of columns to use. Use any valid SQL expression for the source. If 'target' is omitted, will default to the name of the source column
Default:
|no| **Additional Properties:** not allowed -  - -**No properties.** - -  -**Not [required1]:** -**No properties.** - - **Example** ```yaml diff --git a/docs/reference/blocks/std_read.md b/docs/reference/blocks/std_read.md index aca1c24a..9f858f42 100644 --- a/docs/reference/blocks/std_read.md +++ b/docs/reference/blocks/std_read.md @@ -12,7 +12,7 @@ Read from the standard input |Name|Type|Description|Required| |----|----|-----------|--------| -|**batch\_size**|`integer`|Number of records to process in a single batch
Default: `1000`
|| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|| **Example** diff --git a/docs/reference/connections.md b/docs/reference/connections.md index bfc2b8d0..580fbb39 100644 --- a/docs/reference/connections.md +++ b/docs/reference/connections.md @@ -1,6 +1,6 @@ --- parent: Reference -nav_order: 1 +nav_order: 2 --- # Connections diff --git a/docs/reference/job.md b/docs/reference/job.md index ed88211d..615a6da8 100644 --- a/docs/reference/job.md +++ b/docs/reference/job.md @@ -1,6 +1,6 @@ --- parent: Reference -nav_order: 2 +nav_order: 3 --- # Job diff --git a/docs/reference/streamable.md b/docs/reference/streamable.md new file mode 100644 index 00000000..49f499cd --- /dev/null +++ b/docs/reference/streamable.md @@ -0,0 +1,26 @@ +--- +parent: Reference +nav_order: 4 +--- + +# streamable + +Streaming producer mixin: declares batch_size and flush_ms for producers reading from continuous sources. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**batch\_size**|`integer`|Maximum number of records yielded per downstream batch.
Default: `1000`
Minimum: `1`
|| +|**flush\_ms**|`integer`, `null`|If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.
Default: `1000`
Minimum: `1`
|| + +**Example** + +```yaml +batch_size: 1000 +flush_ms: 1000 + +``` + + diff --git a/docs/superpowers/specs/2026-05-28-producer-batching-unification-design.md b/docs/superpowers/specs/2026-05-28-producer-batching-unification-design.md new file mode 100644 index 00000000..ffc1c7c9 --- /dev/null +++ b/docs/superpowers/specs/2026-05-28-producer-batching-unification-design.md @@ -0,0 +1,393 @@ +# Producer batching unification + +**Status:** Implemented in PR #401 +**Date:** 2026-05-28 +**Issue:** #400 +**Closes:** #293, #294, #295, #296, #377 (as a side effect of the refactor) + +## Problem + +Seven producer blocks each handle (or fail to handle) batching differently: + +| Producer | Bounded/Streaming | `batch_size` today | Behavior | +| ---------------------- | ----------------- | ------------------------------------------------------------------------------------- | ------------------------------------------ | +| `std/read` | bounded | yes, default 1000 _(on `batch_size_in_std_read_block` branch)_ | custom `process_batch` accumulator | +| `files/read_csv` | bounded | yes, default 1000 | own `islice(reader, batch_size)` loop | +| `relational/read` | bounded | **no** — hardcoded `fetchmany(10000)` | yields one row at a time downstream (bug) | +| `parquet/read` | bounded | **no** | yields one row at a time (bug) | +| `redis/read_stream` | streaming | **no** | yields one record at a time (bug #377) | +| `azure/read_event_hub` | streaming | yes, default 300, **but** controls _SDK callback batch size_, not pipeline batch size | drains internal queue in unbounded batches | +| `http/receiver` | streaming | **no** | yields one record per HTTP request (bug) | + +Four are actively buggy (yielding single records into the pipeline when batches are intended). One uses `batch_size` with a different semantic. Each producer that has implemented batching has done it differently. + +The duplication is the root cause of issues #294, #295, #296, and #377 — all four are the same gap, in different blocks. + +## Goal + +Make the `Producer` base class own batching. Subclasses describe how to fetch records; the base class controls the size and timing of batches yielded to the pipeline. + +After the change: + +- `batch_size` means the same thing in every producer: the maximum number of records yielded per downstream batch. +- Adding a new producer cannot reintroduce the "yield single records" bug — there's no place for it to happen. +- Streaming producers get an optional `flush_ms` so partial batches flush on inactivity instead of being held indefinitely. + +Non-goals: changing the `Job`/`Step` pipeline, adding new sources, restructuring the Result/payload model (that's #245). + +## Design + +### Base-class contract + +```python +# core/src/datayoga_core/producer.py + +class Producer(Block): + DEFAULT_BATCH_SIZE = 1000 + DEFAULT_FLUSH_MS = None # streaming subclasses override + + @abstractmethod + async def produce_chunks(self) -> AsyncGenerator[List[Message], None]: + """Yield natural chunks of any size. Base class re-chunks to batch_size.""" + raise NotImplementedError + + async def produce(self) -> AsyncGenerator[List[Message], None]: + """Public entry point. Reads chunks from produce_chunks() and re-emits + in batches of up to batch_size (smaller on EOS or flush_ms), with + bounded backpressure and source-error propagation.""" + ... +``` + +Subclasses override `produce_chunks` instead of `produce`. They emit chunks of any size — whatever's natural to the source (a Parquet row group, a `fetchmany` result, an `xreadgroup` response, an Event Hub callback batch, a single record). + +The base class accumulates chunks and re-emits them in batches of up to `batch_size`, flushing whatever's left on end-of-stream and (for streaming sources) on `flush_ms` inactivity. + +### `batch_size` and `flush_ms` are read lazily + +`produce()` reads `self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)` on first call, not in `init()`. This avoids the "subclass forgot `super().init(context)`" footgun. + +### `flush_ms` implementation + +For streaming sources, partial batches must flush on inactivity, otherwise a low-traffic stream could hold records indefinitely. + +Implementation uses an internal **bounded** queue + background pump task. The pump captures source errors and re-raises on the consumer side, so failures aren't silently treated as EOS: + +```python +async def produce(self) -> AsyncGenerator[List[Dict[str, Any]], None]: + batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)) + flush_ms = self.properties.get("flush_ms", self.DEFAULT_FLUSH_MS) + timeout = (flush_ms / 1000) if flush_ms else None + + # maxsize=1 preserves the natural backpressure the old yield-driven model + # had: the pump can be at most one chunk ahead of the consumer. + queue: asyncio.Queue = asyncio.Queue(maxsize=1) + EOS = object() + pump_error: List[BaseException] = [] # captured non-cancellation errors + + async def pump(): + cancelled = False + try: + async for chunk in self.produce_chunks(): + if chunk: + await queue.put(chunk) + except asyncio.CancelledError: + cancelled = True + raise + except BaseException as exc: + pump_error.append(exc) + finally: + # Skip the EOS put on cancellation — the consumer's finally is + # awaiting us and the queue may be full; putting would deadlock. + if not cancelled: + await queue.put(EOS) + + pump_task = asyncio.create_task(pump()) + buffer: List[Dict[str, Any]] = [] + try: + while True: + try: + item = await asyncio.wait_for(queue.get(), timeout=timeout) + except asyncio.TimeoutError: + if buffer: + yield buffer + buffer = [] + continue + + if item is EOS: + if buffer: + yield buffer + if pump_error: + raise pump_error[0] # propagate source error to caller + return + + buffer.extend(item) + while len(buffer) >= batch_size: + yield buffer[:batch_size] + buffer = buffer[batch_size:] + finally: + pump_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await pump_task +``` + +Why a queue and not `asyncio.wait_for(anext(gen), timeout)`: cancelling `__anext__` on an async generator with side effects (open connections, partial reads) can leave it in a broken state. Cancelling the _pump task_ boundary is safe; the generator finishes its current chunk before the pump's `try/finally` runs. + +Why `maxsize=1` and the `cancelled` flag: an unbounded queue removes backpressure — the pump could pre-load an entire parquet or relational table into memory while the consumer is processing batch 1 (flagged by Copilot review). Bounding at 1 keeps memory flat at the cost of a deadlock when the consumer is cancelled mid-flow (the pump's `finally: put(EOS)` blocks against a full queue). The `cancelled` flag skips the EOS put on cancellation, since the consumer is gone and EOS doesn't need to be delivered. + +Why `pump_error`: catching all exceptions in the pump and letting it terminate via EOS would silently truncate input on a source failure (Redis disconnect, broken CSV, DB error) — the consumer would see clean end-of-stream against partial data. Capturing the exception and re-raising on the consumer side makes the job fail loudly instead (also flagged by Copilot review). + +`flush_ms = None` ⇒ `timeout = None` ⇒ `queue.get()` waits forever ⇒ no time-based flush. Bounded sources don't set `flush_ms` and aren't affected. + +### Schema composition (standard JSON Schema) + +Two shared fragments in `core/src/datayoga_core/resources/schemas/` declare the common properties: + +- `batchable.schema.json` declares `batch_size`. +- `streamable.schema.json` declares both `batch_size` and `flush_ms`. + +Each block schema uses standard JSON Schema composition: `allOf` + `$ref` to the fragment file, plus `unevaluatedProperties: false` (rather than `additionalProperties: false`) so the fragment-contributed properties are recognized as evaluated. Example: + +```json +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "std.read", + "type": "object", + "allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }], + "properties": {}, + "unevaluatedProperties": false +} +``` + +At load time, `schema_utils.resolve_refs(schema, schema_path)` walks the schema, finds any local-file `$ref` (relative path, ends in `.json`, no URI scheme, no in-document fragment), and inlines the referenced file's contents in place. The resulting in-memory schema is self-contained — no remaining `$ref`s — so `Block.validate()` keeps using the simple `jsonschema.validate(instance, schema)` code path. The on-disk schemas remain standards-compliant; the resolution is purely a runtime detail to avoid threading a `RefResolver` through every validation site. + +`unevaluatedProperties: false` (introduced in draft 2019-09) is what makes composition + strict property validation work: with `additionalProperties: false`, a property contributed by an `allOf` member would be rejected as "additional" at the parent level. `unevaluatedProperties` is composition-aware. + +External tools that ARE `$ref`-aware (IDE schema validators, OpenAPI exporters) read the on-disk schemas correctly without our resolver. The `jsonschema2mk` docs generator is not `$ref`-aware, so `scripts/generate-docs.sh` pre-resolves `$ref` and flattens `allOf` properties for docs rendering only. + +### Per-producer changes + +**`std/read`** (bounded) + +Replace `process_batch` with a single-chunk yield. Base class slices. + +```python +async def produce_chunks(self): + if select.select([sys.stdin], [], [], 0.0)[0]: + all_records = [r for line in sys.stdin for r in self.get_records(line)] + else: + print("Enter data to process:") + all_records = self.get_records(input()) + if all_records: + yield [self.get_message(r) for r in all_records] +``` + +**`files/read_csv`** (bounded) + +Drops the `islice` loop; yield in `batch_size` chunks. Base class re-emits. + +```python +async def produce_chunks(self): + batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)) + with open(self.file, "r", encoding=self.encoding) as f: + reader = DictReader(f, fieldnames=self.fields, + delimiter=self.delimiter, quotechar=self.quotechar) + for _ in range(self.skip): + next(reader, None) + counter = iter(count()) + while True: + chunk = [{self.MSG_ID_FIELD: f"{next(counter)}", **r} + for r in islice(reader, batch_size)] + if not chunk: + return + yield chunk +``` + +**`relational/read`** (bounded) + +`batch_size` uses the framework default (1000). `fetch_size` defaults to **10000** to preserve today's driver-roundtrip count as the no-config baseline. Result: strict improvement vs. today (downstream goes from 1-record batches to 1000-record batches; DB roundtrips stay at 10000). + +```python +class Block(DyProducer): + DEFAULT_FETCH_SIZE = 10000 + + async def produce_chunks(self): + fetch_size = int(self.properties.get("fetch_size", self.DEFAULT_FETCH_SIZE)) + result = self.connection.execution_options(stream_results=True).execute(self.tbl.select()) + while True: + rows = result.fetchmany(fetch_size) + if not rows: + return + yield [utils.add_uid(dict(r._asdict())) for r in rows] +``` + +Schema adds optional `fetch_size` with default 10000. + +**`parquet/read`** (bounded) + +Fix one-by-one yield. Each row group becomes one chunk; base class re-emits in `batch_size` slices. + +```python +async def produce_chunks(self): + pf = ParquetFile(self.file) + counter = iter(count()) + for df in pf.iter_row_groups(): + yield [{self.MSG_ID_FIELD: str(next(counter)), **row.to_dict()} + for _, row in df.iterrows()] +``` + +**`redis/read_stream`** (streaming, closes #377) + +Use `count=batch_size` on `xreadgroup`. Yield each batch as a chunk. Class overrides `DEFAULT_FLUSH_MS = 1000`. + +```python +class Block(DyProducer): + DEFAULT_FLUSH_MS = 1000 + +async def produce_chunks(self): + batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE)) + read_pending = True + while True: + streams = self.redis_client.xreadgroup( + self.consumer_group, self.requesting_consumer, + {self.stream: "0" if read_pending else ">"}, + count=batch_size, + block=100 if self.snapshot else 0, # streaming blocks forever; snapshot polls briefly + ) + for stream in streams: + chunk = [] + for key, value in stream[1]: + payload = orjson.loads(value[next(iter(value))]) + payload[self.MSG_ID_FIELD] = key + chunk.append(payload) + if chunk: + yield chunk + if self.snapshot and not read_pending: + return + read_pending = False +``` + +`flush_ms` (default 1000) ensures partial batches flush during low-volume periods. The pump task can sit blocked inside `xreadgroup` indefinitely — that's fine, because the pump and the consumer side of the base-class queue are decoupled. When a single message finally arrives, it lands in the queue immediately and `flush_ms` flushes the partial batch downstream. + +**`azure/read_event_hub`** (streaming, breaking change) + +Existing `batch_size` property → renamed `max_batch_size` (matches SDK semantic, default 300). New `batch_size` (pipeline semantic, default 1000) comes from the streamable fragment. + +```python +class Block(DyProducer): + DEFAULT_FLUSH_MS = 1000 + + def init(self, context=None): + self.max_batch_size = int(self.properties.get("max_batch_size", 300)) + # ... existing client setup ... + self.events = {} + self.messages = asyncio.Queue() + + async def produce_chunks(self): + asyncio.create_task(self.receive_batch()) # uses self.max_batch_size + while True: + msg = await self.messages.get() + chunk = [msg] + while not self.messages.empty(): + chunk.append(self.messages.get_nowait()) + yield chunk +``` + +**Migration:** Users with `batch_size: 300` in YAML thinking it controls SDK callbacks must rename to `max_batch_size: 300`. No backward-compat shim. The literal `batch_size: 300` still validates after the rename but now means pipeline batch size, not SDK callback size — that semantic shift is documented in the PR description. + +The schema for `azure/read_event_hub` also gains `unevaluatedProperties: false` (it had no `additionalProperties` declaration before). Typos like `batch_sz: 300` now fail validation loudly with a clear error. + +**`http/receiver`** (streaming) + +Drain the queue per chunk; `flush_ms` flushes partial batches when traffic is low. + +```python +class Block(DyProducer): + DEFAULT_FLUSH_MS = 1000 + + async def produce_chunks(self): + queue: Queue = Queue(maxsize=1000) + async def handler(request): + try: + queue.put_nowait(orjson.loads(await request.read())) + return HTTPOk() + except Exception: + logger.exception("Got exception while parsing request:") + return HTTPInternalServerError() + runner = ServerRunner(Server(handler)) + await runner.setup() + srv = TCPSite(runner, self.host, self.port) + await srv.start() + try: + counter = iter(count()) + while True: + msg = await queue.get() + chunk = [{self.MSG_ID_FIELD: f"{next(counter)}", **msg}] + while not queue.empty(): + chunk.append({self.MSG_ID_FIELD: f"{next(counter)}", **queue.get_nowait()}) + yield chunk + finally: + with suppress(Exception): + await srv.stop() +``` + +### Defaults summary + +| Producer | `batch_size` | `flush_ms` | Other | +| ---------------------- | ------------ | ---------- | ---------------------------------------------------- | +| `std/read` | 1000 | — | — | +| `files/read_csv` | 1000 | — | — | +| `relational/read` | 1000 | — | optional `fetch_size`, defaults to 10000 | +| `parquet/read` | 1000 | — | — | +| `redis/read_stream` | 1000 | 1000 | — | +| `azure/read_event_hub` | 1000 | 1000 | `max_batch_size` 300 (renamed from old `batch_size`) | +| `http/receiver` | 1000 | 1000 | — | + +## Tests + +**New base-class tests** (`core/src/datayoga_core/tests/test_producer_batching.py`): + +A `FakeProducer` whose `produce_chunks` yields scripted chunks. Cases: + +- One 5000-record chunk + `batch_size=1000` → five batches of 1000. +- Three chunks of [200, 300, 400] + `batch_size=1000` → one batch of 900 on EOS (no empty trailing). +- 1500 records + `batch_size=1000` → batches of [1000, 500]. +- `flush_ms=100` with a producer that sleeps 200ms between chunks → partial batches flush on inactivity. +- `flush_ms=None` holds records indefinitely (asserted with a timeout that the next batch doesn't arrive early). +- Empty chunk yields are ignored (no empty batches emitted). +- Pump-task cleanup: cancelling the consumer cancels the pump cleanly (no warnings, no leaks). + +**Per-producer tests:** + +- `std/read`, `files/read_csv` — existing tests adapted; assert batch counts/sizes match `batch_size`. +- `relational/read` — assert it yields batches (not single rows); assert `fetch_size` controls driver calls independently of `batch_size`. +- `parquet/read` — multi-row-group file; batches honor `batch_size` regardless of row-group boundaries. +- `redis/read_stream` — assert `xreadgroup` called with `count=batch_size`. The `redis_to_relational` integration test (mentioned in #377) provides the end-to-end signal; it depends on the batch-fallback in `relational/write` shipped in commit `7e5b6f7`, which is already in place. +- `azure/read_event_hub` — assert validation rejects legacy `batch_size: 500` with no `max_batch_size`; assert `max_batch_size: 500, batch_size: 100` results in SDK callbacks of 500 and downstream batches of 100. +- `http/receiver` — send N records via webhook; assert they land in batches of `batch_size`, or partial batches after `flush_ms`. + +## Documentation + +- Update `docs/reference/blocks/*_read.md` for each affected producer (`batch_size`, `flush_ms`, `fetch_size`, `max_batch_size` where applicable). +- Add a section in `docs/processing-strategies.md` explaining the producer batching model: chunked subclass output, base-class re-chunking, `flush_ms` for streaming sources. +- PR description carries the breaking-change note (no CHANGELOG file in this repo): + - New `batch_size`/`flush_ms` on previously non-batching producers. + - **Breaking:** `azure/read_event_hub.batch_size` renamed to `max_batch_size`; the name `batch_size` now means pipeline batch size. + +## Risks and trade-offs + +1. **`Producer` ABC change.** `produce_chunks` is the new override hook (raises NotImplementedError by default; not formally `@abstractmethod` so legacy subclasses that still override `produce()` directly continue to validate). All 7 in-tree producers were migrated to override `produce_chunks`; external/downstream subclassers that override `produce()` directly continue to work but bypass the base-class batching. Called out in the PR description. + +2. **Event Hub silent-semantic-change risk.** The breaking rename is intentional. Adding `unevaluatedProperties: false` to the Event Hub schema (which lacked any `additionalProperties` declaration before) catches typos loudly. The literal `batch_size: 300` still validates after the rename but now means pipeline batch size, not SDK callback size — that semantic shift is documented in the PR description and the processing-strategies docs. + +3. **`flush_ms` semantics on Job shutdown.** When the producer is being cancelled (`Job.shutdown` → `Step.stop`), the pump's `try/finally` ensures `EOS` is queued. The `produce()` loop sees `EOS` and flushes the final partial batch. Verified by the `test_producer_batching` shutdown case. + +4. **`relational/read` defaults.** `fetch_size` defaults to 10000 to preserve today's DB roundtrip count. `batch_size` defaults to 1000, matching the framework default. Net effect vs. today: downstream batches grow from 1 to 1000 (huge improvement); DB roundtrips unchanged. Users with memory pressure on large rows can set a smaller `fetch_size` explicitly. Documented in the block's reference page. + +5. **Re-chunking cost.** Lists are sliced with `buffer[:n]` / `buffer[n:]` — O(batch_size) per batch. Negligible relative to per-record block work; no benchmark required. + +## Out of scope + +- Changing the `Result`/payload internal field representation (issue #245). +- Adding new connector blocks (Snowflake #392, Kafka, S3 #351, RabbitMQ #265, Kinesis #264). +- Pulling Prometheus out of core (#336). +- Backpressure / queue sizing changes to the `Step` pipeline. diff --git a/schemas/job.schema.json b/schemas/job.schema.json index 1b2a2533..d23ccc8e 100644 --- a/schemas/job.schema.json +++ b/schemas/job.schema.json @@ -111,13 +111,31 @@ "then": { "properties": { "with": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Streaming producer mixin: declares batch_size and flush_ms for producers reading from continuous sources.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + }, + "flush_ms": { + "default": 1000, + "description": "If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.", + "minimum": 1, + "type": ["integer", "null"] + } + }, + "title": "streamable", + "type": "object" + } + ], "description": "Read from Azure Event Hub", "properties": { - "batch_size": { - "default": 300, - "description": "The maximum number of events to receive in each batch.", - "type": "integer" - }, "checkpoint_store_connection_string": { "description": "The connection string for the Azure Storage account used as the checkpoint store.", "type": "string" @@ -137,6 +155,12 @@ "event_hub_name": { "description": "The name of the Azure Event Hub.", "type": "string" + }, + "max_batch_size": { + "default": 300, + "description": "Maximum number of events to receive in each SDK callback. Renamed from the previous batch_size which used to mean this. Defaults to 300.", + "minimum": 1, + "type": "integer" } }, "required": [ @@ -147,7 +171,8 @@ "checkpoint_store_container_name" ], "title": "azure.read_event_hub", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } @@ -252,16 +277,26 @@ "then": { "properties": { "with": { - "additionalProperties": false, + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Producer batching mixin: declares batch_size for producers that yield records in batches.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + } + }, + "title": "batchable", + "type": "object" + } + ], "description": "Read data from CSV", "examples": [{ "delimiter": ";", "file": "archive.csv" }], "properties": { - "batch_size": { - "default": 1000, - "description": "Number of records to read per batch", - "minimum": 1, - "type": "number" - }, "delimiter": { "default": ",", "description": "Delimiter to use for splitting the csv records", @@ -308,7 +343,8 @@ }, "required": ["file"], "title": "files.read_csv", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } @@ -362,7 +398,29 @@ "then": { "properties": { "with": { - "additionalProperties": false, + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Streaming producer mixin: declares batch_size and flush_ms for producers reading from continuous sources.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + }, + "flush_ms": { + "default": 1000, + "description": "If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.", + "minimum": 1, + "type": ["integer", "null"] + } + }, + "title": "streamable", + "type": "object" + } + ], "description": "Receives HTTP requests and process the data.", "examples": [{ "host": "localhost", "port": 8080 }], "properties": { @@ -378,7 +436,8 @@ } }, "title": "http.receiver", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } @@ -692,7 +751,23 @@ "then": { "properties": { "with": { - "additionalProperties": false, + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Producer batching mixin: declares batch_size for producers that yield records in batches.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + } + }, + "title": "batchable", + "type": "object" + } + ], "description": "Read data from parquet", "examples": [{ "file": "data.parquet" }], "properties": { @@ -703,7 +778,8 @@ }, "required": ["file"], "title": "parquet.read", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } @@ -822,7 +898,29 @@ "then": { "properties": { "with": { - "additionalProperties": false, + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Streaming producer mixin: declares batch_size and flush_ms for producers reading from continuous sources.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + }, + "flush_ms": { + "default": 1000, + "description": "If set, flush a partial batch after this many ms of inactivity. null or omitted = wait until batch_size or end-of-stream.", + "minimum": 1, + "type": ["integer", "null"] + } + }, + "title": "streamable", + "type": "object" + } + ], "description": "Read from Redis stream", "properties": { "connection": { @@ -843,7 +941,8 @@ }, "required": ["connection", "stream_name"], "title": "redis.read_stream", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } @@ -1008,7 +1107,23 @@ "then": { "properties": { "with": { - "additionalProperties": false, + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Producer batching mixin: declares batch_size for producers that yield records in batches.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + } + }, + "title": "batchable", + "type": "object" + } + ], "description": "Read a table from an SQL-compatible data store", "examples": [ { @@ -1037,6 +1152,12 @@ "title": "The connection to use for loading", "type": "string" }, + "fetch_size": { + "default": 10000, + "description": "Driver-level rows fetched per round-trip. Defaults to 10000.", + "minimum": 1, + "type": "integer" + }, "schema": { "description": "If left blank, the default schema of this connection will be used as defined in the connections.dy.yaml", "examples": ["dbo"], @@ -1052,7 +1173,8 @@ }, "required": ["connection", "table"], "title": "relational.read", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } @@ -1370,16 +1492,28 @@ "then": { "properties": { "with": { - "description": "Read from the standard input", - "properties": { - "batch_size": { - "default": 1000, - "description": "Number of records to process in a single batch", - "type": "integer" + "$schema": "https://json-schema.org/draft/2019-09/schema", + "allOf": [ + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "description": "Producer batching mixin: declares batch_size for producers that yield records in batches.", + "properties": { + "batch_size": { + "default": 1000, + "description": "Maximum number of records yielded per downstream batch.", + "minimum": 1, + "type": "integer" + } + }, + "title": "batchable", + "type": "object" } - }, + ], + "description": "Read from the standard input", + "properties": {}, "title": "std.read", - "type": "object" + "type": "object", + "unevaluatedProperties": false } } } diff --git a/scripts/generate-docs.sh b/scripts/generate-docs.sh index 631bd1ed..4d58f199 100755 --- a/scripts/generate-docs.sh +++ b/scripts/generate-docs.sh @@ -37,6 +37,15 @@ done rm -rf ./docs/reference/blocks mkdir ./docs/reference/blocks +# Track temp files so we can clean them up on exit. +RESOLVED_TMP_FILES=() +cleanup_resolved_tmps() { + for tmp in "${RESOLVED_TMP_FILES[@]}"; do + [ -f "${tmp}" ] && rm -f "${tmp}" + done +} +trap cleanup_resolved_tmps EXIT + blocks_dir="./core/src/datayoga_core/blocks" for schema in $(find ${blocks_dir} -name '*.schema.json' | sort) do @@ -46,7 +55,67 @@ do block_package="$(echo ${block_package} | cut -c2- | sed 's/\//_/g')" [ ! -z "${block_package}" ] && block_package="${block_package}_" - npx jsonschema2mk --schema ${schema} --extension yaml-examples \ + # Materialize a docs-friendly copy of the schema: + # 1. Resolve local-file $ref nodes by inlining the referenced JSON. + # 2. Flatten allOf-contributed properties into the top-level `properties` + # so jsonschema2mk renders a single property table per block. + # Self-contained Python (stdlib only) so this works in CI without installing + # datayoga_core's runtime dependencies. Pre-resolve at doc-gen time only; + # the on-disk schemas remain standard JSON Schema. + resolved_tmp="$(mktemp --suffix=.schema.json)" + RESOLVED_TMP_FILES+=("${resolved_tmp}") + python3 - "${schema}" > "${resolved_tmp}" <<'PYEOF' +import json +import os +import sys + + +def resolve_node(node, base_dir, visited): + if isinstance(node, dict): + ref = node.get("$ref") + if isinstance(ref, str) and not ref.startswith("#") and "://" not in ref and ref.endswith(".json"): + target = os.path.normpath(os.path.join(base_dir, ref)) + if target in visited: + raise SystemExit(f"Circular $ref at {target}") + if not os.path.isfile(target): + raise SystemExit(f"$ref target not found: {ref} -> {target}") + with open(target) as f: + fragment = json.load(f) + visited.add(target) + try: + return resolve_node(fragment, os.path.dirname(target), visited) + finally: + visited.discard(target) + return {k: resolve_node(v, base_dir, visited) for k, v in node.items()} + if isinstance(node, list): + return [resolve_node(item, base_dir, visited) for item in node] + return node + + +def flatten_allof_properties(schema): + """Inline `allOf[*].properties` into the top-level `properties`, removing + the allOf. Docs-only transformation so jsonschema2mk renders one table.""" + if not isinstance(schema, dict) or "allOf" not in schema: + return schema + merged = {} + for member in schema.get("allOf", []): + if isinstance(member, dict): + merged.update(member.get("properties", {})) + merged.update(schema.get("properties", {})) + schema["properties"] = merged + schema.pop("allOf", None) + return schema + + +schema_path = sys.argv[1] +with open(schema_path) as f: + schema = json.load(f) +schema = resolve_node(schema, os.path.dirname(os.path.abspath(schema_path)), set()) +schema = flatten_allof_properties(schema) +json.dump(schema, sys.stdout) +PYEOF + + npx jsonschema2mk --schema "${resolved_tmp}" --extension yaml-examples \ --extension front-matter --fm.parent "Blocks" --fm.grand_parent "Reference" > \ "./docs/reference/blocks/${block_package}${doc_name}" done