Skip to content

Improve SuspendObjectStream concurrency and expand test coverage#11

Merged
JohnRichard4096 merged 6 commits into
mainfrom
feat
Jun 19, 2026
Merged

Improve SuspendObjectStream concurrency and expand test coverage#11
JohnRichard4096 merged 6 commits into
mainfrom
feat

Conversation

@JohnRichard4096

@JohnRichard4096 JohnRichard4096 commented Jun 19, 2026

Copy link
Copy Markdown
Member

Summary by Sourcery

改进 SuspendObjectStream 的并发语义并扩展测试覆盖率。

Bug 修复:

  • 使用共享状态锁来确保 suspend/resume 协调在多线程环境下是线程安全的,并支持在每个 suspend 周期中允许多个等待者。
  • 防止与 suspend 信号、resume 信号和队列结束状态(queue-done state)相关的竞态条件,以及避免重复关闭或错误复用的问题。
  • 使队列关闭检查与回调设置在并发访问场景下更加健壮,并防止对回调的静默误用。

功能增强:

  • 简化队列发送辅助函数(queue send helper)的使用方式,并使响应生成器的关闭过程在遇到关闭错误时更加稳健。

构建:

  • 将项目版本提升至 0.3.2,并在配置中将支持的 Python 版本范围扩展到 <3.16。

CI:

  • 在 GitHub Actions 工作流中针对 Python 3.15 运行 CI。

测试:

  • 新增全面的异步测试,覆盖 suspend 装饰器行为、多等待者恢复(multi-waiter resume)、suspend 等待边界、队列操作、回调配置、生成器生命周期以及基于 yield 的 suspend 标签处理等场景。
Original summary in English

Summary by Sourcery

Improve SuspendObjectStream concurrency semantics and expand coverage.

Bug Fixes:

  • Ensure suspend/resume coordination is thread-safe using a shared state lock and support multiple waiters per suspend cycle.
  • Prevent race conditions and double-closing or reuse issues around suspend signals, resume signals, and queue-done state.
  • Make queue closed checks and callbacks setting robust against concurrent access and avoid silent misuse of callbacks.

Enhancements:

  • Simplify queue send helper usage and make response generator shutdown more resilient to close errors.

Build:

  • Bump project version to 0.3.2 and extend supported Python range to <3.16 in config.

CI:

  • Run CI against Python 3.15 in the GitHub Actions workflow.

Tests:

  • Add comprehensive async tests covering suspend decorator behavior, multi-waiter resume, suspend waiting boundaries, queue operations, callback configuration, generator lifecycle, and suspend-on-yield tag handling.

@JohnRichard4096

Copy link
Copy Markdown
Member Author

@sourcery-ai title

@sourcery-ai

sourcery-ai Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

审阅者指南

重构 SuspendObjectStream 的并发和队列处理逻辑,使其基于 state-lock 驱动并支持多等待者安全;围绕挂起、队列、回调和生成器生命周期添加全面测试,并更新项目元数据以支持 Python 3.15 和新的补丁版本发布。

SuspendObjectStream 中多等待者挂起与恢复的时序图

sequenceDiagram
    actor SuspendCaller
    participant SuspendObjectStream
    actor Worker1
    actor Worker2

    SuspendCaller->>SuspendObjectStream: wait_to_suspend(tags, timeout)
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    SuspendObjectStream->>SuspendObjectStream: set _suspend_tags, __suspend_signal
    SuspendObjectStream-->>SuspendCaller: await __suspend_signal

    Worker1->>SuspendObjectStream: _wait_for_continue(tag)
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    alt first waiter
        SuspendObjectStream->>SuspendObjectStream: set_result(__suspend_signal)
        SuspendObjectStream->>SuspendObjectStream: create __resume_signal
        SuspendObjectStream-->>Worker1: await shared_fut
    end

    Worker2->>SuspendObjectStream: _wait_for_continue(tag)
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    alt additional waiter
        SuspendObjectStream->>SuspendObjectStream: add_done_callback(shared_fut)
        SuspendObjectStream-->>Worker2: await fut
    end

    SuspendCaller->>SuspendObjectStream: resume()
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    SuspendObjectStream->>SuspendObjectStream: set_result(__resume_signal)

    Worker1-->>SuspendObjectStream: fut completed
    Worker2-->>SuspendObjectStream: fut completed
    SuspendObjectStream->>SuspendObjectStream: clear __resume_signal under _state_lock
Loading

文件级变更

Change Details Files
引入共享状态锁,并重构挂起/恢复及队列状态管理,使之在并发场景下安全且感知多等待者。
  • 添加基于 aiologic 的 _state_lock 属性,在 init 中初始化,并用它来保护共享状态的修改。
  • 重写 _wait_for_continue,使其支持在共享的 __resume_signal 上进行多协程并发等待,支持标签过滤并进行正确清理。
  • 将 wait_to_suspend 的逻辑用 _state_lock 包裹起来,包括挂起信号的生命周期和标签管理,并确保在各挂起周期之间安全复用。
  • 修改 resume(),在解析 __resume_signal 前先在状态锁上同步,当不存在恢复 future 时,使其安全地成为 no-op。
src/amrita_sense/streaming.py
使队列完成和生成器生命周期在并发与重复调用下更加健壮,并简化队列写入逻辑。
  • 在 set_queue_done 中使用 _state_lock 保护 _queue_done 标志和队列完成逻辑,对重复调用进行短路,并优雅处理 BrokenResourceError。
  • 移除多余的 _push_to_queue 辅助函数,将所有队列写入统一通过带超时语义的 _put_to_queue。
  • 在 push_object 和 yield_response 中使用 _state_lock 检查 _queue_done 并快照回调状态,仅在队列已明确关闭且不存在回调时抛出 RuntimeError。
  • 通过 _state_lock 确保 get_response_generator 与回调消费互斥,并以原子方式标记 _has_consumer。
  • 更新 _response_generator 的拆卸逻辑,在锁下设置 _queue_done,并用 asyncio.gather 并发关闭发送/接收流,忽略关闭异常。
src/amrita_sense/streaming.py
添加广泛测试,覆盖挂起装饰器行为、多等待者恢复、挂起等待边界、队列操作、回调、生成器生命周期以及 SUSPEND_ON_YIELD 标记。
  • 新增异步辅助工具 _consume_all 和 _async_gen,用于消费 SuspendObjectStream 的响应并构建异步生成器。
  • 添加测试,覆盖挂起装饰器通过位置/关键字参数发现 SuspendObjectStream、错误用在同步函数上,以及缺失流参数等情况。
  • 添加测试,验证多个 _wait_for_continue 调用方可由单次 resume() 恢复,wait_to_suspend 强制单等待者语义,以及挂起周期可复用。
  • 添加测试,覆盖队列初始为关闭状态、set_queue_done 的幂等性、正常的 push_object/yield_response 流程、在已关闭队列上推送/产出时报错,以及基于回调的产出绕过队列的情况。
  • 添加测试,覆盖回调 setter 方法强制单次赋值语义、生成器与回调的互斥性、完整的 push/yield/done/close 流程、yield_response_iteration 行为、done 标记终止,以及基于 SUSPEND_ON_YIELD 的挂起。
tests/test_object_stream.py
更新项目配置以发布新补丁版本,并在 CI 中扩展 Python 兼容性。
  • 在 pyproject.toml 中将项目版本从 0.3.1.post1 提升到 0.3.2,并将支持的 Python 范围扩展至 <3.16。
  • 扩展 GitHub Actions CI matrix,加入 Python 3.15。
  • 重新生成 uv.lock,以反映依赖解析的变更(细节未在 diff 中展示)。
pyproject.toml
.github/workflows/CI.yml
uv.lock

提示与命令

与 Sourcery 交互

  • 触发新评审: 在 pull request 中评论 @sourcery-ai review
  • 继续讨论: 直接回复 Sourcery 的评审评论。
  • 从评审评论生成 GitHub issue: 通过回复评审评论的方式,要求 Sourcery 从该评论创建 issue。你也可以直接回复评审评论 @sourcery-ai issue 来从该评论创建 issue。
  • 生成 pull request 标题: 在 pull request 标题的任意位置写上 @sourcery-ai,即可随时生成标题。你也可以在 pull request 中评论 @sourcery-ai title 以(重新)生成标题。
  • 生成 pull request 总结: 在 pull request 正文的任意位置写上 @sourcery-ai summary,即可在你想要的位置生成 PR 总结。你也可以在 pull request 中评论 @sourcery-ai summary 以(重新)生成总结。
  • 生成审阅者指南: 在 pull request 中评论 @sourcery-ai guide,即可随时(重新)生成审阅者指南。
  • 解决所有 Sourcery 评论: 在 pull request 中评论 @sourcery-ai resolve,以解决所有 Sourcery 评论。如果你已经处理完所有评论且不再希望看到它们,这会很有用。
  • 忽略所有 Sourcery 评审: 在 pull request 中评论 @sourcery-ai dismiss,以忽略所有现有的 Sourcery 评审。如果你想从头开始新的评审,这尤其有用 —— 别忘了再评论一次 @sourcery-ai review 触发新评审!

自定义你的使用体验

访问你的控制面板 以:

  • 启用或禁用评审功能,例如 Sourcery 自动生成的 pull request 总结、审阅者指南等。
  • 更改评审语言。
  • 添加、移除或编辑自定义评审说明。
  • 调整其他评审设置。

获取帮助

Original review guide in English

Reviewer's Guide

Refactors SuspendObjectStream’s concurrency and queue-handling logic to be state-lock–driven and multi-waiter safe, adds comprehensive tests around suspension, queue, callbacks, and generator lifecycle, and updates project metadata for Python 3.15 support and a new patch release.

Sequence diagram for multi-waiter suspend and resume in SuspendObjectStream

sequenceDiagram
    actor SuspendCaller
    participant SuspendObjectStream
    actor Worker1
    actor Worker2

    SuspendCaller->>SuspendObjectStream: wait_to_suspend(tags, timeout)
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    SuspendObjectStream->>SuspendObjectStream: set _suspend_tags, __suspend_signal
    SuspendObjectStream-->>SuspendCaller: await __suspend_signal

    Worker1->>SuspendObjectStream: _wait_for_continue(tag)
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    alt first waiter
        SuspendObjectStream->>SuspendObjectStream: set_result(__suspend_signal)
        SuspendObjectStream->>SuspendObjectStream: create __resume_signal
        SuspendObjectStream-->>Worker1: await shared_fut
    end

    Worker2->>SuspendObjectStream: _wait_for_continue(tag)
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    alt additional waiter
        SuspendObjectStream->>SuspendObjectStream: add_done_callback(shared_fut)
        SuspendObjectStream-->>Worker2: await fut
    end

    SuspendCaller->>SuspendObjectStream: resume()
    SuspendObjectStream->>SuspendObjectStream: acquire _state_lock
    SuspendObjectStream->>SuspendObjectStream: set_result(__resume_signal)

    Worker1-->>SuspendObjectStream: fut completed
    Worker2-->>SuspendObjectStream: fut completed
    SuspendObjectStream->>SuspendObjectStream: clear __resume_signal under _state_lock
Loading

File-Level Changes

Change Details Files
Introduce a shared state lock and refactor suspend/resume and queue state management to be concurrency-safe and multi-waiter aware.
  • Add an aiologic-based _state_lock attribute initialized in init and use it to guard shared state mutations.
  • Rewrite _wait_for_continue to support multiple concurrent waiters on a shared __resume_signal with tag filtering and proper cleanup.
  • Wrap wait_to_suspend logic with _state_lock, including suspend-signal lifecycle and tag management, and ensure safe reuse between cycles.
  • Change resume() to synchronize on the state lock before resolving __resume_signal, making it safe as a no-op when no resume future exists.
src/amrita_sense/streaming.py
Make queue completion and generator lifecycle robust under concurrency and repeated calls, and simplify queue put logic.
  • Guard _queue_done flag and queue completion in set_queue_done with _state_lock, short-circuiting repeated calls and gracefully handling BrokenResourceError.
  • Remove the redundant _push_to_queue helper and route all queue writes through _put_to_queue with timeout semantics.
  • Use _state_lock in push_object and yield_response to check _queue_done and snapshot callback state, raising RuntimeError only when the queue is definitively closed without a callback.
  • Ensure get_response_generator is mutually exclusive with callback consumption via _state_lock and mark _has_consumer atomically.
  • Update _response_generator teardown to set _queue_done under lock and close send/receive streams concurrently with asyncio.gather, swallowing close exceptions.
src/amrita_sense/streaming.py
Add extensive tests covering suspend decorator behavior, multi-waiter resume, suspend waiting boundaries, queue operations, callbacks, generator lifecycle, and SUSPEND_ON_YIELD tagging.
  • Introduce async helper utilities _consume_all and _async_gen for consuming SuspendObjectStream responses and building async generators.
  • Add tests for suspend decorator discovery of SuspendObjectStream via positional/keyword args, misuse on sync functions, and missing stream argument.
  • Add tests validating that multiple _wait_for_continue callers are resumed by a single resume(), that wait_to_suspend enforces single waiter semantics, and that suspend cycles can be reused.
  • Add tests covering queue_closed initial state, idempotent set_queue_done, normal push_object/yield_response flows, erroring when pushing/yielding on closed queues, and callback-based yield bypassing the queue.
  • Add tests for callback setter methods enforcing single-assignment semantics, generator exclusivity with callbacks, full push/yield/done/close flows, yield_response_iteration behavior, done-marker termination, and SUSPEND_ON_YIELD-based suspension.
tests/test_object_stream.py
Update project configuration for a new patch release and expanded Python compatibility in CI.
  • Bump project version from 0.3.1.post1 to 0.3.2 and extend supported Python range to <3.16 in pyproject.toml.
  • Extend GitHub Actions CI matrix to include Python 3.15.
  • Regenerate uv.lock to reflect dependency resolution changes (details not shown in diff).
pyproject.toml
.github/workflows/CI.yml
uv.lock

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@sourcery-ai sourcery-ai Bot changed the title Feat Improve SuspendObjectStream concurrency and expand test coverage Jun 19, 2026

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 3 个问题,并给出了一些高层次的反馈:

  • 新的 _state_lock 是一个异步的 aiologic.Lock,但在 resumeset_callback_funcset_callback_fun_sendingget_response_generator 这些同步方法里用的是普通的 with,这会在运行时出错;要么把这些方法改成 async 并配合异步锁使用,要么在这些方法里避免使用异步锁。
  • set_queue_done 中,_queue_done 会在 _put_to_queue 运行之前就被设置为 True;如果 _put_to_queue 因为非 BrokenResourceError 的原因失败,队列会被标记为完成,但结束标记并没有被放入队列——建议只在发送成功后再将 _queue_done 置为 True,或者显式处理非 BrokenResourceError 的异常。
  • 新增的 _wait_for_continue / wait_to_suspend 同步逻辑更复杂,使用了共享的 future 加 _state_lock;建议再仔细检查一下潜在的竞态条件(例如:wait_to_suspend 在某个等待者获取锁的同时清空 __suspend_signal),并在这些边界情况附近加上简短注释,说明预期的执行顺序。
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new `_state_lock` is an async `aiologic.Lock`, but it's being used with plain `with` in sync methods like `resume`, `set_callback_func`, `set_callback_fun_sending`, and `get_response_generator`, which will fail at runtime; either make these methods async or avoid using the async lock in them.
- In `set_queue_done`, `_queue_done` is set to `True` before `_put_to_queue` runs; if `_put_to_queue` fails for reasons other than `BrokenResourceError`, the queue will be marked done without the done marker being enqueued—consider only flipping `_queue_done` after a successful send or handling non-`BrokenResourceError` exceptions explicitly.
- The new `_wait_for_continue` / `wait_to_suspend` synchronization logic is more complex and uses shared futures plus `_state_lock`; it would be helpful to double-check for potential races (e.g., `wait_to_suspend` clearing `__suspend_signal` while a waiter is acquiring the lock) and possibly add brief comments around those edge cases to explain the intended ordering.

## Individual Comments

### Comment 1
<location path="src/amrita_sense/streaming.py" line_range="121-130" />
<code_context>
+                shared_fut = self.__resume_signal
+                is_first = True
+
+            await asyncio.sleep(0)
+
+            if is_first:
+                fut = shared_fut
+            else:
</code_context>
<issue_to_address>
**issue (bug_risk):** Cancellation before `fut` initialization leads to `UnboundLocalError` in `finally`.

If the task is cancelled during `await asyncio.sleep(0)`, control goes to `finally` before `fut` is assigned in either branch, so referencing `fut` there raises `UnboundLocalError` instead of doing the intended cleanup. You can prevent this by initializing `fut = None` before the `async with` and guarding the cleanup (e.g. only resetting `__resume_signal` if `is_first` and `fut is not None`), or by moving `await asyncio.sleep(0)` to a point after `fut` is always assigned.
</issue_to_address>

### Comment 2
<location path="src/amrita_sense/streaming.py" line_range="183-159" />
<code_context>
         """Resume to run when suspend."""
-        if self.__resume_signal and not self.__resume_signal.done():
-            self.__resume_signal.set_result(True)
+        with self._state_lock:
+            if self.__resume_signal and not self.__resume_signal.done():
+                self.__resume_signal.set_result(True)

</code_context>
<issue_to_address>
**issue (bug_risk):** `resume` uses an async lock with a synchronous `with`, which will fail at runtime.

`_state_lock` is an `aiologic.Lock`, which must be used with `async with`, not a synchronous `with` block. In its current form, `resume()` will raise a `TypeError` on first use. Update this to either make `resume` async and use `async with self._state_lock`, or change `_state_lock` to a synchronous lock type and use it consistently across the class.
</issue_to_address>

### Comment 3
<location path="tests/test_object_stream.py" line_range="26" />
<code_context>
+        yield item


 @pytest.mark.asyncio
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for `wait_to_suspend` timeout behavior to ensure state is correctly reset after a timeout.

The new `wait_to_suspend` uses `_state_lock` and resets `__suspend_signal` / `_suspend_tags` in a `finally` block, and we already cover the "already waiting" and full suspend/resume cases. What’s missing is a test where `wait_to_suspend(..., timeout=...)` actually times out. Please add a test that:

1. Calls `await obj.wait_to_suspend("tag", timeout=small_value)` and asserts `asyncio.TimeoutError`.
2. Verifies a subsequent `wait_to_suspend("tag", timeout=...)` still works, confirming the timeout didn’t leave `__suspend_signal` / `_suspend_tags` in a bad state.

This would help catch any regressions where a timeout leaves the object stuck in an "already waiting" or inconsistent state.

Suggested implementation:

```python
# A. suspend decorator (4 tests)
# ============================================================================
=======
async def _async_gen(items: list):
    """Create an async generator from a list."""
    for item in items:
        yield item


@pytest.mark.asyncio
        assert suspend, "Suspend not called"
    finally:
        hd.cancel()


# ============================================================================
# A. suspend decorator (4 tests)
# ============================================================================


@pytest.mark.asyncio
async def test_wait_to_suspend_timeout_resets_state(suspendable_obj):
    """
    wait_to_suspend(...) should reset internal state when it times out so that a
    subsequent wait_to_suspend on the same tag still works.
    """
    # 1) First call: let it time out and ensure we get asyncio.TimeoutError
    with pytest.raises(asyncio.TimeoutError):
        await suspendable_obj.wait_to_suspend("timeout-tag", timeout=0.01)

    # 2) Second call: verify state was reset and we can wait again successfully
    async def trigger_suspend():
        # Give wait_to_suspend a chance to start before triggering suspend
        await asyncio.sleep(0.01)
        await suspendable_obj.suspend("timeout-tag")

    task = asyncio.create_task(trigger_suspend())
    try:
        suspend = await suspendable_obj.wait_to_suspend("timeout-tag", timeout=0.5)
        assert suspend, "Suspend not called after timeout reset"
    finally:
        task.cancel()

```

To wire this test into your existing codebase you will likely need to:

1. Replace the `suspendable_obj` fixture parameter with whatever fixture/object you already use to exercise `wait_to_suspend` in your existing tests (e.g. `obj`, `handler`, `stream`, etc.).
2. Adjust the `suspendable_obj.suspend("timeout-tag")` call if your API uses a different method name or signature to trigger suspension.
3. Ensure `asyncio` and `pytest` are imported at the top of `tests/test_object_stream.py` (they almost certainly already are given other async tests in this file).
4. Optionally tune the timeout values (`0.01` / `0.5`) if your CI environments are slow; e.g. use `0.05` / `1.0` while still ensuring the first call reliably times out and the second reliably completes.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
帮我变得更有用!请对每条评论点 👍 或 👎,我会根据这些反馈改进之后的评审。
Original comment in English

Hey - I've found 3 issues, and left some high level feedback:

  • The new _state_lock is an async aiologic.Lock, but it's being used with plain with in sync methods like resume, set_callback_func, set_callback_fun_sending, and get_response_generator, which will fail at runtime; either make these methods async or avoid using the async lock in them.
  • In set_queue_done, _queue_done is set to True before _put_to_queue runs; if _put_to_queue fails for reasons other than BrokenResourceError, the queue will be marked done without the done marker being enqueued—consider only flipping _queue_done after a successful send or handling non-BrokenResourceError exceptions explicitly.
  • The new _wait_for_continue / wait_to_suspend synchronization logic is more complex and uses shared futures plus _state_lock; it would be helpful to double-check for potential races (e.g., wait_to_suspend clearing __suspend_signal while a waiter is acquiring the lock) and possibly add brief comments around those edge cases to explain the intended ordering.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new `_state_lock` is an async `aiologic.Lock`, but it's being used with plain `with` in sync methods like `resume`, `set_callback_func`, `set_callback_fun_sending`, and `get_response_generator`, which will fail at runtime; either make these methods async or avoid using the async lock in them.
- In `set_queue_done`, `_queue_done` is set to `True` before `_put_to_queue` runs; if `_put_to_queue` fails for reasons other than `BrokenResourceError`, the queue will be marked done without the done marker being enqueued—consider only flipping `_queue_done` after a successful send or handling non-`BrokenResourceError` exceptions explicitly.
- The new `_wait_for_continue` / `wait_to_suspend` synchronization logic is more complex and uses shared futures plus `_state_lock`; it would be helpful to double-check for potential races (e.g., `wait_to_suspend` clearing `__suspend_signal` while a waiter is acquiring the lock) and possibly add brief comments around those edge cases to explain the intended ordering.

## Individual Comments

### Comment 1
<location path="src/amrita_sense/streaming.py" line_range="121-130" />
<code_context>
+                shared_fut = self.__resume_signal
+                is_first = True
+
+            await asyncio.sleep(0)
+
+            if is_first:
+                fut = shared_fut
+            else:
</code_context>
<issue_to_address>
**issue (bug_risk):** Cancellation before `fut` initialization leads to `UnboundLocalError` in `finally`.

If the task is cancelled during `await asyncio.sleep(0)`, control goes to `finally` before `fut` is assigned in either branch, so referencing `fut` there raises `UnboundLocalError` instead of doing the intended cleanup. You can prevent this by initializing `fut = None` before the `async with` and guarding the cleanup (e.g. only resetting `__resume_signal` if `is_first` and `fut is not None`), or by moving `await asyncio.sleep(0)` to a point after `fut` is always assigned.
</issue_to_address>

### Comment 2
<location path="src/amrita_sense/streaming.py" line_range="183-159" />
<code_context>
         """Resume to run when suspend."""
-        if self.__resume_signal and not self.__resume_signal.done():
-            self.__resume_signal.set_result(True)
+        with self._state_lock:
+            if self.__resume_signal and not self.__resume_signal.done():
+                self.__resume_signal.set_result(True)

</code_context>
<issue_to_address>
**issue (bug_risk):** `resume` uses an async lock with a synchronous `with`, which will fail at runtime.

`_state_lock` is an `aiologic.Lock`, which must be used with `async with`, not a synchronous `with` block. In its current form, `resume()` will raise a `TypeError` on first use. Update this to either make `resume` async and use `async with self._state_lock`, or change `_state_lock` to a synchronous lock type and use it consistently across the class.
</issue_to_address>

### Comment 3
<location path="tests/test_object_stream.py" line_range="26" />
<code_context>
+        yield item


 @pytest.mark.asyncio
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for `wait_to_suspend` timeout behavior to ensure state is correctly reset after a timeout.

The new `wait_to_suspend` uses `_state_lock` and resets `__suspend_signal` / `_suspend_tags` in a `finally` block, and we already cover the "already waiting" and full suspend/resume cases. What’s missing is a test where `wait_to_suspend(..., timeout=...)` actually times out. Please add a test that:

1. Calls `await obj.wait_to_suspend("tag", timeout=small_value)` and asserts `asyncio.TimeoutError`.
2. Verifies a subsequent `wait_to_suspend("tag", timeout=...)` still works, confirming the timeout didn’t leave `__suspend_signal` / `_suspend_tags` in a bad state.

This would help catch any regressions where a timeout leaves the object stuck in an "already waiting" or inconsistent state.

Suggested implementation:

```python
# A. suspend decorator (4 tests)
# ============================================================================
=======
async def _async_gen(items: list):
    """Create an async generator from a list."""
    for item in items:
        yield item


@pytest.mark.asyncio
        assert suspend, "Suspend not called"
    finally:
        hd.cancel()


# ============================================================================
# A. suspend decorator (4 tests)
# ============================================================================


@pytest.mark.asyncio
async def test_wait_to_suspend_timeout_resets_state(suspendable_obj):
    """
    wait_to_suspend(...) should reset internal state when it times out so that a
    subsequent wait_to_suspend on the same tag still works.
    """
    # 1) First call: let it time out and ensure we get asyncio.TimeoutError
    with pytest.raises(asyncio.TimeoutError):
        await suspendable_obj.wait_to_suspend("timeout-tag", timeout=0.01)

    # 2) Second call: verify state was reset and we can wait again successfully
    async def trigger_suspend():
        # Give wait_to_suspend a chance to start before triggering suspend
        await asyncio.sleep(0.01)
        await suspendable_obj.suspend("timeout-tag")

    task = asyncio.create_task(trigger_suspend())
    try:
        suspend = await suspendable_obj.wait_to_suspend("timeout-tag", timeout=0.5)
        assert suspend, "Suspend not called after timeout reset"
    finally:
        task.cancel()

```

To wire this test into your existing codebase you will likely need to:

1. Replace the `suspendable_obj` fixture parameter with whatever fixture/object you already use to exercise `wait_to_suspend` in your existing tests (e.g. `obj`, `handler`, `stream`, etc.).
2. Adjust the `suspendable_obj.suspend("timeout-tag")` call if your API uses a different method name or signature to trigger suspension.
3. Ensure `asyncio` and `pytest` are imported at the top of `tests/test_object_stream.py` (they almost certainly already are given other async tests in this file).
4. Optionally tune the timeout values (`0.01` / `0.5`) if your CI environments are slow; e.g. use `0.05` / `1.0` while still ensuring the first call reliably times out and the second reliably completes.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread src/amrita_sense/streaming.py
Comment thread src/amrita_sense/streaming.py
Comment thread tests/test_object_stream.py
@JohnRichard4096

Copy link
Copy Markdown
Member Author

@sourcery-ai title

@JohnRichard4096 JohnRichard4096 merged commit 85fee13 into main Jun 19, 2026
11 checks passed
@JohnRichard4096 JohnRichard4096 deleted the feat branch June 19, 2026 06:45
@x42005e1f

Copy link
Copy Markdown

Using self._state_lock in two different ways (sync and async) within the same thread seems unsafe, as it can lead to deadlocks:

#!/usr/bin/env python3

import asyncio

import aiologic

lock = aiologic.Lock()


async def work():
    async with lock:
        await asyncio.sleep(0)


async def main():
    async with lock:
        task = asyncio.create_task(work())
        await asyncio.sleep(0)  # switch to the task

    with lock:  # deadlock!
        print("never")


if __name__ == "__main__":
    asyncio.run(main())

For example, obj.wait_to_suspend() + obj.resume() can block the entire event loop if the same object is used by different tasks within the same event loop. You should avoid such mixing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants