From b1827eee232ff99b4e3d0bd56c7561b9da06bb96 Mon Sep 17 00:00:00 2001 From: Conrad Date: Thu, 2 Jul 2026 19:01:12 -0400 Subject: [PATCH 1/2] fix: Keep the worker event loop warm across dispatches The worker service runs each dispatch's routine on a loop taken from a ResourcePool keyed by the constant "worker". The pool used ttl=0, so the loop's reference count fell to zero at the end of every dispatch and the finalizer tore the loop and its daemon thread down, only for the next dispatch to build them again. That per-dispatch churn, and the task drain the teardown runs, dominate dispatch latency and are implicated in a cross-loop lock error under concurrency. Give the pool a positive time-to-live, _WORKER_LOOP_TTL, so one warm loop and thread serve dispatches within the window and are torn down only once the loop sits idle past the TTL. An explicit stop still clears the pool at once, so no loop or thread outlives the service. --- wool/src/wool/runtime/worker/service.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/wool/src/wool/runtime/worker/service.py b/wool/src/wool/runtime/worker/service.py index bf233714..ee8591f9 100644 --- a/wool/src/wool/runtime/worker/service.py +++ b/wool/src/wool/runtime/worker/service.py @@ -38,6 +38,20 @@ enough not to stall worker-loop teardown; past this timeout the drain gives up, with the daemon-thread reap as the backstop.""" +_WORKER_LOOP_TTL: Final[float] = 30.0 +"""Idle time-to-live in seconds for the worker event-loop held by +`WorkerService._loop_pool`. The loop pool is keyed by a single +constant (``"worker"``), so a positive TTL keeps one worker loop + +daemon thread warm across dispatches instead of creating and tearing +one down on every call; recreating the loop per dispatch would add its +setup/teardown to dispatch latency and strand cleanup callbacks +scheduled on the torn-down loop. 
The TTL only bounds how long an +*idle* loop lingers before automatic teardown; an explicit `stop` +clears the pool immediately regardless of this value, so no daemon +thread outlives the service. The 30s window is generous enough to span +the gap between bursts of dispatches on a healthy worker while still +reaping a loop that has gone quiet.""" + # public @dataclass(frozen=True) @@ -169,7 +183,7 @@ def __init__(self, *, backpressure: BackpressureLike | None = None): self._loop_pool = ResourcePool( factory=self._create_worker_loop, finalizer=self._destroy_worker_loop, - ttl=0, + ttl=_WORKER_LOOP_TTL, ) @property From 64d0f33ae020c0c7a3702b926958d18842b4ea97 Mon Sep 17 00:00:00 2001 From: Conrad Date: Thu, 2 Jul 2026 19:01:12 -0400 Subject: [PATCH 2/2] test: Cover worker event-loop reuse and teardown Assert that one warm worker loop and daemon thread serve two sequential dispatches, that stopping the service reaps the warm thread with no factory-displacement warning, and that an idle loop is reaped once its time-to-live elapses. Add an autouse teardown that reaps any loop a test leaves warm, draining residual tasks across successive generations as the production finalizer does, so a cancelled task's follow-up cleanup is not stranded. 
--- wool/tests/runtime/worker/conftest.py | 71 +++++++++ wool/tests/runtime/worker/test_service.py | 182 +++++++++++++++++++++- wool/tests/runtime/worker/test_session.py | 5 +- 3 files changed, 254 insertions(+), 4 deletions(-) diff --git a/wool/tests/runtime/worker/conftest.py b/wool/tests/runtime/worker/conftest.py index 0820722e..73a2febb 100644 --- a/wool/tests/runtime/worker/conftest.py +++ b/wool/tests/runtime/worker/conftest.py @@ -2,6 +2,7 @@ import datetime import multiprocessing.shared_memory import threading +import time import uuid from types import MappingProxyType from typing import Any @@ -19,6 +20,7 @@ import wool.runtime.worker.pool as wp from tests.helpers import scoped_context +from wool.runtime.context.factory import _loops_with_factory from wool.runtime.context.factory import install_task_factory from wool.runtime.discovery.base import DiscoveryEvent from wool.runtime.worker.auth import WorkerCredentials @@ -100,6 +102,75 @@ def _clear_worker_context(): wool.__worker_service__.reset(svc_token) +@pytest.fixture(autouse=True) +def _reap_worker_loops(): + """Stop worker event-loops that a dispatch left warm. + + A test that dispatches through a `WorkerService` without stopping + it leaves the service's warm worker loop running on its daemon + thread (see `_WORKER_LOOP_TTL` for why the loop pool keeps a loop + warm across dispatches). Reap any such loop after the test so it + neither leaks a thread across tests nor survives to interpreter + exit, where wool's task-factory finalizer logs a spurious + displacement warning against the still-running loop. + + Runs as an autouse teardown after the test's explicitly-requested + fixtures, so a loop those fixtures already closed (e.g., + `worker_loop`) reads back closed and is skipped — only genuinely + leaked worker loops are stopped. 
A multi-generation residual-task + drain — matching the production finalizer + `WorkerService._destroy_worker_loop`, so a cancelled task's + follow-up cleanup is drained rather than stranded — then a stop is + scheduled onto the loop, and the worker thread closes it once + ``run_forever`` returns. + """ + yield + leaked = [ + loop + for loop in list(_loops_with_factory) + if loop.is_running() and not loop.is_closed() + ] + for loop in leaked: + + async def _shutdown(): + # Drain successive generations of pending tasks, mirroring + # the production finalizer's ``_shutdown``: a cancelled + # task's ``finally`` can schedule a second generation, so + # cancel -> await -> repeat until none remain (or a bounded + # budget elapses), then stop. A single pass would strand + # that second generation, surfacing the intermittent + # "Task was destroyed but it is pending!" warning. + current = asyncio.current_task() + deadline = asyncio.get_running_loop().time() + 5.0 + try: + while True: + pending = [t for t in asyncio.all_tasks() if t is not current] + if not pending: + break + for task in pending: + task.cancel() + remaining = deadline - asyncio.get_running_loop().time() + if remaining <= 0: + break + try: + await asyncio.wait_for( + asyncio.gather(*pending, return_exceptions=True), + timeout=remaining, + ) + except TimeoutError: + break + finally: + asyncio.get_running_loop().stop() + + try: + loop.call_soon_threadsafe(lambda loop=loop: loop.create_task(_shutdown())) + except RuntimeError: + continue + deadline = time.monotonic() + 5.0 + while loop.is_running() and time.monotonic() < deadline: + time.sleep(0.005) + + @pytest.fixture def worker_loop(): """Spin up a real worker loop on a daemon thread. 
diff --git a/wool/tests/runtime/worker/test_service.py b/wool/tests/runtime/worker/test_service.py index f9c56295..3ccad5b6 100644 --- a/wool/tests/runtime/worker/test_service.py +++ b/wool/tests/runtime/worker/test_service.py @@ -1,4 +1,5 @@ import asyncio +import logging import threading from contextlib import asynccontextmanager from uuid import uuid4 @@ -52,8 +53,9 @@ def grpc_add_to_server(): def grpc_servicer(): service = WorkerService() yield service - for entry in service._loop_pool._cache.values(): - service._destroy_worker_loop(entry.obj) + # The autouse ``_reap_worker_loops`` fixture (see conftest) reaps + # any worker loop a dispatch left warm; see `_WORKER_LOOP_TTL` for + # why the pool keeps one warm. @pytest.fixture(scope="function") @@ -290,6 +292,25 @@ async def _stop_streaming_routine(): raise +async def _worker_loop_identity_probe(): + """Report an identity fingerprint of the worker event-loop and its + daemon thread that this routine runs on. + + The routine runs on the pooled worker loop — a distinct daemon + thread from the gRPC handler loop — so two sequential dispatches + through the same service report an identical fingerprint only when + the worker loop was kept warm and reused rather than recreated per + dispatch. The thread name embeds a monotonic counter that never + repeats across thread creations, so it distinguishes a reused + thread from a fresh one even if object ids or thread idents are + recycled. Defined at module level so cloudpickle can serialize the + callable for dispatch. 
+ """ + loop = asyncio.get_running_loop() + thread = threading.current_thread() + return (id(loop), thread.ident, thread.name) + + @pytest.fixture @asynccontextmanager async def service_fixture(mocker: MockerFixture, grpc_aio_stub): @@ -2117,6 +2138,163 @@ async def test_stop_should_drain_every_generation_of_orphaned_cleanup_chain( finally: _drain_cleanup_observed = None + @pytest.mark.asyncio + async def test_dispatch_should_reuse_worker_loop_across_sequential_dispatches( + self, grpc_aio_stub, mock_worker_proxy_cache + ): + """Test `WorkerService.dispatch` runs sequential dispatches on a + single reused worker event-loop. + + Given: + A running `WorkerService` and a routine that reports the + identity of the worker event-loop and daemon thread it runs + on + When: + Two dispatches are driven sequentially through the same + service + Then: + It should run both on the same worker loop and daemon + thread, proving the loop is kept warm and reused across + dispatches rather than created and destroyed per call + """ + + # Arrange + async def dispatch_once(stub): + wool_task = make_task(_worker_loop_identity_probe) + stream = stub.dispatch() + await stream.write(protocol.Request(task=wool_task.to_protobuf())) + await stream.done_writing() + ack, result = [r async for r in stream] + assert ack.HasField("ack") + assert result.HasField("result") + return cloudpickle.loads(result.result.dump) + + # Act + async with grpc_aio_stub() as stub: + first = await dispatch_once(stub) + second = await dispatch_once(stub) + + # Assert + assert first == second, ( + "both dispatches must run on the same warm worker loop and " + f"daemon thread; observed {first!r} then {second!r} — a " + "differing fingerprint means the loop was recreated per " + "dispatch instead of reused" + ) + + @pytest.mark.asyncio + async def test_stop_should_reap_warm_worker_loop_thread( + self, grpc_aio_stub, grpc_servicer, mock_worker_proxy_cache, caplog + ): + """Test `WorkerService.stop` tears down the warm 
worker loop and + its daemon thread. + + Given: + A `WorkerService` that has serviced a dispatch, leaving one + worker event-loop and its daemon thread warm across the call + When: + The stop RPC is invoked + Then: + It should reap the warm daemon thread so no worker-loop + thread outlives the service, signal stopped state, and log + no task-factory displacement warning + """ + # Arrange — one dispatch warms a worker loop; the probe reports + # the daemon thread it runs on. + wool_task = make_task(_worker_loop_identity_probe) + async with grpc_aio_stub() as stub: + stream = stub.dispatch() + await stream.write(protocol.Request(task=wool_task.to_protobuf())) + await stream.done_writing() + ack, result = [r async for r in stream] + assert ack.HasField("ack") + assert result.HasField("result") + _, _, worker_thread_name = cloudpickle.loads(result.result.dump) + + # The warm loop's daemon thread is still alive after the + # dispatch returns — the pool kept it warm across the call. + live_before = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name in live_before + + # Act + with caplog.at_level(logging.WARNING, logger="wool.runtime.context.factory"): + stop_result = await asyncio.wait_for( + stub.stop(protocol.StopRequest(timeout=5)), 10 + ) + + # Assert + assert isinstance(stop_result, protocol.Void) + assert grpc_servicer.stopped.is_set() + # The warm daemon thread was reaped — no worker-loop thread + # leaks past the stopped service. + live_after = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name not in live_after + # Clean teardown emits no task-factory displacement warning. 
+ assert not any( + "task factory has been displaced" in record.getMessage() + for record in caplog.records + ) + + @pytest.mark.asyncio + async def test_dispatch_should_reap_idle_worker_loop_after_ttl_expiry( + self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache + ): + """Test `WorkerService` reaps the warm worker loop once its idle + TTL elapses without an explicit stop. + + Given: + A `WorkerService` whose loop pool holds one worker loop warm + for a short idle TTL after a dispatch + When: + The idle TTL elapses with no further dispatch and no stop RPC + Then: + It should finalize the idle worker loop and reap its daemon + thread so no worker-loop thread lingers past the TTL window + """ + # Arrange — a short idle TTL so the warm loop is reaped + # promptly. The pool captures the TTL at construction, so patch + # the constant before building the service. + mocker.patch("wool.runtime.worker.service._WORKER_LOOP_TTL", 0.2) + service = WorkerService() + + # Act — one dispatch warms a worker loop on a daemon thread; the + # probe reports the thread it runs on. + wool_task = make_task(_worker_loop_identity_probe) + async with grpc_aio_stub(servicer=service) as stub: + stream = stub.dispatch() + await stream.write(protocol.Request(task=wool_task.to_protobuf())) + await stream.done_writing() + ack, result = [r async for r in stream] + assert ack.HasField("ack") + assert result.HasField("result") + _, _, worker_thread_name = cloudpickle.loads(result.result.dump) + + # The warm loop's daemon thread is alive right after the + # dispatch returns, well before the short idle TTL elapses. + live_names = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name in live_names + + # The pool schedules its idle reap on the main loop, so + # yield to it and poll until the daemon thread is reaped. + # Bounded so a regression (idle loop never reaped) fails + # rather than hangs. 
+ loop = asyncio.get_running_loop() + deadline = loop.time() + 5.0 + while loop.time() < deadline: + await asyncio.sleep(0.05) + live_names = {t.name for t in threading.enumerate() if t.is_alive()} + if worker_thread_name not in live_names: + break + + # Assert — the idle worker loop was finalized and its daemon + # thread reaped once the TTL expired, with no explicit stop. + live_after = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name not in live_after, ( + "the warm worker loop's daemon thread must be reaped once " + "the idle TTL elapses; it is still alive, so the positive " + "TTL did not finalize the idle loop" + ) + @pytest.mark.asyncio async def test_dispatch_should_yield_results_in_order_when_async_generator( self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache diff --git a/wool/tests/runtime/worker/test_session.py b/wool/tests/runtime/worker/test_session.py index 73d2651a..444c1dc3 100644 --- a/wool/tests/runtime/worker/test_session.py +++ b/wool/tests/runtime/worker/test_session.py @@ -216,8 +216,9 @@ def grpc_add_to_server(): def grpc_servicer(): service = WorkerService() yield service - for entry in service._loop_pool._cache.values(): - service._destroy_worker_loop(entry.obj) + # The autouse ``_reap_worker_loops`` fixture (see conftest) reaps + # any worker loop a dispatch left warm; see `_WORKER_LOOP_TTL` for + # why the pool keeps one warm. @pytest.fixture(scope="function")