[Feature 5/N] PdL2Adapter: Store/Load Full Data Path (L1 ↔ Staging ↔ RDMA) #223

@hlin99

Label
Please label your issue with "new feature", "l2-adapter", "multiprocess", "pd-backend", "pr/5", and any other relevant labels.

Is your feature request related to a problem? Please describe.
The PdL2Adapter currently supports only task stubs: no real data flows between the L1 MemoryObj and the remote peer through the staging buffer and the ZMQ + TransferChannel path. Without a working store/load data path, MP mode and any RDMA-based cache integration cannot be tested or deployed. All integration and production use is blocked until this end-to-end link is ready.

Describe the solution you'd like

Everything below treats the existing PDBackend (lmcache/v1/storage_backend/pd_backend.py) as the source of truth for the PD wire protocol.

1. Location

  • File: lmcache/v1/distributed/l2_adapters/pd_l2_adapter.py (extend existing)
  • Wire protocol types: lmcache/v1/storage_backend/pd_backend.py (AllocRequest, AllocResponse, ProxyNotif)
  • Unit test: tests/v1/distributed/l2_adapters/test_pd_l2_adapter_datapath.py

2. Sender Store Flow (replace PR 4/N stub in _execute_store)

submit_store_task(keys, memory_objs)
  │
  ▼
_execute_store(keys, objs, task_id)   [async coroutine on self._loop]
  │
  ├─ 1. Build AllocRequest from keys + memory_objs metadata
  │     - keys: list[str] via objectkey_to_cachekey_str() (#220)
  │     - fmt, shape, dtype, last_chunk_toks from MemoryObj.meta
  │
  ├─ 2. Send AllocRequest via ZMQ DEALER → receiver's alloc ROUTER
  │     - _ensure_peer_connection() if first time (lazy init)
  │     - Receive AllocResponse (already_sent_indexes, remote_indexes)
  │
  ├─ 3. Filter out already_sent_indexes (skip those keys)
  │
  ├─ 4. Copy L1 MemoryObj data → staging buffer (tensor.copy_)
  │
  ├─ 5. TransferChannel.batched_write(objects, transfer_spec)
  │     - transfer_spec = {"receiver_id": ..., "remote_indexes": [...]}
  │
  ├─ 6. Mark task complete: self._completed_store_tasks[task_id] = True
  │
  └─ 7. os.eventfd_write(self._store_efd, 1)
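
Steps 3 and 5 of the store flow above can be sketched in plain Python. This is a minimal illustration, not the adapter's implementation: `AllocResponseSketch` and `plan_transfer` are hypothetical names, and the sketch assumes that `already_sent_indexes` holds positions into the request's key list and that `remote_indexes` is ordered like the keys that still need sending. Partial allocation failure is not modelled here.

```python
from dataclasses import dataclass, field


@dataclass
class AllocResponseSketch:
    """Illustrative stand-in for AllocResponse (not the real wire type)."""
    already_sent_indexes: list = field(default_factory=list)
    remote_indexes: list = field(default_factory=list)


def plan_transfer(objs, resp, receiver_id):
    """Drop objects the receiver already holds and build a transfer_spec.

    Assumption: resp.remote_indexes lines up, in order, with the objects
    that remain after filtering.
    """
    skip = set(resp.already_sent_indexes)
    to_send = [obj for i, obj in enumerate(objs) if i not in skip]
    transfer_spec = {
        "receiver_id": receiver_id,
        "remote_indexes": list(resp.remote_indexes),
    }
    return to_send, transfer_spec


# Four objects, the receiver already has the one at position 1.
objs = ["obj0", "obj1", "obj2", "obj3"]
resp = AllocResponseSketch(already_sent_indexes=[1], remote_indexes=[7, 8, 9])
to_send, spec = plan_transfer(objs, resp, receiver_id="receiver-a")
```

In the real flow, `to_send` would be copied into the staging buffer and passed to `TransferChannel.batched_write` together with `spec`.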

3. Receiver Alloc Server (background thread/coroutine)

_mem_alloc_loop()   [runs on self._loop or dedicated thread]
  │
  ├─ recv AllocRequest from ZMQ ROUTER
  ├─ For each key in request:
  │   ├─ If key already exists → add to already_sent_indexes
  │   └─ Else → allocate staging buffer slot → record remote_index
  ├─ Send AllocResponse back
  └─ Wait for RDMA write to land (TransferChannel handles this)
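
The per-key decision in the alloc loop above can be sketched as follows. Everything here is a hypothetical stand-in: `StagingSlots` is a toy free-list in place of `PagedCpuGpuMemoryAllocator`, `stored` is an assumed key-to-slot dict, and the sketch simply raises when the staging buffer is exhausted, leaving the partial-allocation policy open.

```python
class StagingSlots:
    """Toy free-list standing in for the staging buffer allocator."""

    def __init__(self, free_slots):
        self._free = list(free_slots)

    def alloc(self):
        # Return the next free slot index, or None when exhausted.
        return self._free.pop(0) if self._free else None


def handle_alloc_request(keys, stored, slots):
    """Return (already_sent_indexes, remote_indexes) for one request.

    `stored` maps key -> slot index for data the receiver already tracks.
    """
    already_sent, remote = [], []
    for i, key in enumerate(keys):
        if key in stored:
            already_sent.append(i)   # sender can skip this key
            continue
        slot = slots.alloc()
        if slot is None:
            # Sketch only: a real server needs a policy here (wait, evict, or
            # report partial failure).
            raise RuntimeError("staging buffer exhausted")
        stored[key] = slot
        remote.append(slot)
    return already_sent, remote


stored = {"k1": 0}
slots = StagingSlots(free_slots=[1, 2, 3])
already_sent, remote = handle_alloc_request(["k0", "k1", "k2"], stored, slots)
```

The two returned lists correspond to the `already_sent_indexes` and `remote_indexes` fields of the AllocResponse sent back to the sender.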

4. Receiver Load Flow (replace PR 4/N stub in _execute_load)

submit_load_task(keys, l1_memory_objs)
  │
  ▼
_execute_load(keys, l1_objs, task_id)  [async coroutine]
  │
  ├─ 1. For each key: find staging buffer data (from alloc server)
  ├─ 2. Copy staging → L1 MemoryObj (tensor.copy_)
  ├─ 3. Build result Bitmap (set bit for each successfully loaded key)
  ├─ 4. self._completed_load_tasks[task_id] = bitmap
  └─ 5. os.eventfd_write(self._load_efd, 1)
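
Steps 1 to 3 of the load flow above amount to a guarded copy plus a result bitmap. The sketch below uses `bytearray` buffers in place of tensors and a plain `int` in place of `Bitmap`, so that it runs without LMCache installed; `load_from_staging` and the `staging` dict are hypothetical names.

```python
def load_from_staging(keys, l1_bufs, staging):
    """Copy staged bytes into the caller's buffers; return a bitmask of hits.

    Bit i of the result is set only if keys[i] was found and copied.
    """
    bitmap = 0
    for i, (key, dst) in enumerate(zip(keys, l1_bufs)):
        src = staging.get(key)
        if src is None or len(src) != len(dst):
            continue              # missing or size mismatch: leave bit clear
        dst[:] = src              # stand-in for tensor.copy_()
        bitmap |= 1 << i
    return bitmap


staging = {"k0": b"\x01\x02", "k2": b"\x05\x06"}
l1_bufs = [bytearray(2), bytearray(2), bytearray(2)]
bitmap = load_from_staging(["k0", "k1", "k2"], l1_bufs, staging)
```

Here `k1` is absent from staging, so its destination buffer is left untouched and its bit stays clear.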

5. Lookup Flow (receiver side)

_execute_lookup(keys, task_id)
  │
  ├─ For each key: check if data exists in receiver's staging/data dict
  ├─ Build Bitmap, lock matched keys
  ├─ self._completed_lookup_tasks[task_id] = bitmap
  └─ os.eventfd_write(self._lookup_efd, 1)
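
A minimal sketch of the lookup step above, again with an `int` standing in for `Bitmap`. The issue does not say how keys are locked, so the reference-count dict used here is an assumption, and `lookup_and_lock` is a hypothetical name.

```python
def lookup_and_lock(keys, staging, lock_counts):
    """Set bit i for each present key and bump that key's lock count."""
    bitmap = 0
    for i, key in enumerate(keys):
        if key in staging:
            bitmap |= 1 << i
            # Assumed locking scheme: a per-key counter that a later
            # load or release would decrement.
            lock_counts[key] = lock_counts.get(key, 0) + 1
    return bitmap


staging = {"k0": b"a", "k2": b"c"}
lock_counts = {}
hits = lookup_and_lock(["k0", "k1", "k2"], staging, lock_counts)
```

Locking only the matched keys keeps absent keys out of the lock table, so nothing needs to be released for a miss.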

6. Key Dependencies & Imports

Component | Import Path | Used For
AllocRequest/AllocResponse | lmcache.v1.storage_backend.pd_backend | Wire protocol messages
objectkey_to_cachekey_str | lmcache.v1.distributed.l2_adapters.key_mapper (#220) | Key serialization
CreateTransferChannel | lmcache.v1.transfer_channel | RDMA/NIXL data transfer
PagedCpuGpuMemoryAllocator | lmcache.v1.memory_management | Staging buffer management
msgspec | third-party | ZMQ message encode/decode
Bitmap | lmcache.native_storage_ops | Result bitmaps

7. What Can Be Stubbed vs. Must Be Real

Component | This PR | Notes
ZMQ AllocRequest/Response | Real | Must encode/decode via msgspec
TransferChannel.batched_write | Stub OK | Use mock_memory channel for UT
Staging buffer alloc | Real | Use PagedCpuGpuMemoryAllocator with CPU
L1 MemoryObj memcpy | Real | tensor.copy_()
Peer connection (NIXL init) | Stub OK | Lazy init, skip in mock mode

8. Unit Tests Required

Test | Description
test_sender_store_builds_alloc_request | Verify AllocRequest fields from keys + objs
test_receiver_alloc_response | Mock ZMQ, verify AllocResponse with correct indexes
test_store_end_to_end_mock_channel | Sender store → mock ZMQ → mock channel → eventfd
test_load_copies_data_to_l1 | Verify tensor data is actually copied into the provided MemoryObj
test_lookup_returns_correct_bitmap | Keys present → bit set; absent → bit clear
test_already_sent_skips_transfer | AllocResponse.already_sent_indexes skips those keys
test_partial_alloc_failure | Some keys fail alloc → partial bitmap, no crash

Example:

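def test_sender_store_builds_alloc_request(adapter):
    # `adapter` is assumed to be a pytest fixture yielding a PdL2Adapter in mock mode;
    # make_test_object_key / make_test_memory_obj are test helpers to be provided.
    keys = [make_test_object_key(i) for i in range(3)]
    objs = [make_test_memory_obj() for _ in range(3)]
    req = adapter._get_remote_alloc_request(keys, objs)
    assert len(req.keys) == 3
    assert req.dtype == "half"
    assert req.last_chunk_toks > 0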

Describe alternatives you've considered

  • Reusing PDBackend classes directly within the L2Adapter: incompatible, and it would break the abstraction layering.
  • Mocking store/load only at the API boundary: this would not exercise the actual wire protocol and would miss most bugs.
  • Deferring protocol conformance to later PRs: this leaves the pivotal async data path broken and unverified.

Additional context
