refactor: remove dead branches in async _pump_iterator and drop put_terminal#34
Merged
Merged
Conversation
2cd726d to
bf0b602
Compare
…erminal - Collapse the two isinstance(q, AsyncQueueBranch): await q.put(item) else: await q.put(item) blocks in async _pump_iterator (both arms were byte-for-byte identical) into a single await q.put(item), and unify the materialize_before_enqueue / plain paths under one async for loop. - Remove AsyncQueueBranch.put_terminal: it was functionally identical to put (same active-gated put_nowait loop), so collapse callers in _pump_iterator to use put for terminal markers too. - Add pytest-timeout dev dependency and a per-test timeout=10s in pyproject.toml so a hung streaming test fails loudly instead of stalling the suite. The sync SyncFanout._put_terminal busy-wait + get_nowait() drop is intentionally left untouched here: removing the drop without a full shutdown redesign of SyncFanout (terminal consumer that never drains its SyncQueueIterator deadlocks the pump) turns it into a hard hang. That belongs in its own PR.
bf0b602 to
dc8bebf
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Cleanup follow-up to #33 (max_in_flight).
async
_pump_iteratordead branches + dead parametersTwo
isinstance(q, AsyncQueueBranch): await q.put(item) else: await q.put(item)checks for item enqueue where both arms were byte-for-byte identical — leftover from an incomplete refactor. Collapsed them into a singleawait q.put(item).The
materialize_before_enqueueandconsumer_typeparameters were also dead: every caller passedmaterialize_before_enqueue=False, so theTruebranch (which called_apply_materializerbefore re-iterating) was never executed. Removed both parameters and the dead branch, simplifying the pump to a singleasync forloop.AsyncQueueBranch.put_terminalremovalput_terminalwas functionally identical toput(sameactive-gatedput_nowaitretry loop), so it is removed and theexcept/finallyblocks in_pump_iteratornow callputfor terminal markers too.AsyncQueueBranchkeepsclose()/activefor backpressure / cancellation.Per-test timeout
Added
pytest-timeoutas a dev dependency andtimeout = 10(thread method) in[tool.pytest.ini_options]so a hung streaming test fails loudly instead of stalling the whole suite. Baseline run is ~2.2s, so 10s gives ample headroom.Scope intentionally left out — needs its own PR
SyncFanout._put_terminal(sync engine) keeps its busy-wait +get_nowait()drop on a full queue. Removing the drop without a full shutdown redesign ofSyncFanoutturns it into a hard hang: a terminal consumer whose output stays lazy never drains itsSyncQueueIterator, the bounded queue fills up, and the pump blocks forever trying to deliver the EOF marker. The drop is the current workaround for that design gap and touching it belongs in a dedicated PR (shutdown semantics ofSyncFanout, non-enqueueingabort, drain-on-cleanup).Verification
uv run ruff format/ruff check --select F401: cleanuv run pytest tests/: 445 passed in 2.21s (baseline intact, timeout enabled)on_error == OnError.STOPbranch inside the async pump, which is unreachable in the current engine: STOP streams are eagerly materialized in_publish_outputbefore the pump is ever used.