From dc8bebf6b488e22fe9d9b4b8c09a7ca509e7d835 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Thu, 18 Jun 2026 12:34:15 +0100 Subject: [PATCH] refactor: remove dead branches in async _pump_iterator and drop put_terminal - Collapse the two isinstance(q, AsyncQueueBranch): await q.put(item) else: await q.put(item) blocks in async _pump_iterator (both arms were byte-for-byte identical) into a single await q.put(item), and unify the materialize_before_enqueue / plain paths under one async for loop. - Remove AsyncQueueBranch.put_terminal: it was functionally identical to put (same active-gated put_nowait loop), so collapse callers in _pump_iterator to use put for terminal markers too. - Add pytest-timeout dev dependency and a per-test timeout=10s in pyproject.toml so a hung streaming test fails loudly instead of stalling the suite. The sync SyncFanout._put_terminal busy-wait + get_nowait() drop is intentionally left untouched here: removing the drop without a full shutdown redesign of SyncFanout (terminal consumer that never drains its SyncQueueIterator deadlocks the pump) turns it into a hard hang. That belongs in its own PR. --- pyproject.toml | 3 ++ synaflow/execution/async_engine/executor.py | 34 +++---------------- .../execution/async_engine/iterator_utils.py | 10 ------ uv.lock | 16 ++++++++- 4 files changed, 23 insertions(+), 40 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9657a2f..2bf9b7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,8 @@ testpaths = ["tests"] asyncio_mode = "auto" addopts = "--import-mode=importlib" pythonpath = ["."] +timeout = 10 +timeout_method = "thread" [tool.hatch.build.targets.wheel] packages = ["synaflow"] @@ -83,5 +85,6 @@ dev = [ "mkdocs-material>=9.5", "pytest-asyncio>=1.4.0", "pytest-cov>=4.0", + "pytest-timeout>=2.4.0", "ruff>=0.15.17", ] diff --git a/synaflow/execution/async_engine/executor.py b/synaflow/execution/async_engine/executor.py index 9b449f9..8c8d67f 100644 --- a/synaflow/execution/async_engine/executor.py +++ b/synaflow/execution/async_engine/executor.py @@ -148,43 +148,20 @@ async def _pump_iterator( queues: dict[str, Any], on_error: Any, dag: Dag | None = None, - materialize_before_enqueue: bool = False, - consumer_type: Any = None, ) -> None: try: - safe = _safe_iterate(name, iterator) - if materialize_before_enqueue: - items, _, _ = await _apply_materializer( - dag, name, safe, consumer_type=consumer_type - ) - async for item in _safe_iterate(name, items): - for q in queues.values(): - if isinstance(q, AsyncQueueBranch): - await q.put(item) - else: - await q.put(item) - else: - async for item in safe: - for q in queues.values(): - if isinstance(q, AsyncQueueBranch): - await q.put(item) - else: - await q.put(item) + async for item in _safe_iterate(name, iterator): + for q in queues.values(): + await q.put(item) except StepExecutionError as e: await _handle_error(dag, name, e.__cause__ or e) if on_error == OnError.STOP: for q in queues.values(): - if isinstance(q, AsyncQueueBranch): - await q.put_terminal(PipelineStopException(step_name=name)) - else: - await q.put(PipelineStopException(step_name=name)) + await q.put(PipelineStopException(step_name=name)) raise PipelineStopException(step_name=name) from e finally: for q in queues.values(): - if isinstance(q, AsyncQueueBranch): - await q.put_terminal(EOF_MARKER) - else: - await q.put(EOF_MARKER) + await q.put(EOF_MARKER) async def _pump_observer(name: str, queue: asyncio.Queue, observer: Any) -> None: @@ -737,7 +714,6 @@ async def _publish_stream_to_queues( queues, node.on_error, dag=self.dag, - materialize_before_enqueue=False, ) ) self._pump_tasks.append(task) diff --git a/synaflow/execution/async_engine/iterator_utils.py b/synaflow/execution/async_engine/iterator_utils.py index 84db470..4aa406a 100644 --- a/synaflow/execution/async_engine/iterator_utils.py +++ b/synaflow/execution/async_engine/iterator_utils.py @@ -31,16 +31,6 @@ async def put(self, item) -> None: except asyncio.QueueFull: await asyncio.sleep(0.001) - async def put_terminal(self, item) -> None: - if not self.active: - return - while self.active: - try: - self.queue.put_nowait(item) - return - except asyncio.QueueFull: - await asyncio.sleep(0.001) - async def get(self): return await self.queue.get() diff --git a/uv.lock b/uv.lock index 4d6ad45..6c6ad66 100644 --- a/uv.lock +++ b/uv.lock @@ -647,6 +647,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9d/7a/d968e294073affff457b041c2be9868a40c1c71f4a35fcc1e45e5493067b/pytest_cov-7.1.0-py3-none-any.whl", hash = "sha256:a0461110b7865f9a271aa1b51e516c9a95de9d696734a2f71e3e78f46e1d4678", size = 22876 }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382 }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -786,7 +798,7 @@ wheels = [ [[package]] name = "synaflow" -version = "0.15.0" +version = "0.16.0" source = { editable = "." } dependencies = [ { name = "inflect" }, @@ -797,6 +809,7 @@ dev = [ { name = "mkdocs-material" }, { name = "pytest-asyncio" }, { name = "pytest-cov" }, + { name = "pytest-timeout" }, { name = "ruff" }, ] @@ -808,6 +821,7 @@ dev = [ { name = "mkdocs-material", specifier = ">=9.5" }, { name = "pytest-asyncio", specifier = ">=1.4.0" }, { name = "pytest-cov", specifier = ">=4.0" }, + { name = "pytest-timeout", specifier = ">=2.4.0" }, { name = "ruff", specifier = ">=0.15.17" }, ]