Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 38 additions & 52 deletions lmcache/v1/multiprocess/transfer_context/async_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ class AsyncDataTransferContext(DataTransferContext):
/ SHM registration, no stream dependency) and ``submit_retrieve()`` (this
path does not change retrieve). Only the store is made async.

Store is two-phase:
1) gather: enqueue GPU->CPU copies on a dedicated copy stream. When SHM
buffers are available, gather writes directly into SHM views (matching
the synchronous path). Otherwise, gather targets pinned staging buffers.
2) commit: wait for gather completion in a background thread, then perform
commit_store() and resolve the returned future.
Store is two-phase; both phases execute entirely in a background thread:
1) gather: wait for the forward event on the copy stream, then enqueue
GPU->CPU copies. When SHM buffers are available, gather writes directly
into SHM views (matching the synchronous path). Otherwise, gather
targets pinned staging buffers.
2) commit: wait for gather completion (via a device event recorded on the
copy stream), then perform commit_store() and resolve the returned future.

``submit_store`` only performs lightweight preparation (prepare_store,
buffer allocation) on the forward thread and immediately submits all
GPU/copy work to the background ``commit_executor``, so the forward thread
is never blocked by gather kernel launch latency.

This class is only instantiated by the factory when the device is
async-capable, so the constructor creates async resources unconditionally;
Expand Down Expand Up @@ -130,10 +136,12 @@ def submit_store(
_event: IPCEvent,
blocks_in_chunk: int,
) -> MessagingFuture:
"""Two-phase async store (gather on copy stream, deferred commit).
"""Two-phase async store (gather and commit both in background thread).

Returns an unresolved future that resolves only after both gather
completion and the commit ACK.
Performs lightweight preparation (prepare_store, buffer allocation) on
the forward thread and immediately submits the gather + commit work to
the background ``commit_executor``. Returns an unresolved future that
resolves only after both gather completion and the commit ACK.
"""
if self._non_gpu_context is None:
raise RuntimeError(
Expand All @@ -147,7 +155,6 @@ def submit_store(

semaphore.acquire()
staged_chunks: list[torch.Tensor] = []
gather_done: Any | None = None
# Whether we gathered directly into SHM views (True) or into
# pinned staging buffers that need to be released later (False).
used_shm_direct = False
Expand Down Expand Up @@ -193,52 +200,34 @@ def submit_store(
)
gather_target = staged_chunks

# Standard
import time
t00 = time.perf_counter()

with torch_dev.stream(self._copy_stream):
_event.wait(stream=self._copy_stream)
torch_dev.synchronize()
t1 = time.perf_counter()

gather_paged_kv_to_cpu(
kv_caches,
full_block_ids,
blocks_in_chunk,
layout_hints=self._layout_hints,
gpu_kv_format=self._gpu_kv_format,
out=gather_target,
chunk_indices=chunk_indices,
)
t2 = time.perf_counter()
torch_dev.synchronize()
t3 = time.perf_counter()

gather_done = torch_dev.Event()
gather_done.record(self._copy_stream)
t4 = time.perf_counter()
# Print intervals in milliseconds (ms)
logger.info(
"[Store Profiler] launch: %.3f ms | gpu_exec: %.3f ms | total: %.3f ms",
(t2 - t1) * 1000,
(t3 - t2) * 1000,
(t3 - t1) * 1000,
)
t11 = time.perf_counter()
logger.info("[Store Profiler] submit block time: %.3f ms", (t11 - t00) * 1000)

with self._inflight_lock:
if gather_done is not None:
self._inflight_gather_events.add(gather_done)

# Capture variables for the closure
_used_shm_direct = used_shm_direct
_gather_target = gather_target

def _commit_after_gather() -> None:
gather_done: Any | None = None
ok = False
try:
with torch_dev.stream(self._copy_stream):
_event.wait(stream=self._copy_stream)

gather_paged_kv_to_cpu(
kv_caches,
full_block_ids,
blocks_in_chunk,
layout_hints=self._layout_hints,
gpu_kv_format=self._gpu_kv_format,
out=_gather_target,
chunk_indices=chunk_indices,
)

gather_done = torch_dev.Event()
gather_done.record(self._copy_stream)

with self._inflight_lock:
if gather_done is not None:
self._inflight_gather_events.add(gather_done)

if gather_done is not None:
gather_done.synchronize()
ok = non_gpu_context.commit_store(key, instance_id, _gather_target)
Expand Down Expand Up @@ -272,9 +261,6 @@ def _commit_after_gather() -> None:
logger.exception("Failed to submit async non-GPU store")
if staged_chunks:
self._release_staging(staged_chunks)
if gather_done is not None:
with self._inflight_lock:
self._inflight_gather_events.discard(gather_done)
completion.set_result(False)
semaphore.release()
return completion
Expand Down