From 0d343931918c7a245ca3b705ec4d1804a45d0817 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Jun 2026 06:23:17 +0000 Subject: [PATCH 1/2] Initial plan From a9c17e9149401f7cdba5327105c351faf7ab887b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Jun 2026 06:32:43 +0000 Subject: [PATCH 2/2] perf: move gather phase to background thread in AsyncDataTransferContext Previously, submit_store performed the gather kernel launch (including _event.wait() and gather_paged_kv_to_cpu()) directly on the forward thread. When the copy stream has a pending event-wait (for the forward pass to finish), the CUDA runtime throttles the CPU as kernels queue up on a stream with unresolved dependencies, blocking the forward thread for ~38ms on every store. This commit moves the entire gather phase into the background _commit_after_gather task, which runs on the commit_executor. The forward thread now only does lightweight preparation (prepare_store, buffer allocation) and immediately submits the work and returns. The background thread now: 1. Acquires the copy stream context 2. Inserts an event-level wait for forward completion 3. Launches gather_paged_kv_to_cpu() 4. Records the gather_done event on the copy stream 5. Adds gather_done to _inflight_gather_events (under lock) 6. Synchronizes gather_done (waits for the GPU gather to finish) 7. Calls commit_store() and resolves the future Also removes profiling remnants: import time, t00/t1/t2/t3/t4/t11 timing variables, Store Profiler logger.info calls, and the two torch_dev.synchronize() calls that were added for profiling only. 
--- .../transfer_context/async_data.py | 90 ++++++++----------- 1 file changed, 38 insertions(+), 52 deletions(-) diff --git a/lmcache/v1/multiprocess/transfer_context/async_data.py b/lmcache/v1/multiprocess/transfer_context/async_data.py index 61f886a3d8..6fc5183658 100644 --- a/lmcache/v1/multiprocess/transfer_context/async_data.py +++ b/lmcache/v1/multiprocess/transfer_context/async_data.py @@ -43,12 +43,18 @@ class AsyncDataTransferContext(DataTransferContext): / SHM registration, no stream dependency) and ``submit_retrieve()`` (this path does not change retrieve). Only the store is made async. - Store is two-phase: - 1) gather: enqueue GPU->CPU copies on a dedicated copy stream. When SHM - buffers are available, gather writes directly into SHM views (matching - the synchronous path). Otherwise, gather targets pinned staging buffers. - 2) commit: wait for gather completion in a background thread, then perform - commit_store() and resolve the returned future. + Store is two-phase, both executed entirely in a background thread: + 1) gather: wait for the forward event on the copy stream, then enqueue + GPU->CPU copies. When SHM buffers are available, gather writes directly + into SHM views (matching the synchronous path). Otherwise, gather + targets pinned staging buffers. + 2) commit: wait for gather completion (via a recorded CUDA event), then + perform commit_store() and resolve the returned future. + + ``submit_store`` only performs lightweight preparation (prepare_store, + buffer allocation) on the forward thread and immediately submits all + GPU/copy work to the background ``commit_executor``, so the forward thread + is never blocked by gather kernel launch latency. 
This class is only instantiated by the factory when the device is async-capable, so the constructor creates async resources unconditionally; @@ -130,10 +136,12 @@ def submit_store( _event: IPCEvent, blocks_in_chunk: int, ) -> MessagingFuture: - """Two-phase async store (gather on copy stream, deferred commit). + """Two-phase async store (gather and commit both in background thread). - Returns an unresolved future that resolves only after both gather - completion and the commit ACK. + Performs lightweight preparation (prepare_store, buffer allocation) on + the forward thread and immediately submits the gather + commit work to + the background ``commit_executor``. Returns an unresolved future that + resolves only after both gather completion and the commit ACK. """ if self._non_gpu_context is None: raise RuntimeError( @@ -147,7 +155,6 @@ def submit_store( semaphore.acquire() staged_chunks: list[torch.Tensor] = [] - gather_done: Any | None = None # Whether we gathered directly into SHM views (True) or into # pinned staging buffers that need to be released later (False). 
used_shm_direct = False @@ -193,52 +200,34 @@ def submit_store( ) gather_target = staged_chunks - # Standard - import time - t00 = time.perf_counter() - - with torch_dev.stream(self._copy_stream): - _event.wait(stream=self._copy_stream) - torch_dev.synchronize() - t1 = time.perf_counter() - - gather_paged_kv_to_cpu( - kv_caches, - full_block_ids, - blocks_in_chunk, - layout_hints=self._layout_hints, - gpu_kv_format=self._gpu_kv_format, - out=gather_target, - chunk_indices=chunk_indices, - ) - t2 = time.perf_counter() - torch_dev.synchronize() - t3 = time.perf_counter() - - gather_done = torch_dev.Event() - gather_done.record(self._copy_stream) - t4 = time.perf_counter() - # Print intervals in milliseconds (ms) - logger.info( - "[Store Profiler] launch: %.3f ms | gpu_exec: %.3f ms | total: %.3f ms", - (t2 - t1) * 1000, - (t3 - t2) * 1000, - (t3 - t1) * 1000, - ) - t11 = time.perf_counter() - logger.info("[Store Profiler] submit block time: %.3f ms", (t11 - t00) * 1000) - - with self._inflight_lock: - if gather_done is not None: - self._inflight_gather_events.add(gather_done) - # Capture variables for the closure _used_shm_direct = used_shm_direct _gather_target = gather_target def _commit_after_gather() -> None: + gather_done: Any | None = None ok = False try: + with torch_dev.stream(self._copy_stream): + _event.wait(stream=self._copy_stream) + + gather_paged_kv_to_cpu( + kv_caches, + full_block_ids, + blocks_in_chunk, + layout_hints=self._layout_hints, + gpu_kv_format=self._gpu_kv_format, + out=_gather_target, + chunk_indices=chunk_indices, + ) + + gather_done = torch_dev.Event() + gather_done.record(self._copy_stream) + + with self._inflight_lock: + if gather_done is not None: + self._inflight_gather_events.add(gather_done) + if gather_done is not None: gather_done.synchronize() ok = non_gpu_context.commit_store(key, instance_id, _gather_target) @@ -272,9 +261,6 @@ def _commit_after_gather() -> None: logger.exception("Failed to submit async non-GPU store") if 
staged_chunks: self._release_staging(staged_chunks) - if gather_done is not None: - with self._inflight_lock: - self._inflight_gather_events.discard(gather_done) completion.set_result(False) semaphore.release() return completion