From 502cfba8ed278c92db6f25c7da8516b65b83ba7a Mon Sep 17 00:00:00 2001 From: youngrok-XCENA Date: Mon, 16 Mar 2026 06:30:08 +0000 Subject: [PATCH 1/8] fix(maru): address medium/low review feedback for MaruBackend - Add use_layerwise guard (NotImplementedError) - Change zip strict=False to strict=True in batched_submit_put_task - Add warning log when contains(pin=True) is called - Add warning log for in-flight put_tasks on close() --- lmcache/v1/storage_backend/maru_backend.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index f00f8e14d5..3825d5df49 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -53,6 +53,11 @@ def __init__( ): super().__init__(dst_device=dst_device) + if config.use_layerwise: + raise NotImplementedError( + "MaruBackend does not yet support layerwise KV cache." + ) + # 1. Config self.config = config self.loop = loop @@ -303,7 +308,7 @@ def batched_submit_put_task( List of Futures, one per key. """ futures = [] - for key, memory_obj in zip(keys, memory_objs, strict=False): + for key, memory_obj in zip(keys, memory_objs, strict=True): future = self.submit_put_task( key, memory_obj, on_complete_callback=on_complete_callback ) @@ -425,6 +430,13 @@ def contains(self, key: CacheEngineKey, pin: bool = False) -> bool: if self._mla_worker_id_as0_mode: key = key.with_new_worker_id(0) + if pin: + logger.warning( + "[Maru] contains(pin=True) requested but pin is not yet " + "supported — proceeding without pin for key=%s", + key, + ) + return self._handler.exists(key.to_string()) def pin(self, key: CacheEngineKey) -> bool: @@ -476,6 +488,13 @@ def remove(self, key: CacheEngineKey, force: bool = True) -> bool: def close(self) -> None: """Close the backend and underlying MaruHandler.""" + with self.put_lock: + pending = len(self.put_tasks) + if pending > 0: + logger.warning( + "[Maru] closing with %d in-flight put tasks still pending", + pending, + ) self.memory_allocator.close() self._handler.close() logger.info("MaruBackend closed.") From 1458b7bcfece1788a9dcf51632e1e8f2ed676d17 Mon Sep 17 00:00:00 2001 From: youngrok-XCENA Date: Mon, 16 Mar 2026 06:54:32 +0000 Subject: [PATCH 2/8] chore: remove pin warning --- lmcache/v1/storage_backend/maru_backend.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index 3825d5df49..3eae33ccd0 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -430,13 +430,6 @@ def contains(self, key: CacheEngineKey, pin: bool = False) -> bool: if self._mla_worker_id_as0_mode: key = key.with_new_worker_id(0) - if pin: - logger.warning( - "[Maru] contains(pin=True) requested but pin is not yet " - "supported — proceeding without pin for key=%s", - key, - ) - return self._handler.exists(key.to_string()) def pin(self, key: CacheEngineKey) -> bool: From 4b2fbf2bf8cc758265a0709cc02e83a215c7db58 Mon Sep 17 00:00:00 2001 From: youngrok-XCENA Date: Mon, 16 Mar 2026 08:33:06 +0000 Subject: [PATCH 3/8] feat: implement batched_async_contains and batched_get_non_blocking for MaruBackend Enable MaruBackend to participate in StorageManager.async_lookup_and_prefetch() by implementing the two required async lookup APIs. Both use asyncio.to_thread to wrap sync RPC calls (handler.exists / handler.retrieve) without blocking the event loop. --- lmcache/v1/storage_backend/maru_backend.py | 66 ++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index 3eae33ccd0..0c73cf0937 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -413,6 +413,72 @@ def get_blocking( ) return memory_obj + # ========================================================================= + # Async lookup API (used by StorageManager.async_lookup_and_prefetch) + # ========================================================================= + + async def batched_async_contains( + self, + lookup_id: str, + keys: List[CacheEngineKey], + pin: bool = False, + ) -> int: + """Check how many prefix keys exist on MaruServer. + + Prefix-based: returns the count of contiguous keys starting + from index 0 that exist. Stops at first miss. + + Args: + lookup_id: Unique request identifier. + keys: Keys to check in prefix order. + pin: Whether to pin. Not supported; logged as debug. + + Returns: + Number of prefix-contiguous keys that exist. + """ + + def _contains_prefix() -> int: + num_hit = 0 + for key in keys: + if not self.contains(key): + break + num_hit += 1 + return num_hit + + return await asyncio.to_thread(_contains_prefix) + + async def batched_get_non_blocking( + self, + lookup_id: str, + keys: list[CacheEngineKey], + transfer_spec: Any = None, + ) -> list[MemoryObj]: + """Non-blocking batched get via CXL direct read. + + Each key triggers a metadata lookup on MaruServer followed by + a zero-copy CXL memory read. Stops at first miss and returns + the prefix that was successfully retrieved. + + Args: + lookup_id: Unique request identifier. + keys: Keys to retrieve (already confirmed by contains). + transfer_spec: Unused. + + Returns: + List of MemoryObjs backed by CXL memory. + """ + + def _get_batch() -> list[MemoryObj]: + results: list[MemoryObj] = [] + for key in keys: + mem_obj = self.get_blocking(key) + if mem_obj is None: + break + results.append(mem_obj) + return results + + return await asyncio.to_thread(_get_batch) + # ========================================================================= # Contains / Pin / Unpin / Remove # ========================================================================= From c2e6579007b1d9268301b326eb87daa2359102f1 Mon Sep 17 00:00:00 2001 From: youngrok-XCENA Date: Mon, 16 Mar 2026 09:27:36 +0000 Subject: [PATCH 4/8] fix: add maru:// URL scheme conversion and pin on get_blocking - Convert maru:// to tcp:// in _create_handler for ZMQ compatibility - Call memory_obj.pin() in get_blocking to match cleanup unpin, fixing pin_count=-1 warning on retrieve path --- lmcache/v1/storage_backend/maru_backend.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index 0c73cf0937..01a86aada6 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -133,9 +133,14 @@ def _create_handler( """ assert config.maru_path is not None, "maru_path must be set for MaruBackend" + # Convert maru:// scheme to tcp:// for ZMQ + server_url = config.maru_path + if server_url.startswith("maru://"): + server_url = "tcp://" + server_url[len("maru://"):] + extra = config.extra_config or {} maru_config = MaruConfig( - server_url=config.maru_path, + server_url=server_url, instance_id=extra.get("maru_instance_id"), pool_size=self._parse_pool_size(config.maru_pool_size), chunk_size_bytes=self._full_chunk_size_bytes, @@ -404,6 +409,7 @@ def get_blocking( return None memory_obj.ref_count_up() + memory_obj.pin() logger.debug( "[Maru] get_blocking rid=%d pid=%d size=%d", From c415335e79325975eb4940db54000a2506668f82 Mon Sep 17 00:00:00 2001 From: youngrok-XCENA Date: Mon, 16 Mar 2026 16:11:41 +0000 Subject: [PATCH 5/8] fix(maru): only invoke on_complete_callback on successful store Previously _async_store called on_complete_callback in the finally block regardless of success/failure, which could signal false success to callers and mask CXL page leaks on store failure. Aligns with LocalCPUBackend and NixlDynamic which only call callback on success. --- lmcache/v1/storage_backend/maru_backend.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index 01a86aada6..ef4ef8a21b 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -336,6 +336,7 @@ async def _async_store( memory_obj: MemoryObj backed by CXL memory. on_complete_callback: Optional callback after registration. """ + success = False try: allocator = self.memory_allocator assert isinstance(allocator, CxlMemoryAdapter) @@ -343,6 +344,7 @@ async def _async_store( key_str = key.to_string() await asyncio.to_thread(self._handler.store, key_str, handle) + success = True logger.debug( "[Maru] store key=%s rid=%d pid=%d", @@ -357,7 +359,7 @@ async def _async_store( with self.put_lock: self.put_tasks.discard(key) - if on_complete_callback is not None: + if success and on_complete_callback is not None: try: on_complete_callback(key) except Exception as e: From b284d180422c6ba393904c818fda0be181c65c25 Mon Sep 17 00:00:00 2001 From: hyunyul-XCENA Date: Tue, 17 Mar 2026 02:04:42 +0000 Subject: [PATCH 6/8] feat(maru): add pool_id config for dax device selection Read maru_pool_id from extra_config and pass to MaruConfig. Supports int, list[int], and comma-separated string formats. Usage in LMCache YAML: extra_config: maru_pool_id: 0 # single pool maru_pool_id: [0, 1] # fallback chain maru_pool_id: "0,1,2" # comma-separated --- lmcache/v1/storage_backend/maru_backend.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index ef4ef8a21b..88721c91c9 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -139,7 +139,7 @@ def _create_handler( server_url = "tcp://" + server_url[len("maru://"):] extra = config.extra_config or {} - maru_config = MaruConfig( + maru_kwargs = dict( server_url=server_url, instance_id=extra.get("maru_instance_id"), pool_size=self._parse_pool_size(config.maru_pool_size), @@ -150,6 +150,15 @@ def _create_handler( max_inflight=extra.get("maru_max_inflight", 64), eager_map=extra.get("maru_eager_map", True), ) + pool_id = extra.get("maru_pool_id") + if pool_id is not None: + if isinstance(pool_id, list): + maru_kwargs["pool_id"] = [int(p) for p in pool_id] + elif isinstance(pool_id, str) and "," in pool_id: + maru_kwargs["pool_id"] = [int(p.strip()) for p in pool_id.split(",")] + else: + maru_kwargs["pool_id"] = int(pool_id) + maru_config = MaruConfig(**maru_kwargs) handler = MaruHandler(maru_config) if not handler.connect(): From fab7823a377411839cc0de5a2b66931b468d2415 Mon Sep 17 00:00:00 2001 From: hyunyul-XCENA Date: Tue, 17 Mar 2026 02:37:29 +0000 Subject: [PATCH 7/8] style: fix ruff-format slice spacing --- lmcache/v1/storage_backend/maru_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index 88721c91c9..7903c40b5d 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -136,7 +136,7 @@ def _create_handler( # Convert maru:// scheme to tcp:// for ZMQ server_url = config.maru_path if server_url.startswith("maru://"): - server_url = "tcp://" + server_url[len("maru://"):] + server_url = "tcp://" + server_url[len("maru://") :] extra = config.extra_config or {} maru_kwargs = dict( From 9897fced18c6327b98a90a15bc4eceef54cab3bb Mon Sep 17 00:00:00 2001 From: hyunyul-XCENA Date: Wed, 18 Mar 2026 07:16:28 +0000 Subject: [PATCH 8/8] fix(maru): add error handling and edge case guards for maru_pool_id - Wrap int() conversions in try/except with clear ValueError message - Handle empty list, empty string, and comma-only string as None (skip) - Filter out empty entries in comma-separated format --- lmcache/v1/storage_backend/maru_backend.py | 23 ++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/lmcache/v1/storage_backend/maru_backend.py b/lmcache/v1/storage_backend/maru_backend.py index 7903c40b5d..b2f80eadd3 100644 --- a/lmcache/v1/storage_backend/maru_backend.py +++ b/lmcache/v1/storage_backend/maru_backend.py @@ -152,12 +152,23 @@ def _create_handler( ) pool_id = extra.get("maru_pool_id") if pool_id is not None: - if isinstance(pool_id, list): - maru_kwargs["pool_id"] = [int(p) for p in pool_id] - elif isinstance(pool_id, str) and "," in pool_id: - maru_kwargs["pool_id"] = [int(p.strip()) for p in pool_id.split(",")] - else: - maru_kwargs["pool_id"] = int(pool_id) + try: + if isinstance(pool_id, list): + if pool_id: + maru_kwargs["pool_id"] = [int(p) for p in pool_id] + elif isinstance(pool_id, str): + stripped = pool_id.strip() + if stripped: + if "," in stripped: + maru_kwargs["pool_id"] = [ + int(p.strip()) for p in stripped.split(",") if p.strip() + ] + else: + maru_kwargs["pool_id"] = int(stripped) + else: + maru_kwargs["pool_id"] = int(pool_id) + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid maru_pool_id={pool_id!r}: {e}") from e maru_config = MaruConfig(**maru_kwargs) handler = MaruHandler(maru_config)