From a33075eb1c5e3b7d12e903c6eb579188b80179b4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Jun 2026 08:03:01 +0000 Subject: [PATCH 1/3] Initial plan From 51017394df4cbadf295b5f3a9bbece60c258be20 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Jun 2026 08:05:12 +0000 Subject: [PATCH 2/3] Add comprehensive profiling instrumentation to async_data.py --- .../transfer_context/async_data.py | 77 ++++++++++++++++++- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/lmcache/v1/multiprocess/transfer_context/async_data.py b/lmcache/v1/multiprocess/transfer_context/async_data.py index 23df512b52..853e4366db 100644 --- a/lmcache/v1/multiprocess/transfer_context/async_data.py +++ b/lmcache/v1/multiprocess/transfer_context/async_data.py @@ -6,6 +6,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import Any import threading +import time # Third Party import torch @@ -142,7 +143,6 @@ def submit_store( the background ``commit_executor``. Returns an unresolved future that resolves only after both gather completion and the commit ACK. """ - import time _t_entry = time.perf_counter() if self._non_gpu_context is None: raise RuntimeError( @@ -158,12 +158,16 @@ def submit_store( # pinned staging buffers that need to be released later (False). used_shm_direct = False try: + _t0 = time.perf_counter() with self._inflight_lock: if self._is_closing: completion.set_result(False) return completion + _t_lock = time.perf_counter() result = non_gpu_context.prepare_store(key, instance_id) + _t_prepare = time.perf_counter() + out_buffers, chunk_indices = result if result is not None else (None, None) if chunk_indices is not None and len(chunk_indices) == 0: # All chunks are already in cache: no gather, no commit. @@ -171,6 +175,8 @@ def submit_store( return completion full_block_ids = _single_group_block_ids(block_ids) + _t_block_ids = time.perf_counter() + num_chunks = ( len(chunk_indices) if chunk_indices is not None @@ -196,17 +202,24 @@ def submit_store( num_chunks, ) gather_target = staged_chunks + _t_alloc = time.perf_counter() # Capture variables for the closure _used_shm_direct = used_shm_direct _gather_target = gather_target + _t_submit_start = time.perf_counter() def _commit_after_gather() -> None: + _tb_entry = time.perf_counter() gather_done: Any | None = None ok = False try: + _tb0 = time.perf_counter() with torch.inference_mode(), torch_dev.stream(self._copy_stream): + _tb_stream_enter = time.perf_counter() + _event.wait(stream=self._copy_stream) + _tb_event_wait = time.perf_counter() gather_paged_kv_to_cpu( kv_caches, @@ -217,22 +230,49 @@ def _commit_after_gather() -> None: out=_gather_target, chunk_indices=chunk_indices, ) + _tb_gather = time.perf_counter() gather_done = torch_dev.Event() gather_done.record(self._copy_stream) + _tb_record = time.perf_counter() + + _tb_stream_exit = time.perf_counter() with self._inflight_lock: if gather_done is not None: self._inflight_gather_events.add(gather_done) + _tb_lock = time.perf_counter() if gather_done is not None: gather_done.synchronize() + _tb_sync = time.perf_counter() + ok = non_gpu_context.commit_store(key, instance_id, _gather_target) + _tb_commit = time.perf_counter() + if not ok: logger.error( "Async non-GPU commit_store failed for request_id=%s", _request_id, ) + + logger.info( + "[BG %s] thread_start=%.3f stream_enter=%.3f " + "event_wait=%.3f gather_launch=%.3f record=%.3f " + "stream_exit=%.3f lock=%.3f sync=%.3f commit=%.3f " + "total=%.3f ms", + _request_id, + (_tb0 - _tb_entry) * 1000, + (_tb_stream_enter - _tb0) * 1000, + (_tb_event_wait - _tb_stream_enter) * 1000, + (_tb_gather - _tb_event_wait) * 1000, + (_tb_record - _tb_gather) * 1000, + (_tb_stream_exit - _tb_record) * 1000, + (_tb_lock - _tb_stream_exit) * 1000, + (_tb_sync - _tb_lock) * 1000, + (_tb_commit - _tb_sync) * 1000, + (_tb_commit - _tb_entry) * 1000, + ) except Exception: logger.exception( "Async non-GPU store failed for request_id=%s", @@ -253,6 +293,7 @@ def _commit_after_gather() -> None: # only handles failures that occur *before* this submit, so it can never # double-release or double-resolve. commit_future = commit_executor.submit(_commit_after_gather) + _t_submit_end = time.perf_counter() except Exception: logger.exception("Failed to submit async non-GPU store") if staged_chunks: @@ -268,8 +309,22 @@ def _drop_commit_future(done_future: ConcurrentFuture[None]) -> None: self._inflight_commits.discard(done_future) commit_future.add_done_callback(_drop_commit_future) - logger.info("[submit_store] forward thread returned at %.3f ms since entry", - (time.perf_counter() - _t_entry) * 1000) + + _t_exit = time.perf_counter() + logger.info( + "[FWD %s] lock=%.3f prepare=%.3f block_ids=%.3f " + "alloc=%.3f submit=%.3f bookkeep=%.3f total=%.3f ms " + "(num_chunks=%d, shm=%s)", + _request_id, + (_t_lock - _t0) * 1000, + (_t_prepare - _t_lock) * 1000, + (_t_block_ids - _t_prepare) * 1000, + (_t_alloc - _t_block_ids) * 1000, + (_t_submit_end - _t_submit_start) * 1000, + (_t_exit - _t_submit_end) * 1000, + num_chunks, + _used_shm_direct, + ) return completion def flush_inflight_gathers(self) -> None: @@ -280,13 +335,21 @@ def flush_inflight_gathers(self) -> None: reading them. Only gather completion is awaited; commit futures are not affected, since commits read from LMCache-owned staging buffers. """ + _t0 = time.perf_counter() with self._inflight_lock: gather_events = list(self._inflight_gather_events) for event in gather_events: event.synchronize() + _t1 = time.perf_counter() + logger.info( + "[flush_inflight_gathers] synced %d events in %.3f ms", + len(gather_events), + (_t1 - _t0) * 1000, + ) def close(self) -> None: # Drain in-flight gather/commit work before closing the base context. + _t0 = time.perf_counter() with self._inflight_lock: self._is_closing = True gather_events = list(self._inflight_gather_events) @@ -295,5 +358,13 @@ def close(self) -> None: event.synchronize() except Exception: logger.exception("Failed while draining gather events") + _t1 = time.perf_counter() self._commit_executor.shutdown(wait=True, cancel_futures=False) + _t2 = time.perf_counter() + logger.info( + "[close] gather_drain=%.3f executor_shutdown=%.3f total=%.3f ms", + (_t1 - _t0) * 1000, + (_t2 - _t1) * 1000, + (_t2 - _t0) * 1000, + ) super().close() From 5d7407459c0c338d8222d0d08e14b4c6575867ec Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Jun 2026 08:09:12 +0000 Subject: [PATCH 3/3] Fix missing total argument and use outer-scope used_shm_direct in FWD log --- lmcache/v1/multiprocess/transfer_context/async_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lmcache/v1/multiprocess/transfer_context/async_data.py b/lmcache/v1/multiprocess/transfer_context/async_data.py index 853e4366db..0f35811da5 100644 --- a/lmcache/v1/multiprocess/transfer_context/async_data.py +++ b/lmcache/v1/multiprocess/transfer_context/async_data.py @@ -322,8 +322,9 @@ def _drop_commit_future(done_future: ConcurrentFuture[None]) -> None: (_t_alloc - _t_block_ids) * 1000, (_t_submit_end - _t_submit_start) * 1000, (_t_exit - _t_submit_end) * 1000, + (_t_exit - _t_entry) * 1000, num_chunks, - _used_shm_direct, + used_shm_direct, ) return completion