diff --git a/wool/src/wool/runtime/worker/service.py b/wool/src/wool/runtime/worker/service.py index bf233714..ee8591f9 100644 --- a/wool/src/wool/runtime/worker/service.py +++ b/wool/src/wool/runtime/worker/service.py @@ -38,6 +38,20 @@ enough not to stall worker-loop teardown; past this timeout the drain gives up, with the daemon-thread reap as the backstop.""" +_WORKER_LOOP_TTL: Final[float] = 30.0 +"""Idle time-to-live in seconds for the worker event-loop held by +`WorkerService._loop_pool`. The loop pool is keyed by a single +constant (``"worker"``), so a positive TTL keeps one worker loop + +daemon thread warm across dispatches instead of creating and tearing +one down on every call; recreating the loop per dispatch would add its +setup/teardown to dispatch latency and strand cleanup callbacks +scheduled on the torn-down loop. The TTL only bounds how long an +*idle* loop lingers before automatic teardown; an explicit `stop` +clears the pool immediately regardless of this value, so no daemon +thread outlives the service. 
The 30s window is generous enough to span +the gap between bursts of dispatches on a healthy worker while still +reaping a loop that has gone quiet.""" + # public @dataclass(frozen=True) @@ -169,7 +183,7 @@ def __init__(self, *, backpressure: BackpressureLike | None = None): self._loop_pool = ResourcePool( factory=self._create_worker_loop, finalizer=self._destroy_worker_loop, - ttl=0, + ttl=_WORKER_LOOP_TTL, ) @property diff --git a/wool/tests/runtime/worker/conftest.py b/wool/tests/runtime/worker/conftest.py index 0820722e..73a2febb 100644 --- a/wool/tests/runtime/worker/conftest.py +++ b/wool/tests/runtime/worker/conftest.py @@ -2,6 +2,7 @@ import datetime import multiprocessing.shared_memory import threading +import time import uuid from types import MappingProxyType from typing import Any @@ -19,6 +20,7 @@ import wool.runtime.worker.pool as wp from tests.helpers import scoped_context +from wool.runtime.context.factory import _loops_with_factory from wool.runtime.context.factory import install_task_factory from wool.runtime.discovery.base import DiscoveryEvent from wool.runtime.worker.auth import WorkerCredentials @@ -100,6 +102,75 @@ def _clear_worker_context(): wool.__worker_service__.reset(svc_token) +@pytest.fixture(autouse=True) +def _reap_worker_loops(): + """Stop worker event-loops that a dispatch left warm. + + A test that dispatches through a `WorkerService` without stopping + it leaves the service's warm worker loop running on its daemon + thread (see `_WORKER_LOOP_TTL` for why the loop pool keeps a loop + warm across dispatches). Reap any such loop after the test so it + neither leaks a thread across tests nor survives to interpreter + exit, where wool's task-factory finalizer logs a spurious + displacement warning against the still-running loop. 
+ + Runs as an autouse teardown after the test's explicitly-requested + fixtures, so a loop those fixtures already closed (e.g., + `worker_loop`) reads back closed and is skipped — only genuinely + leaked worker loops are stopped. Each leaked loop is handed a + shutdown task that runs a multi-generation residual-task drain — + matching the production finalizer + `WorkerService._destroy_worker_loop`, so a cancelled task's + follow-up cleanup is drained rather than stranded — and then stops + the loop; the worker thread closes it once ``run_forever`` returns. + """ + yield + leaked = [ + loop + for loop in list(_loops_with_factory) + if loop.is_running() and not loop.is_closed() + ] + for loop in leaked: + + async def _shutdown(): + # Drain successive generations of pending tasks, mirroring + # the production finalizer's ``_shutdown``: a cancelled + # task's ``finally`` can schedule a second generation, so + # cancel -> await -> repeat until none remain (or a bounded + # budget elapses), then stop. A single pass would strand + # that second generation, surfacing the intermittent + # "Task was destroyed but it is pending!" warning. + current = asyncio.current_task() + deadline = asyncio.get_running_loop().time() + 5.0 + try: + while True: + pending = [t for t in asyncio.all_tasks() if t is not current] + if not pending: + break + for task in pending: + task.cancel() + remaining = deadline - asyncio.get_running_loop().time() + if remaining <= 0: + break + try: + await asyncio.wait_for( + asyncio.gather(*pending, return_exceptions=True), + timeout=remaining, + ) + except TimeoutError: + break + finally: + asyncio.get_running_loop().stop() + + try: + loop.call_soon_threadsafe(lambda loop=loop: loop.create_task(_shutdown())) + except RuntimeError: + continue + deadline = time.monotonic() + 5.0 + while loop.is_running() and time.monotonic() < deadline: + time.sleep(0.005) + + @pytest.fixture def worker_loop(): """Spin up a real worker loop on a daemon thread. 
diff --git a/wool/tests/runtime/worker/test_service.py b/wool/tests/runtime/worker/test_service.py index f9c56295..3ccad5b6 100644 --- a/wool/tests/runtime/worker/test_service.py +++ b/wool/tests/runtime/worker/test_service.py @@ -1,4 +1,5 @@ import asyncio +import logging import threading from contextlib import asynccontextmanager from uuid import uuid4 @@ -52,8 +53,9 @@ def grpc_add_to_server(): def grpc_servicer(): service = WorkerService() yield service - for entry in service._loop_pool._cache.values(): - service._destroy_worker_loop(entry.obj) + # The autouse ``_reap_worker_loops`` fixture (see conftest) reaps + # any worker loop a dispatch left warm; see `_WORKER_LOOP_TTL` for + # why the pool keeps one warm. @pytest.fixture(scope="function") @@ -290,6 +292,25 @@ async def _stop_streaming_routine(): raise +async def _worker_loop_identity_probe(): + """Report an identity fingerprint of the worker event-loop and its + daemon thread that this routine runs on. + + The routine runs on the pooled worker loop — a distinct daemon + thread from the gRPC handler loop — so two sequential dispatches + through the same service report an identical fingerprint only when + the worker loop was kept warm and reused rather than recreated per + dispatch. Assuming the worker thread keeps its default ``Thread-N`` + name (TODO confirm), it embeds a monotonic counter that never repeats, + so it distinguishes a reused thread from a fresh one even if object + ids or thread idents are recycled. Defined at module level so + cloudpickle can serialize the callable for dispatch. 
+ """ + loop = asyncio.get_running_loop() + thread = threading.current_thread() + return (id(loop), thread.ident, thread.name) + + @pytest.fixture @asynccontextmanager async def service_fixture(mocker: MockerFixture, grpc_aio_stub): @@ -2117,6 +2138,163 @@ async def test_stop_should_drain_every_generation_of_orphaned_cleanup_chain( finally: _drain_cleanup_observed = None + @pytest.mark.asyncio + async def test_dispatch_should_reuse_worker_loop_across_sequential_dispatches( + self, grpc_aio_stub, mock_worker_proxy_cache + ): + """Test `WorkerService.dispatch` runs sequential dispatches on a + single reused worker event-loop. + + Given: + A running `WorkerService` and a routine that reports the + identity of the worker event-loop and daemon thread it runs + on + When: + Two dispatches are driven sequentially through the same + service + Then: + It should run both on the same worker loop and daemon + thread, proving the loop is kept warm and reused across + dispatches rather than created and destroyed per call + """ + + # Arrange + async def dispatch_once(stub): + wool_task = make_task(_worker_loop_identity_probe) + stream = stub.dispatch() + await stream.write(protocol.Request(task=wool_task.to_protobuf())) + await stream.done_writing() + ack, result = [r async for r in stream] + assert ack.HasField("ack") + assert result.HasField("result") + return cloudpickle.loads(result.result.dump) + + # Act + async with grpc_aio_stub() as stub: + first = await dispatch_once(stub) + second = await dispatch_once(stub) + + # Assert + assert first == second, ( + "both dispatches must run on the same warm worker loop and " + f"daemon thread; observed {first!r} then {second!r} — a " + "differing fingerprint means the loop was recreated per " + "dispatch instead of reused" + ) + + @pytest.mark.asyncio + async def test_stop_should_reap_warm_worker_loop_thread( + self, grpc_aio_stub, grpc_servicer, mock_worker_proxy_cache, caplog + ): + """Test `WorkerService.stop` tears down the warm 
worker loop and + its daemon thread. + + Given: + A `WorkerService` that has serviced a dispatch, leaving one + worker event-loop and its daemon thread warm across the call + When: + The stop RPC is invoked + Then: + It should reap the warm daemon thread so no worker-loop + thread outlives the service, signal stopped state, and log + no task-factory displacement warning + """ + # Arrange — one dispatch warms a worker loop; the probe reports + # the daemon thread it runs on. + wool_task = make_task(_worker_loop_identity_probe) + async with grpc_aio_stub() as stub: + stream = stub.dispatch() + await stream.write(protocol.Request(task=wool_task.to_protobuf())) + await stream.done_writing() + ack, result = [r async for r in stream] + assert ack.HasField("ack") + assert result.HasField("result") + _, _, worker_thread_name = cloudpickle.loads(result.result.dump) + + # The warm loop's daemon thread is still alive after the + # dispatch returns — the pool kept it warm across the call. + live_before = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name in live_before + + # Act + with caplog.at_level(logging.WARNING, logger="wool.runtime.context.factory"): + stop_result = await asyncio.wait_for( + stub.stop(protocol.StopRequest(timeout=5)), 10 + ) + + # Assert + assert isinstance(stop_result, protocol.Void) + assert grpc_servicer.stopped.is_set() + # The warm daemon thread was reaped — no worker-loop thread + # leaks past the stopped service. + live_after = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name not in live_after + # Clean teardown emits no task-factory displacement warning. 
+ assert not any( + "task factory has been displaced" in record.getMessage() + for record in caplog.records + ) + + @pytest.mark.asyncio + async def test_dispatch_should_reap_idle_worker_loop_after_ttl_expiry( + self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache + ): + """Test `WorkerService` reaps the warm worker loop once its idle + TTL elapses without an explicit stop. + + Given: + A `WorkerService` whose loop pool holds one worker loop warm + for a short idle TTL after a dispatch + When: + The idle TTL elapses with no further dispatch and no stop RPC + Then: + It should finalize the idle worker loop and reap its daemon + thread so no worker-loop thread lingers past the TTL window + """ + # Arrange — a short idle TTL so the warm loop is reaped + # promptly. The pool captures the TTL at construction, so patch + # the constant before building the service. + mocker.patch("wool.runtime.worker.service._WORKER_LOOP_TTL", 0.2) + service = WorkerService() + + # Act — one dispatch warms a worker loop on a daemon thread; the + # probe reports the thread it runs on. + wool_task = make_task(_worker_loop_identity_probe) + async with grpc_aio_stub(servicer=service) as stub: + stream = stub.dispatch() + await stream.write(protocol.Request(task=wool_task.to_protobuf())) + await stream.done_writing() + ack, result = [r async for r in stream] + assert ack.HasField("ack") + assert result.HasField("result") + _, _, worker_thread_name = cloudpickle.loads(result.result.dump) + + # The warm loop's daemon thread is alive right after the + # dispatch returns, well before the short idle TTL elapses. + live_names = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name in live_names + + # The pool schedules its idle reap on the main loop, so + # yield to it and poll until the daemon thread is reaped. + # Bounded so a regression (idle loop never reaped) fails + # rather than hangs. 
+ loop = asyncio.get_running_loop() + deadline = loop.time() + 5.0 + while loop.time() < deadline: + await asyncio.sleep(0.05) + live_names = {t.name for t in threading.enumerate() if t.is_alive()} + if worker_thread_name not in live_names: + break + + # Assert — the idle worker loop was finalized and its daemon + # thread reaped once the TTL expired, with no explicit stop. + live_after = {t.name for t in threading.enumerate() if t.is_alive()} + assert worker_thread_name not in live_after, ( + "the warm worker loop's daemon thread must be reaped once " + "the idle TTL elapses; it is still alive, so the positive " + "TTL did not finalize the idle loop" + ) + @pytest.mark.asyncio async def test_dispatch_should_yield_results_in_order_when_async_generator( self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache diff --git a/wool/tests/runtime/worker/test_session.py b/wool/tests/runtime/worker/test_session.py index 73d2651a..444c1dc3 100644 --- a/wool/tests/runtime/worker/test_session.py +++ b/wool/tests/runtime/worker/test_session.py @@ -216,8 +216,9 @@ def grpc_add_to_server(): def grpc_servicer(): service = WorkerService() yield service - for entry in service._loop_pool._cache.values(): - service._destroy_worker_loop(entry.obj) + # The autouse ``_reap_worker_loops`` fixture (see conftest) reaps + # any worker loop a dispatch left warm; see `_WORKER_LOOP_TTL` for + # why the pool keeps one warm. @pytest.fixture(scope="function")