
[Feature 4/N] PdL2Adapter: Submit/Query/Async Event Loop & Task Signaling #222

@hlin99

Description

Label
Please label this issue with "new feature", "l2-adapter", "multiprocess", "pd-backend", "pr/4", and any other relevant labels so that it can be categorized under LMCache Onboarding.

Is your feature request related to a problem? Please describe.
The current PdL2Adapter skeleton does not implement the async submit/query interface. As a result, controllers (StoreController, PrefetchController) cannot hand off store, lookup, or load tasks, poll for completion, receive eventfd signals, or verify task-ID lifecycle behavior. This blocks all controller/adapter integration and any end-to-end or CI workflow, and it leaves concurrency and parallelism untestable.

Describe the solution you'd like

The information below should be sufficient to implement this without further context. Use MockL2Adapter in mock_l2_adapter.py as the reference for canonical patterns.

1. Location

  • File: lmcache/v1/distributed/l2_adapters/pd_l2_adapter.py (extend existing class)
  • Unit test: tests/v1/distributed/l2_adapters/test_pd_l2_adapter.py (extend existing)

2. Methods to Implement (replace NotImplementedError stubs)

| Method | Behavior | Eventfd |
|---|---|---|
| submit_store_task(keys, objects) -> L2TaskId | Allocate task ID, push onto async queue, schedule coroutine | store_efd signaled on completion |
| pop_completed_store_tasks() -> dict[L2TaskId, bool] | Pop and return all completed store results | Call after store_efd fires |
| submit_lookup_and_lock_task(keys) -> L2TaskId | Allocate task ID; execute the lookup synchronously or schedule it via call_soon_threadsafe | lookup_efd signaled on completion |
| query_lookup_and_lock_result(task_id) -> Bitmap \| None | Pop result if ready, else None | Call after lookup_efd fires |
| submit_unlock(keys) -> None | Schedule unlock on event loop | |
| submit_load_task(keys, objects) -> L2TaskId | Allocate task ID, schedule async load coroutine | load_efd signaled on completion |
| query_load_result(task_id) -> Bitmap \| None | Pop result if ready, else None | Call after load_efd fires |

Each task, when it completes (whether it runs as an async coroutine or as a loop callback), must:

  1. Write result into self._completed_*_tasks dict under self._lock
  2. Signal the corresponding eventfd: os.eventfd_write(self._*_efd, 1)

3. Internal State (from PR 2/N skeleton, now wired up)

self._next_task_id: L2TaskId = 0
self._completed_store_tasks: dict[L2TaskId, bool] = {}
self._completed_lookup_tasks: dict[L2TaskId, Bitmap] = {}
self._completed_load_tasks: dict[L2TaskId, Bitmap] = {}
self._lock = threading.Lock()
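A minimal sketch of the task-ID part of this state, assuming L2TaskId is a plain int and that _get_next_task_id() (called in submit_store_task but not defined in this issue) simply post-increments _next_task_id while the caller holds self._lock. TaskIdAllocator and allocate_concurrently are hypothetical names used only to show the locking discipline that test_task_ids_monotonic and test_concurrent_submits exercise.

```python
import threading

L2TaskId = int  # assumption: task IDs are plain ints


class TaskIdAllocator:
    """Hypothetical stand-in for the adapter's task-ID state."""

    def __init__(self) -> None:
        self._next_task_id: L2TaskId = 0
        self._lock = threading.Lock()

    def _get_next_task_id(self) -> L2TaskId:
        # Not thread-safe by itself: the caller must hold self._lock.
        task_id = self._next_task_id
        self._next_task_id += 1
        return task_id

    def allocate(self) -> L2TaskId:
        with self._lock:
            return self._get_next_task_id()


def allocate_concurrently(
    alloc: TaskIdAllocator, n_threads: int, per_thread: int
) -> list[list[L2TaskId]]:
    """Allocate IDs from several threads at once; returns each thread's IDs in order."""
    per_thread_ids: list[list[L2TaskId]] = [[] for _ in range(n_threads)]

    def worker(slot: int) -> None:
        # Each thread appends only to its own list, so no extra locking is needed.
        for _ in range(per_thread):
            per_thread_ids[slot].append(alloc.allocate())

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return per_thread_ids
```

With the lock in place, the union of all threads' IDs is exactly 0..N-1 with no duplicates, and each thread sees its own IDs in strictly increasing order.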

4. Async Execution Pattern (follow MockL2Adapter)

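# Module-level imports needed by the two PdL2Adapter methods below.
import asyncio
import os

def submit_store_task(self, keys, objects) -> L2TaskId:
    # Allocate the ID under the lock so concurrent submitters never collide.
    with self._lock:
        task_id = self._get_next_task_id()
    # Hand the work to the adapter's event loop (running in its own thread)
    # and return immediately; completion is reported via store_efd.
    asyncio.run_coroutine_threadsafe(
        self._execute_store(keys, objects, task_id), self._loop
    )
    return task_id

async def _execute_store(self, keys, objects, task_id):
    # TODO (PR 5/N): real RDMA/ZMQ transfer here
    # For now: immediate success stub
    success = True
    # Publish the result first, then signal, so a woken poller always finds it.
    with self._lock:
        self._completed_store_tasks[task_id] = success
    os.eventfd_write(self._store_efd, 1)

  • Lookup does not need to be a coroutine: it can run as a plain callback scheduled on the loop thread with self._loop.call_soon_threadsafe().
  • Load should be async (same pattern as store).
  • All stubs should carry the same TODO (PR 5/N) marker for wiring to RDMA/ZMQ.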

5. How Controllers Use These Methods

StoreController (store_controller.py):

  • Calls submit_store_task() → polls store_efd → calls pop_completed_store_tasks()

PrefetchController (prefetch_controller.py):

  • Calls submit_lookup_and_lock_task() → polls lookup_efd → calls query_lookup_and_lock_result()
  • Then submit_load_task() → polls load_efd → calls query_load_result()
  • Finally submit_unlock() to release L2 locks

6. Unit Tests Required

| Test | Description |
|---|---|
| test_submit_store_and_pop | submit → wait for eventfd → pop returns the task with success=True |
| test_submit_lookup_and_query | submit → wait → query returns a valid Bitmap |
| test_submit_load_and_query | submit → wait → query returns a valid Bitmap |
| test_task_ids_monotonic | Multiple submits produce strictly increasing IDs |
| test_query_unknown_task_returns_none | Querying with a bogus task_id returns None |
| test_eventfd_signals_on_complete | After a submitted task completes, os.eventfd_read(efd) does not block |
| test_concurrent_submits | Thread safety: submitting from 2 threads simultaneously raises no errors and yields no duplicate task IDs |
| test_pop_clears_results | After a pop, a second pop returns an empty dict |

Example:

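import os
import select

# Assumed import paths; adjust to wherever these names actually live.
from lmcache.v1.distributed.l2_adapters.pd_l2_adapter import (
    PdL2Adapter,
    PdL2AdapterConfig,
)
# make_test_object_key / make_test_memory_obj: existing test helpers (import not shown)

def test_submit_store_and_pop():
    cfg = PdL2AdapterConfig.from_dict({
        "type": "pd", "role": "sender",
        "peer_host": "127.0.0.1",
        "peer_init_port": [9051], "peer_alloc_port": [9052],
    })
    adapter = PdL2Adapter(cfg)
    try:
        keys = [make_test_object_key(i) for i in range(3)]
        objs = [make_test_memory_obj() for _ in range(3)]

        task_id = adapter.submit_store_task(keys, objs)
        assert task_id == 0  # first task ID allocated by a fresh adapter

        # Wait (with a timeout) for the eventfd signal instead of sleeping a
        # fixed interval; a bare eventfd_read would hang forever on failure.
        efd = adapter.get_store_event_fd()
        readable, _, _ = select.select([efd], [], [], 5.0)
        assert readable, "store eventfd was not signaled"
        os.eventfd_read(efd)

        results = adapter.pop_completed_store_tasks()
        assert results[task_id] is True
    finally:
        adapter.close()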

Describe alternatives you've considered

  • Synchronous submit/query: would block other controller tasks and break the eventfd polling contract.
  • Implementing real I/O in this PR: better left to the later integration PR (5/N); this skeleton keeps the transfer path stubbed.
  • Deferring concurrency to later PRs: would leave controller/adapter end-to-end validation impossible until then.

Additional context
