Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
da02f02
Initial plan
Copilot Jun 6, 2026
effa7e6
Make MP non-GPU store path fully async with preemption-aware flush
Copilot Jun 6, 2026
486bd20
Gate async non-GPU store on device capability with sync fallback
Copilot Jun 6, 2026
5b8a463
Add docstrings and clarify preemption schema comments per review
Copilot Jun 6, 2026
8a1e283
Refactor async non-GPU store into dedicated AsyncDataTransferContext
Copilot Jun 6, 2026
2ba9c49
Improve AsyncDataTransferContext docstrings per review
Copilot Jun 6, 2026
cd86a4e
test: align preemption-flush tests with real vLLM schema
hlin99 Jun 6, 2026
711f585
refactor: simplify preemption-flush detection to real vLLM fields
hlin99 Jun 6, 2026
0dee0ac
fix: wrap shm_view.copy_ with inference_mode(False) to avoid Inferenc…
hlin99 Jun 15, 2026
471b84b
fix: gather directly into SHM view when available, eliminating redund…
hlin99 Jun 15, 2026
8be4642
add logs
hlin99 Jun 16, 2026
1dd3bd7
Fix SHM worker host registration
Copilot Jun 16, 2026
b0374ad
Polish SHM pinning validation logs
Copilot Jun 16, 2026
4aa4c12
add logs
hlin99 Jun 16, 2026
b68992b
perf: move gather phase to background thread in AsyncDataTransferContext
Copilot Jun 16, 2026
9a3574a
remove semaphone
hlin99 Jun 16, 2026
98eff82
Add comprehensive profiling instrumentation to async_data.py
Copilot Jun 16, 2026
0fe5e8a
Add comprehensive profiling instrumentation to async_data.py
Copilot Jun 16, 2026
20190bc
fix log
hlin99 Jun 17, 2026
fe220ce
Fix missing total argument and use outer-scope used_shm_direct in FWD…
Copilot Jun 16, 2026
08e03b9
Add profiling instrumentation to CUDA IPC transfer path
Copilot Jun 17, 2026
01c2d7e
Fix timing variable scoping: initialize before try block to avoid Nam…
Copilot Jun 17, 2026
357aab4
Add E2E timing from submit_store_request to get_finished
Copilot Jun 17, 2026
11529a7
Remove redundant str() in E2E-STORE log call
Copilot Jun 17, 2026
959b005
feat(ops): add multi_layer_block_kv_transfer Python fallback as unifi…
hlin99 Jun 7, 2026
a12f41a
Optimize the Python fallback path for block transfer operations with …
hlin99 Jun 12, 2026
49e060d
add log
hlin99 Jun 17, 2026
3e6deea
add use_c_ops
hlin99 Jun 17, 2026
1a15c79
<debug> force data transfer
hlin99 Jun 18, 2026
5ecb60e
feat: add detailed timing instrumentation for non-GPU store path
Copilot Jun 18, 2026
d2cf6c6
refactor: use strategy_name property instead of isinstance checks
Copilot Jun 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion lmcache/integration/vllm/lmcache_mp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,11 @@ class LMCacheMPConnectorMetadata(KVConnectorMetadata):
def __init__(self):
super().__init__()
self.requests: list[LMCacheMPRequestMetadata] = []
# True only on scheduler steps where preemption/eviction may overwrite
# blocks referenced by in-flight async stores. Worker-side
# handle_preemptions() uses this hint to flush deferred gather
# (device->CPU copy) work before vLLM can overwrite source KV blocks.
self.need_flush: bool = False

def add_request_metadata(self, request_metadata: LMCacheMPRequestMetadata):
self.requests.append(request_metadata)
Expand All @@ -513,7 +518,7 @@ def __str__(self):
f"num_blocks={len(req_meta.op.block_ids[0])}, "
f"block_ids={req_meta.op.block_ids})"
)
return "[" + "\n".join(request_strs) + "]"
return f"need_flush={self.need_flush}; [" + "\n".join(request_strs) + "]"

def __repr__(self):
return self.__str__()
Expand Down Expand Up @@ -741,6 +746,17 @@ def wait_for_save(self):
request_ids, ops, event, cache_salts=cache_salts
)

def handle_preemptions(self, kv_connector_metadata: KVConnectorMetadata) -> None:
"""Flush async non-GPU gathers only when scheduler metadata requests it."""
worker_adapter = getattr(self, "worker_adapter", None)
if self.role != KVConnectorRole.WORKER or worker_adapter is None:
return
need_flush = (
isinstance(kv_connector_metadata, LMCacheMPConnectorMetadata)
and kv_connector_metadata.need_flush
)
worker_adapter.handle_preemptions(need_flush)

def get_finished(
self, finished_req_ids: set[str]
) -> tuple[set[str] | None, set[str] | None]:
Expand Down Expand Up @@ -963,6 +979,7 @@ def build_connector_meta(
scheduler_output (SchedulerOutput): the scheduler output object.
"""
metadata = LMCacheMPConnectorMetadata()
metadata.need_flush = self._scheduler_step_needs_flush(scheduler_output)

self._process_retrieve_requests(metadata)
self._process_new_requests(scheduler_output, metadata)
Expand All @@ -976,6 +993,43 @@ def build_connector_meta(

return metadata

def _scheduler_step_needs_flush(self, scheduler_output: SchedulerOutput) -> bool:
"""Return whether this scheduler step can overwrite preempted blocks.

Under-syncing here risks KV-block corruption (a paged block may be
overwritten before a deferred async gather reads it), while over-syncing
only costs performance, so we prefer a spurious flush over a missed one.

Signal fields are verified against vLLM main:
- ``CachedRequestData.resumed_req_ids``: requests resumed from
preemption this step. Their blocks are replaced (not appended), so an
in-flight gather against the old blocks must be flushed first.
- ``SchedulerOutput.preempted_req_ids``: requests preempted this step.
Their blocks are freed and may be reused, so the same applies.
"""
cached_reqs = getattr(scheduler_output, "scheduled_cached_reqs", None)

# Primary signal: requests resumed from preemption this step.
if getattr(cached_reqs, "resumed_req_ids", None):
return True

# Primary signal: requests preempted this step.
if getattr(scheduler_output, "preempted_req_ids", None):
return True

# Conservative fallback: if cached requests are present but the schema
# exposes no recognized ``resumed_req_ids`` field (e.g. a much older or
# forked vLLM), we cannot prove the step is preemption-free, so flush
# rather than risk corruption.
if cached_reqs is not None and not hasattr(cached_reqs, "resumed_req_ids"):
logger.warning(
"Unrecognized scheduled_cached_reqs schema (%s); conservatively "
"flushing in-flight async gathers to avoid KV block corruption.",
type(cached_reqs).__name__,
)
return True
return False

def update_connector_output(self, connector_output: KVConnectorOutput):
"""
Update KVConnector state from worker-side connectors output.
Expand Down
28 changes: 28 additions & 0 deletions lmcache/integration/vllm/vllm_multi_process_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import enum
import os
import threading
import time

# Third Party
import torch
Expand Down Expand Up @@ -936,6 +937,10 @@ def __init__(
# Prevents re-reporting the same ID after drain clears tracking sets.
self._returned_finished: set[str] = set()

# Timestamps recorded when submit_store_request is called, used to
# measure E2E wall-clock time until the future is resolved.
self._store_submit_times: dict[str, float] = {}

self.model_name = model_name
self.parallel_strategy = parallel_strategy

Expand Down Expand Up @@ -1185,6 +1190,7 @@ def submit_store_request(
)
self.store_futures[request_id] = future
self.store_events[request_id] = event
self._store_submit_times[request_id] = time.perf_counter()

@_lmcache_nvtx_annotate
def submit_retrieve_request(
Expand Down Expand Up @@ -1345,6 +1351,7 @@ def get_finished(
self.retrieve_futures.clear()
self.store_events.clear()
self.retrieve_events.clear()
self._store_submit_times.clear()

ret_stores = self._process_finished_stores(
finished_stores, finished_req_ids_from_engine
Expand All @@ -1363,6 +1370,15 @@ def get_finished(
if not s_future.query():
continue

_t_done = time.perf_counter()
_t_submit = self._store_submit_times.pop(request_id, None)
if _t_submit is not None:
logger.info(
"[E2E-STORE] req=%s e2e=%.3f ms",
request_id,
(_t_done - _t_submit) * 1000,
)

s_result = s_future.result()
finished_stores.add(request_id)

Expand Down Expand Up @@ -1430,6 +1446,18 @@ def get_block_ids_with_load_errors(self) -> set[int]:
self.error_block_ids.clear()
return errors

def handle_preemptions(self, need_flush: bool) -> None:
"""Handle worker-side preemption hints from connector metadata.

When ``need_flush`` is true, synchronize deferred non-GPU gather work
before the next forward pass can overwrite paged KV blocks.
"""
if not need_flush:
return
if not self.is_healthy or self.transfer_ctx is None:
return
self.transfer_ctx.flush_inflight_gathers()

def shutdown(self):
"""
Shutdown the LMCache MP worker adapter
Expand Down
Loading