diff --git a/lmcache/v1/multiprocess/modules/gpu_transfer.py b/lmcache/v1/multiprocess/modules/gpu_transfer.py index 2ec9c25148..4ff7359956 100644 --- a/lmcache/v1/multiprocess/modules/gpu_transfer.py +++ b/lmcache/v1/multiprocess/modules/gpu_transfer.py @@ -338,12 +338,14 @@ def store( """ st = time.perf_counter() obj_keys = self._ctx.resolve_obj_keys(key) + _t_resolve = time.perf_counter() entry = self._gpu_contexts.get(instance_id) if entry is None: raise ValueError(f"No GPU context registered for instance ID {instance_id}") gpu_context = entry.gpu_context model_name = entry.model_name + _num_groups = gpu_context.kv_layer_groups_manager.num_groups # NOTE: different engine groups may have different block sizes, so # ``blocks_per_chunk[i]`` is the number of blocks in one chunk for @@ -363,6 +365,7 @@ def store( block_ids_per_group_gpu = gpu_context.copy_view_block_ids_to_gpu( gpu_block_ids ) + _t_copy_ids = time.perf_counter() # Fail closed: every LMCache group must have block IDs covering all # chunks. A short list (e.g. a caller/protocol bug) would otherwise @@ -397,6 +400,7 @@ def store( gpu_context.device, event_ipc_handle ) vllm_event.wait(stream=gpu_context.stream) + _t_event_wait = time.perf_counter() # CPU-synchronous sentinel: a GPU store is about to be enqueued. # Must be published via publish() (not publish_on_stream) so the @@ -421,14 +425,21 @@ def store( }, ), ) + _t_publish = time.perf_counter() reserved_dict: dict[ObjectKey, MemoryObj] = {} store_succeeded = False + _t_reserve = _t_publish + _t_loop_end = _t_publish + _t_record_start = _t_publish + _t_record_end = _t_publish + _t_callback_end = _t_publish try: layout_desc = get_layout_desc(gpu_context, self._ctx.chunk_size) reserved_dict = self._ctx.storage_manager.reserve_write( obj_keys, layout_desc, "new" ) + _t_reserve = time.perf_counter() # NOTE: Store is not batched because some obj_keys may be # skipped (not in reserved_dict), making block_ids @@ -441,6 +452,7 @@ def store( else: continue + _t_chunk_start = time.perf_counter() # Copy from GPU paged buffer to tmp buffer, then to CPU — per # group. Each group uses its own block-id list (HMA). for group_idx in range(num_groups): @@ -467,16 +479,28 @@ def store( gpu_context.gpu_kv_format_, 0, ) + _t_kernel_end = time.perf_counter() # Store is not batched, so we always use chunk_idx=0 (single slot) lmcache_memcpy_async_d2h( gpu_context.get_tmp_gpu_buffer_flat(chunk_idx=0), memory_obj ) + _t_memcpy_end = time.perf_counter() + logger.info( + "[GPU-STORE-CHUNK] req=%s chunk_idx=%d kernel=%.3f memcpy_d2h=%.3f ms", + key.request_id, + idx, + (_t_kernel_end - _t_chunk_start) * 1000, + (_t_memcpy_end - _t_kernel_end) * 1000, + ) store_succeeded = True + _t_loop_end = time.perf_counter() except Exception: logger.exception("Cannot store keys due to exception") return event.ipc_handle(), False finally: + _t_record_start = time.perf_counter() event.record() + _t_record_end = time.perf_counter() # Fail closed: commit the reserved objects only when every chunk # copied successfully; otherwise the whole store is skipped. stored_count = len(reserved_dict) if store_succeeded else 0 @@ -486,6 +510,7 @@ def store( "finish_write", list(reserved_dict.keys()), ) + _t_callback_end = time.perf_counter() # All reserved MemoryObjs share one layout_desc, so per-object # size is identical — avoid summing N identical values. total_bytes = ( @@ -509,6 +534,24 @@ def store( ) ed = time.perf_counter() + logger.info( + "[GPU-STORE] req=%s resolve_keys=%.3f copy_block_ids=%.3f " + "event_ipc_wait=%.3f event_publish=%.3f reserve_write=%.3f " + "kernel_loop=%.3f event_record=%.3f submit_cb=%.3f total=%.3f ms " + "(num_chunks=%d, num_groups=%d)", + key.request_id, + (_t_resolve - st) * 1000, + (_t_copy_ids - _t_resolve) * 1000, + (_t_event_wait - _t_copy_ids) * 1000, + (_t_publish - _t_event_wait) * 1000, + (_t_reserve - _t_publish) * 1000, + (_t_loop_end - _t_reserve) * 1000, + (_t_record_end - _t_record_start) * 1000, + (_t_callback_end - _t_record_end) * 1000, + (ed - st) * 1000, + len(obj_keys), + _num_groups, + ) if length := len(reserved_dict): logger.info( "Stored %d tokens in %.3f seconds", diff --git a/lmcache/v1/multiprocess/transfer_context/worker_transfer.py b/lmcache/v1/multiprocess/transfer_context/worker_transfer.py index cbf715feaa..4b475bef76 100644 --- a/lmcache/v1/multiprocess/transfer_context/worker_transfer.py +++ b/lmcache/v1/multiprocess/transfer_context/worker_transfer.py @@ -7,6 +7,7 @@ from enum import Enum from typing import Any, Callable, Protocol import os +import time # Third Party import torch @@ -281,11 +282,26 @@ def submit_store( "Handle transfer context is not registered. " "Call register() before submit_store()." ) - return self._send_request( + _t0 = time.perf_counter() + ipc_handle = event.ipc_handle() + _t_ipc = time.perf_counter() + mq_future = self._send_request( self._mq_client, RequestType.STORE, - [key, instance_id, block_ids, event.ipc_handle()], - ).to_cuda_future() + [key, instance_id, block_ids, ipc_handle], + ) + _t_send = time.perf_counter() + cuda_future = mq_future.to_cuda_future() + _t_cuda = time.perf_counter() + logger.info( + "[FWD-IPC] req=%s ipc_handle=%.3f send_request=%.3f to_cuda_future=%.3f total=%.3f ms", + _request_id, + (_t_ipc - _t0) * 1000, + (_t_send - _t_ipc) * 1000, + (_t_cuda - _t_send) * 1000, + (_t_cuda - _t0) * 1000, + ) + return cuda_future def submit_retrieve( self,