Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion wool/src/wool/runtime/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@
enough not to stall worker-loop teardown; past this timeout the drain
gives up, with the daemon-thread reap as the backstop."""

_WORKER_LOOP_TTL: Final[float] = 30.0
"""Idle time-to-live in seconds for the worker event-loop held by
`WorkerService._loop_pool`. The loop pool is keyed by a single
constant (``"worker"``), so a positive TTL keeps one worker loop +
daemon thread warm across dispatches instead of creating and tearing
one down on every call; recreating the loop per dispatch would add its
setup/teardown to dispatch latency and strand cleanup callbacks
scheduled on the torn-down loop. The TTL only bounds how long an
*idle* loop lingers before automatic teardown; an explicit `stop`
clears the pool immediately regardless of this value, so no daemon
thread outlives the service. The 30s window is generous enough to span
the gap between bursts of dispatches on a healthy worker while still
reaping a loop that has gone quiet."""


# public
@dataclass(frozen=True)
Expand Down Expand Up @@ -169,7 +183,7 @@ def __init__(self, *, backpressure: BackpressureLike | None = None):
self._loop_pool = ResourcePool(
factory=self._create_worker_loop,
finalizer=self._destroy_worker_loop,
ttl=0,
ttl=_WORKER_LOOP_TTL,
)

@property
Expand Down
71 changes: 71 additions & 0 deletions wool/tests/runtime/worker/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import multiprocessing.shared_memory
import threading
import time
import uuid
from types import MappingProxyType
from typing import Any
Expand All @@ -19,6 +20,7 @@

import wool.runtime.worker.pool as wp
from tests.helpers import scoped_context
from wool.runtime.context.factory import _loops_with_factory
from wool.runtime.context.factory import install_task_factory
from wool.runtime.discovery.base import DiscoveryEvent
from wool.runtime.worker.auth import WorkerCredentials
Expand Down Expand Up @@ -100,6 +102,75 @@ def _clear_worker_context():
wool.__worker_service__.reset(svc_token)


@pytest.fixture(autouse=True)
def _reap_worker_loops():
"""Stop worker event-loops that a dispatch left warm.

A test that dispatches through a `WorkerService` without stopping
it leaves the service's warm worker loop running on its daemon
thread (see `_WORKER_LOOP_TTL` for why the loop pool keeps a loop
warm across dispatches). Reap any such loop after the test so it
neither leaks a thread across tests nor survives to interpreter
exit, where wool's task-factory finalizer logs a spurious
displacement warning against the still-running loop.

Runs as an autouse teardown after the test's explicitly-requested
fixtures, so a loop those fixtures already closed (e.g.,
`worker_loop`) reads back closed and is skipped — only genuinely
leaked worker loops are stopped. A multi-generation residual-task
drain — matching the production finalizer
`WorkerService._destroy_worker_loop`, so a cancelled task's
follow-up cleanup is drained rather than stranded — then a stop is
scheduled onto the loop, and the worker thread closes it once
``run_forever`` returns.
"""
yield
leaked = [
loop
for loop in list(_loops_with_factory)
if loop.is_running() and not loop.is_closed()
]
for loop in leaked:

async def _shutdown():
# Drain successive generations of pending tasks, mirroring
# the production finalizer's ``_shutdown``: a cancelled
# task's ``finally`` can schedule a second generation, so
# cancel -> await -> repeat until none remain (or a bounded
# budget elapses), then stop. A single pass would strand
# that second generation, surfacing the intermittent
# "Task was destroyed but it is pending!" warning.
current = asyncio.current_task()
deadline = asyncio.get_running_loop().time() + 5.0
try:
while True:
pending = [t for t in asyncio.all_tasks() if t is not current]
if not pending:
break
for task in pending:
task.cancel()
remaining = deadline - asyncio.get_running_loop().time()
if remaining <= 0:
break
try:
await asyncio.wait_for(
asyncio.gather(*pending, return_exceptions=True),
timeout=remaining,
)
except TimeoutError:
break
finally:
asyncio.get_running_loop().stop()

try:
loop.call_soon_threadsafe(lambda loop=loop: loop.create_task(_shutdown()))
except RuntimeError:
continue
deadline = time.monotonic() + 5.0
while loop.is_running() and time.monotonic() < deadline:
time.sleep(0.005)


@pytest.fixture
def worker_loop():
"""Spin up a real worker loop on a daemon thread.
Expand Down
182 changes: 180 additions & 2 deletions wool/tests/runtime/worker/test_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import threading
from contextlib import asynccontextmanager
from uuid import uuid4
Expand Down Expand Up @@ -52,8 +53,9 @@ def grpc_add_to_server():
def grpc_servicer():
service = WorkerService()
yield service
for entry in service._loop_pool._cache.values():
service._destroy_worker_loop(entry.obj)
# The autouse ``_reap_worker_loops`` fixture (see conftest) reaps
# any worker loop a dispatch left warm; see `_WORKER_LOOP_TTL` for
# why the pool keeps one warm.


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -290,6 +292,25 @@ async def _stop_streaming_routine():
raise


async def _worker_loop_identity_probe():
"""Report an identity fingerprint of the worker event-loop and its
daemon thread that this routine runs on.

The routine runs on the pooled worker loop — a distinct daemon
thread from the gRPC handler loop — so two sequential dispatches
through the same service report an identical fingerprint only when
the worker loop was kept warm and reused rather than recreated per
dispatch. The thread name embeds a monotonic counter that never
repeats across thread creations, so it distinguishes a reused
thread from a fresh one even if object ids or thread idents are
recycled. Defined at module level so cloudpickle can serialize the
callable for dispatch.
"""
loop = asyncio.get_running_loop()
thread = threading.current_thread()
return (id(loop), thread.ident, thread.name)


@pytest.fixture
@asynccontextmanager
async def service_fixture(mocker: MockerFixture, grpc_aio_stub):
Expand Down Expand Up @@ -2117,6 +2138,163 @@ async def test_stop_should_drain_every_generation_of_orphaned_cleanup_chain(
finally:
_drain_cleanup_observed = None

@pytest.mark.asyncio
async def test_dispatch_should_reuse_worker_loop_across_sequential_dispatches(
self, grpc_aio_stub, mock_worker_proxy_cache
):
"""Test `WorkerService.dispatch` runs sequential dispatches on a
single reused worker event-loop.

Given:
A running `WorkerService` and a routine that reports the
identity of the worker event-loop and daemon thread it runs
on
When:
Two dispatches are driven sequentially through the same
service
Then:
It should run both on the same worker loop and daemon
thread, proving the loop is kept warm and reused across
dispatches rather than created and destroyed per call
"""

# Arrange
async def dispatch_once(stub):
wool_task = make_task(_worker_loop_identity_probe)
stream = stub.dispatch()
await stream.write(protocol.Request(task=wool_task.to_protobuf()))
await stream.done_writing()
ack, result = [r async for r in stream]
assert ack.HasField("ack")
assert result.HasField("result")
return cloudpickle.loads(result.result.dump)

# Act
async with grpc_aio_stub() as stub:
first = await dispatch_once(stub)
second = await dispatch_once(stub)

# Assert
assert first == second, (
"both dispatches must run on the same warm worker loop and "
f"daemon thread; observed {first!r} then {second!r} — a "
"differing fingerprint means the loop was recreated per "
"dispatch instead of reused"
)

@pytest.mark.asyncio
async def test_stop_should_reap_warm_worker_loop_thread(
self, grpc_aio_stub, grpc_servicer, mock_worker_proxy_cache, caplog
):
"""Test `WorkerService.stop` tears down the warm worker loop and
its daemon thread.

Given:
A `WorkerService` that has serviced a dispatch, leaving one
worker event-loop and its daemon thread warm across the call
When:
The stop RPC is invoked
Then:
It should reap the warm daemon thread so no worker-loop
thread outlives the service, signal stopped state, and log
no task-factory displacement warning
"""
# Arrange — one dispatch warms a worker loop; the probe reports
# the daemon thread it runs on.
wool_task = make_task(_worker_loop_identity_probe)
async with grpc_aio_stub() as stub:
stream = stub.dispatch()
await stream.write(protocol.Request(task=wool_task.to_protobuf()))
await stream.done_writing()
ack, result = [r async for r in stream]
assert ack.HasField("ack")
assert result.HasField("result")
_, _, worker_thread_name = cloudpickle.loads(result.result.dump)

# The warm loop's daemon thread is still alive after the
# dispatch returns — the pool kept it warm across the call.
live_before = {t.name for t in threading.enumerate() if t.is_alive()}
assert worker_thread_name in live_before

# Act
with caplog.at_level(logging.WARNING, logger="wool.runtime.context.factory"):
stop_result = await asyncio.wait_for(
stub.stop(protocol.StopRequest(timeout=5)), 10
)

# Assert
assert isinstance(stop_result, protocol.Void)
assert grpc_servicer.stopped.is_set()
# The warm daemon thread was reaped — no worker-loop thread
# leaks past the stopped service.
live_after = {t.name for t in threading.enumerate() if t.is_alive()}
assert worker_thread_name not in live_after
# Clean teardown emits no task-factory displacement warning.
assert not any(
"task factory has been displaced" in record.getMessage()
for record in caplog.records
)

@pytest.mark.asyncio
async def test_dispatch_should_reap_idle_worker_loop_after_ttl_expiry(
self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache
):
"""Test `WorkerService` reaps the warm worker loop once its idle
TTL elapses without an explicit stop.

Given:
A `WorkerService` whose loop pool holds one worker loop warm
for a short idle TTL after a dispatch
When:
The idle TTL elapses with no further dispatch and no stop RPC
Then:
It should finalize the idle worker loop and reap its daemon
thread so no worker-loop thread lingers past the TTL window
"""
# Arrange — a short idle TTL so the warm loop is reaped
# promptly. The pool captures the TTL at construction, so patch
# the constant before building the service.
mocker.patch("wool.runtime.worker.service._WORKER_LOOP_TTL", 0.2)
service = WorkerService()

# Act — one dispatch warms a worker loop on a daemon thread; the
# probe reports the thread it runs on.
wool_task = make_task(_worker_loop_identity_probe)
async with grpc_aio_stub(servicer=service) as stub:
stream = stub.dispatch()
await stream.write(protocol.Request(task=wool_task.to_protobuf()))
await stream.done_writing()
ack, result = [r async for r in stream]
assert ack.HasField("ack")
assert result.HasField("result")
_, _, worker_thread_name = cloudpickle.loads(result.result.dump)

# The warm loop's daemon thread is alive right after the
# dispatch returns, well before the short idle TTL elapses.
live_names = {t.name for t in threading.enumerate() if t.is_alive()}
assert worker_thread_name in live_names

# The pool schedules its idle reap on the main loop, so
# yield to it and poll until the daemon thread is reaped.
# Bounded so a regression (idle loop never reaped) fails
# rather than hangs.
loop = asyncio.get_running_loop()
deadline = loop.time() + 5.0
while loop.time() < deadline:
await asyncio.sleep(0.05)
live_names = {t.name for t in threading.enumerate() if t.is_alive()}
if worker_thread_name not in live_names:
break

# Assert — the idle worker loop was finalized and its daemon
# thread reaped once the TTL expired, with no explicit stop.
live_after = {t.name for t in threading.enumerate() if t.is_alive()}
assert worker_thread_name not in live_after, (
"the warm worker loop's daemon thread must be reaped once "
"the idle TTL elapses; it is still alive, so the positive "
"TTL did not finalize the idle loop"
)

@pytest.mark.asyncio
async def test_dispatch_should_yield_results_in_order_when_async_generator(
self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache
Expand Down
5 changes: 3 additions & 2 deletions wool/tests/runtime/worker/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ def grpc_add_to_server():
def grpc_servicer():
service = WorkerService()
yield service
for entry in service._loop_pool._cache.values():
service._destroy_worker_loop(entry.obj)
# The autouse ``_reap_worker_loops`` fixture (see conftest) reaps
# any worker loop a dispatch left warm; see `_WORKER_LOOP_TTL` for
# why the pool keeps one warm.


@pytest.fixture(scope="function")
Expand Down
Loading