Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions lmcache/v1/multiprocess/transfer_context/async_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any
import threading
import time

# Third Party
import torch
Expand Down Expand Up @@ -142,7 +143,6 @@ def submit_store(
the background ``commit_executor``. Returns an unresolved future that
resolves only after both gather completion and the commit ACK.
"""
import time
_t_entry = time.perf_counter()
if self._non_gpu_context is None:
raise RuntimeError(
Expand All @@ -158,19 +158,25 @@ def submit_store(
# pinned staging buffers that need to be released later (False).
used_shm_direct = False
try:
_t0 = time.perf_counter()
with self._inflight_lock:
if self._is_closing:
completion.set_result(False)
return completion
_t_lock = time.perf_counter()

result = non_gpu_context.prepare_store(key, instance_id)
_t_prepare = time.perf_counter()

out_buffers, chunk_indices = result if result is not None else (None, None)
if chunk_indices is not None and len(chunk_indices) == 0:
# All chunks are already in cache: no gather, no commit.
completion.set_result(True)
return completion

full_block_ids = _single_group_block_ids(block_ids)
_t_block_ids = time.perf_counter()

num_chunks = (
len(chunk_indices)
if chunk_indices is not None
Expand All @@ -196,17 +202,24 @@ def submit_store(
num_chunks,
)
gather_target = staged_chunks
_t_alloc = time.perf_counter()

# Capture variables for the closure
_used_shm_direct = used_shm_direct
_gather_target = gather_target
_t_submit_start = time.perf_counter()

def _commit_after_gather() -> None:
_tb_entry = time.perf_counter()
gather_done: Any | None = None
ok = False
try:
_tb0 = time.perf_counter()
with torch.inference_mode(), torch_dev.stream(self._copy_stream):
_tb_stream_enter = time.perf_counter()

_event.wait(stream=self._copy_stream)
_tb_event_wait = time.perf_counter()

gather_paged_kv_to_cpu(
kv_caches,
Expand All @@ -217,22 +230,49 @@ def _commit_after_gather() -> None:
out=_gather_target,
chunk_indices=chunk_indices,
)
_tb_gather = time.perf_counter()

gather_done = torch_dev.Event()
gather_done.record(self._copy_stream)
_tb_record = time.perf_counter()

_tb_stream_exit = time.perf_counter()

with self._inflight_lock:
if gather_done is not None:
self._inflight_gather_events.add(gather_done)
_tb_lock = time.perf_counter()

if gather_done is not None:
gather_done.synchronize()
_tb_sync = time.perf_counter()

ok = non_gpu_context.commit_store(key, instance_id, _gather_target)
_tb_commit = time.perf_counter()

if not ok:
logger.error(
"Async non-GPU commit_store failed for request_id=%s",
_request_id,
)

logger.info(
"[BG %s] thread_start=%.3f stream_enter=%.3f "
"event_wait=%.3f gather_launch=%.3f record=%.3f "
"stream_exit=%.3f lock=%.3f sync=%.3f commit=%.3f "
"total=%.3f ms",
_request_id,
(_tb0 - _tb_entry) * 1000,
(_tb_stream_enter - _tb0) * 1000,
(_tb_event_wait - _tb_stream_enter) * 1000,
(_tb_gather - _tb_event_wait) * 1000,
(_tb_record - _tb_gather) * 1000,
(_tb_stream_exit - _tb_record) * 1000,
(_tb_lock - _tb_stream_exit) * 1000,
(_tb_sync - _tb_lock) * 1000,
(_tb_commit - _tb_sync) * 1000,
(_tb_commit - _tb_entry) * 1000,
)
except Exception:
logger.exception(
"Async non-GPU store failed for request_id=%s",
Expand All @@ -253,6 +293,7 @@ def _commit_after_gather() -> None:
# only handles failures that occur *before* this submit, so it can never
# double-release or double-resolve.
commit_future = commit_executor.submit(_commit_after_gather)
_t_submit_end = time.perf_counter()
except Exception:
logger.exception("Failed to submit async non-GPU store")
if staged_chunks:
Expand All @@ -268,8 +309,23 @@ def _drop_commit_future(done_future: ConcurrentFuture[None]) -> None:
self._inflight_commits.discard(done_future)

commit_future.add_done_callback(_drop_commit_future)
logger.info("[submit_store] forward thread returned at %.3f ms since entry",
(time.perf_counter() - _t_entry) * 1000)

_t_exit = time.perf_counter()
logger.info(
"[FWD %s] lock=%.3f prepare=%.3f block_ids=%.3f "
"alloc=%.3f submit=%.3f bookkeep=%.3f total=%.3f ms "
"(num_chunks=%d, shm=%s)",
_request_id,
(_t_lock - _t0) * 1000,
(_t_prepare - _t_lock) * 1000,
(_t_block_ids - _t_prepare) * 1000,
(_t_alloc - _t_block_ids) * 1000,
(_t_submit_end - _t_submit_start) * 1000,
(_t_exit - _t_submit_end) * 1000,
(_t_exit - _t_entry) * 1000,
num_chunks,
used_shm_direct,
)
return completion

def flush_inflight_gathers(self) -> None:
Expand All @@ -280,13 +336,21 @@ def flush_inflight_gathers(self) -> None:
reading them. Only gather completion is awaited; commit futures are not
affected, since commits read from LMCache-owned staging buffers.
"""
_t0 = time.perf_counter()
with self._inflight_lock:
gather_events = list(self._inflight_gather_events)
for event in gather_events:
event.synchronize()
_t1 = time.perf_counter()
logger.info(
"[flush_inflight_gathers] synced %d events in %.3f ms",
len(gather_events),
(_t1 - _t0) * 1000,
)

def close(self) -> None:
# Drain in-flight gather/commit work before closing the base context.
_t0 = time.perf_counter()
with self._inflight_lock:
self._is_closing = True
gather_events = list(self._inflight_gather_events)
Expand All @@ -295,5 +359,13 @@ def close(self) -> None:
event.synchronize()
except Exception:
logger.exception("Failed while draining gather events")
_t1 = time.perf_counter()
self._commit_executor.shutdown(wait=True, cancel_futures=False)
_t2 = time.perf_counter()
logger.info(
"[close] gather_drain=%.3f executor_shutdown=%.3f total=%.3f ms",
(_t1 - _t0) * 1000,
(_t2 - _t1) * 1000,
(_t2 - _t0) * 1000,
)
super().close()