Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions lmcache/v1/multiprocess/modules/gpu_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,14 @@ def store(
"""
st = time.perf_counter()
obj_keys = self._ctx.resolve_obj_keys(key)
_t_resolve = time.perf_counter()

entry = self._gpu_contexts.get(instance_id)
if entry is None:
raise ValueError(f"No GPU context registered for instance ID {instance_id}")
gpu_context = entry.gpu_context
model_name = entry.model_name
_num_groups = gpu_context.kv_layer_groups_manager.num_groups

# NOTE: different engine groups may have different block sizes, so
# ``blocks_per_chunk[i]`` is the number of blocks in one chunk for
Expand All @@ -363,6 +365,7 @@ def store(
block_ids_per_group_gpu = gpu_context.copy_view_block_ids_to_gpu(
gpu_block_ids
)
_t_copy_ids = time.perf_counter()

# Fail closed: every LMCache group must have block IDs covering all
# chunks. A short list (e.g. a caller/protocol bug) would otherwise
Expand Down Expand Up @@ -397,6 +400,7 @@ def store(
gpu_context.device, event_ipc_handle
)
vllm_event.wait(stream=gpu_context.stream)
_t_event_wait = time.perf_counter()

# CPU-synchronous sentinel: a GPU store is about to be enqueued.
# Must be published via publish() (not publish_on_stream) so the
Expand All @@ -421,14 +425,21 @@ def store(
},
),
)
_t_publish = time.perf_counter()

reserved_dict: dict[ObjectKey, MemoryObj] = {}
store_succeeded = False
_t_reserve = _t_publish
_t_loop_end = _t_publish
_t_record_start = _t_publish
_t_record_end = _t_publish
_t_callback_end = _t_publish
try:
layout_desc = get_layout_desc(gpu_context, self._ctx.chunk_size)
reserved_dict = self._ctx.storage_manager.reserve_write(
obj_keys, layout_desc, "new"
)
_t_reserve = time.perf_counter()

# NOTE: Store is not batched because some obj_keys may be
# skipped (not in reserved_dict), making block_ids
Expand All @@ -441,6 +452,7 @@ def store(
else:
continue

_t_chunk_start = time.perf_counter()
# Copy from GPU paged buffer to tmp buffer, then to CPU — per
# group. Each group uses its own block-id list (HMA).
for group_idx in range(num_groups):
Expand All @@ -467,16 +479,28 @@ def store(
gpu_context.gpu_kv_format_,
0,
)
_t_kernel_end = time.perf_counter()
# Store is not batched, so we always use chunk_idx=0 (single slot)
lmcache_memcpy_async_d2h(
gpu_context.get_tmp_gpu_buffer_flat(chunk_idx=0), memory_obj
)
_t_memcpy_end = time.perf_counter()
logger.info(
"[GPU-STORE-CHUNK] req=%s chunk_idx=%d kernel=%.3f memcpy_d2h=%.3f ms",
key.request_id,
idx,
(_t_kernel_end - _t_chunk_start) * 1000,
(_t_memcpy_end - _t_kernel_end) * 1000,
)
store_succeeded = True
_t_loop_end = time.perf_counter()
except Exception:
logger.exception("Cannot store keys due to exception")
return event.ipc_handle(), False
finally:
_t_record_start = time.perf_counter()
event.record()
_t_record_end = time.perf_counter()
# Fail closed: commit the reserved objects only when every chunk
# copied successfully; otherwise the whole store is skipped.
stored_count = len(reserved_dict) if store_succeeded else 0
Expand All @@ -486,6 +510,7 @@ def store(
"finish_write",
list(reserved_dict.keys()),
)
_t_callback_end = time.perf_counter()
# All reserved MemoryObjs share one layout_desc, so per-object
# size is identical — avoid summing N identical values.
total_bytes = (
Expand All @@ -509,6 +534,24 @@ def store(
)

ed = time.perf_counter()
logger.info(
"[GPU-STORE] req=%s resolve_keys=%.3f copy_block_ids=%.3f "
"event_ipc_wait=%.3f event_publish=%.3f reserve_write=%.3f "
"kernel_loop=%.3f event_record=%.3f submit_cb=%.3f total=%.3f ms "
"(num_chunks=%d, num_groups=%d)",
key.request_id,
(_t_resolve - st) * 1000,
(_t_copy_ids - _t_resolve) * 1000,
(_t_event_wait - _t_copy_ids) * 1000,
(_t_publish - _t_event_wait) * 1000,
(_t_reserve - _t_publish) * 1000,
(_t_loop_end - _t_reserve) * 1000,
(_t_record_end - _t_record_start) * 1000,
(_t_callback_end - _t_record_end) * 1000,
(ed - st) * 1000,
len(obj_keys),
_num_groups,
)
if length := len(reserved_dict):
logger.info(
"Stored %d tokens in %.3f seconds",
Expand Down
22 changes: 19 additions & 3 deletions lmcache/v1/multiprocess/transfer_context/worker_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from enum import Enum
from typing import Any, Callable, Protocol
import os
import time

# Third Party
import torch
Expand Down Expand Up @@ -281,11 +282,26 @@ def submit_store(
"Handle transfer context is not registered. "
"Call register() before submit_store()."
)
return self._send_request(
_t0 = time.perf_counter()
ipc_handle = event.ipc_handle()
_t_ipc = time.perf_counter()
mq_future = self._send_request(
self._mq_client,
RequestType.STORE,
[key, instance_id, block_ids, event.ipc_handle()],
).to_cuda_future()
[key, instance_id, block_ids, ipc_handle],
)
_t_send = time.perf_counter()
cuda_future = mq_future.to_cuda_future()
_t_cuda = time.perf_counter()
logger.info(
"[FWD-IPC] req=%s ipc_handle=%.3f send_request=%.3f to_cuda_future=%.3f total=%.3f ms",
_request_id,
(_t_ipc - _t0) * 1000,
(_t_send - _t_ipc) * 1000,
(_t_cuda - _t_send) * 1000,
(_t_cuda - _t0) * 1000,
)
return cuda_future

def submit_retrieve(
self,
Expand Down