feat: Maru storage backend to dev #14

Closed
seohui-XCENA wants to merge 32 commits into dev from feat/maru-backend

@seohui-XCENA

No description provided.

jooho-XCENA and others added 16 commits March 6, 2026 12:44
* Address code review comments on Maru connector

- Add layerwise assertion guard
- Remove raw URL logging to prevent credential leakage
- Fix pool_size log format %s → %d and clarify URL-derived config
- Move MemoryObjMetadata/TensorMemoryObj to top-level imports
- Add robust error handling in parse_size for invalid inputs

* Replace assert with raise NotImplementedError for layerwise guard

assert is stripped in Python optimized mode (-O), so use a proper
exception to ensure the guard is always enforced in production.
- Rename CxlMemoryAllocator → CxlMemoryAdapter import
- Use handler.get_chunk_size() instead of handler.owned_region_manager.get_chunk_size()
- Remove deprecated maru_connector.py and maru_adapter.py (storage backend replaces connector)
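
The assert-versus-exception point above can be illustrated with a minimal sketch. The function names and the error message are hypothetical and are not taken from the MaruBackend code; only the contrast between `assert` and `raise NotImplementedError` comes from the commit message.

```python
def check_layerwise_with_assert(use_layerwise: bool) -> None:
    # Under `python -O` assert statements are not compiled in,
    # so this guard disappears in optimized mode.
    assert not use_layerwise, "layerwise mode is not supported"


def check_layerwise_with_raise(use_layerwise: bool) -> None:
    # An explicit exception is enforced regardless of interpreter flags.
    if use_layerwise:
        raise NotImplementedError("layerwise mode is not supported")
```

Running a script that calls the first function with `python -O` would let `use_layerwise=True` pass silently, while the second function still raises.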
* fix(maru): address medium/low review feedback for MaruBackend

- Add use_layerwise guard (NotImplementedError)
- Change zip strict=False to strict=True in batched_submit_put_task
- Add warning log when contains(pin=True) is called
- Add warning log for in-flight put_tasks on close()

* chore: remove pin warning

* feat: implement batched_async_contains and batched_get_non_blocking for MaruBackend

Enable MaruBackend to participate in StorageManager.async_lookup_and_prefetch()
by implementing the two required async lookup APIs. Both use asyncio.to_thread
to wrap sync RPC calls (handler.exists / handler.retrieve) without blocking the
event loop.
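
A minimal sketch of the `asyncio.to_thread` pattern described above, assuming a handler whose `exists` method is a blocking call. `FakeHandler` is a hypothetical stand-in, and returning the number of leading hits is an assumption about the return value, not something this PR states.

```python
import asyncio
import time


class FakeHandler:
    """Hypothetical stand-in for a handler with blocking RPC methods."""

    def __init__(self, stored):
        self._stored = set(stored)

    def exists(self, key: str) -> bool:
        time.sleep(0.01)  # simulate a blocking RPC round-trip
        return key in self._stored


async def batched_async_contains(handler: FakeHandler, keys: list[str]) -> int:
    """Count how many leading keys exist (assumed semantics)."""
    hits = 0
    for key in keys:
        # to_thread runs the sync call in a worker thread,
        # so the event loop is free to run other coroutines meanwhile.
        if not await asyncio.to_thread(handler.exists, key):
            break
        hits += 1
    return hits


hit_count = asyncio.run(
    batched_async_contains(FakeHandler({"a", "b"}), ["a", "b", "c"])
)
```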

* fix: add maru:// URL scheme conversion and pin on get_blocking

- Convert maru:// to tcp:// in _create_handler for ZMQ compatibility
- Call memory_obj.pin() in get_blocking to match cleanup unpin,
  fixing pin_count=-1 warning on retrieve path
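
The scheme conversion in the first bullet could look roughly like the sketch below. The helper name `to_zmq_endpoint` is hypothetical; the actual logic inside `_create_handler` is not shown in this PR.

```python
def to_zmq_endpoint(url: str) -> str:
    """Rewrite a maru:// URL to the tcp:// form that ZMQ accepts."""
    prefix = "maru://"
    if url.startswith(prefix):
        return "tcp://" + url[len(prefix):]
    # Other schemes are passed through unchanged in this sketch.
    return url
```

For example, `to_zmq_endpoint("maru://localhost:5555")` yields `"tcp://localhost:5555"`.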

* fix(maru): only invoke on_complete_callback on successful store

Previously _async_store called on_complete_callback in the finally
block regardless of success/failure, which could signal false success
to callers and mask CXL page leaks on store failure.

Aligns with LocalCPUBackend and NixlDynamic which only call callback
on success.
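
The difference between firing the callback in `finally` and firing it only on success can be sketched as follows. `store_then_notify` and its arguments are hypothetical, and the error handling is simplified to a boolean return for illustration.

```python
def store_then_notify(store, key, on_complete_callback) -> bool:
    """Call on_complete_callback only if store(key) succeeded."""
    try:
        store(key)
    except Exception:
        # A failed store must not signal completion to the caller.
        return False
    else:
        on_complete_callback(key)
        return True
```

Had the callback been placed in a `finally` block instead of `else`, it would also run after the exception path, which is the false-success behavior the commit removes.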
* feat(maru): use batch RPC APIs for MaruHandler operations

Replace per-key for-loop RPC calls with single batch RPC calls:

- batched_submit_put_task: N x handler.store() → 1 x handler.batch_store()
- batched_async_contains: N x handler.exists() → 1 x handler.batch_exists()
- batched_get_non_blocking: N x handler.retrieve() → 1 x handler.batch_retrieve()

Add missing batch methods:
- batched_contains: new, using handler.batch_exists()
- batched_get_blocking: new, using handler.batch_retrieve()
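
The round-trip saving from batching can be seen with a toy handler that counts calls. `CountingHandler` is hypothetical; the method names `exists` and `batch_exists` appear in the commit message, but their signatures and return types here are assumptions.

```python
class CountingHandler:
    """Hypothetical handler that counts simulated RPC round-trips."""

    def __init__(self, stored):
        self._stored = set(stored)
        self.rpc_calls = 0

    def exists(self, key: str) -> bool:
        self.rpc_calls += 1  # one round-trip per key
        return key in self._stored

    def batch_exists(self, keys: list[str]) -> list[bool]:
        self.rpc_calls += 1  # one round-trip for the whole batch
        return [key in self._stored for key in keys]


keys = ["a", "b", "c", "d"]

per_key = CountingHandler({"a", "c"})
per_key_result = [per_key.exists(k) for k in keys]

batched = CountingHandler({"a", "c"})
batched_result = batched.batch_exists(keys)
```

Both paths return the same answers; only the number of round-trips differs (N versus 1).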

* fix(maru): address design review findings for batch operations

- Move handle creation inside try block to prevent ghost keys in
  put_tasks on create_store_handle failure
- Only fire on_complete_callback for keys that succeeded in batch_store
- Remove observability code (metrics, time imports) — belongs in
  separate observability branch
- Update batched_submit_put_task docstring to reflect single-Future
  return semantics

* fix: resolve ruff-format and mypy pre-commit failures

- Fix ruff-format: whitespace in slice, logger arg formatting
- Fix mypy: guard sum(results)/len(results) with None check

* fix(maru): address PR #11 review feedback

- Add assert memory_obj.tensor is not None in batched_submit_put_task
  for parity with single-key submit_put_task
- Add summary debug logs to batched_get_blocking and
  batched_get_non_blocking for production observability
- Deduplicate batched_async_contains by delegating to batched_contains
* feat: add maru_as_primary_allocator config and PD mutual exclusion assert

- Assert enable_pd=False when maru_path is set (mutual exclusion)
- Add maru_as_primary_allocator config option (default: True)
- Update _get_allocator_backend to respect the new option

* fix: change maru_as_primary_allocator default to False

* refactor: remove maru_as_primary_allocator, prefer LocalCPUBackend when available

Simplify allocator selection: when MaruBackend is present, use
LocalCPUBackend if it exists, otherwise fall back to MaruBackend.

* refactor: remove PD mutual exclusion assert from MaruBackend creation

* feat: add explicit ImportError messages for maru dependencies

Follow mooncakestore_connector.py pattern to provide clear error
messages when maru or maru_lmcache packages are not installed.
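
A generic sketch of the explicit-ImportError pattern, using `importlib` so it runs without the maru packages installed. The helper `require_module` and the hint text are hypothetical and are not copied from `mooncakestore_connector.py`.

```python
import importlib
from types import ModuleType


def require_module(name: str, install_hint: str) -> ModuleType:
    """Import a module or fail with an actionable message."""
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        # Chain the original error so the root cause stays visible.
        raise ImportError(
            f"Could not import '{name}'. {install_hint}"
        ) from exc
```

A backend could call this once at construction time, so a missing dependency is reported with installation guidance rather than a bare `ModuleNotFoundError` deep in a stack trace.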

* feat: port batch RPC calls from maru-connector with debug logging

Use handler.batch_exists, batch_retrieve, batch_store instead of
individual loop calls to reduce RPC round-trips.
Added [DEBUG] logs for batch call tracing (to be removed later).

* style: fix isort and ruff-format for maru_backend
* feat: implement MaruBackend pin/unpin and ref_count management

- contains(pin=True): call handler.exists_and_pin() for server-side pin
- batched_async_contains(): use batch_exists_and_pin RPC instead of per-key calls
- get_blocking(): remove local MemoryObj.pin(), keep ref_count_up() only
- submit_put_task(): add ref_count_up() x2 (pool ref + async_store guard)
- _async_store(): add ref_count_down() on completion
- pin()/unpin(): delegate to handler.pin_kv()/unpin_kv()
- batched_unpin(): batch RPC override for efficiency
- StorageManager.batched_unpin(): use MaruBackend.batched_unpin() when available

* feat: add server unpin in async cleanup_memory_objs for MaruBackend

- When MaruBackend is present, use batched_unpin RPC instead of local MemoryObj.unpin()
- Other backends keep existing memory_obj.unpin() behavior unchanged

* chore: apply ruff-format

* feat: add batched_unpin to StorageBackendInterface

- Add batched_unpin() default implementation to abstract_backend (loops unpin per key)
- MaruBackend overrides with single batch RPC for optimization
- StorageManager.batched_unpin() uses backend.batched_unpin() directly
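
The default-plus-override arrangement described in these bullets might be sketched as below. The class names are hypothetical, and the integer return value (number of keys unpinned) is an assumption rather than the actual `StorageBackendInterface` signature.

```python
from abc import ABC, abstractmethod


class StorageBackendSketch(ABC):
    @abstractmethod
    def unpin(self, key: str) -> bool:
        ...

    def batched_unpin(self, keys: list[str]) -> int:
        # Default implementation: one unpin call per key.
        return sum(1 for key in keys if self.unpin(key))


class PerKeyBackend(StorageBackendSketch):
    def __init__(self) -> None:
        self.calls = 0

    def unpin(self, key: str) -> bool:
        self.calls += 1  # simulated per-key call
        return True


class BatchBackend(PerKeyBackend):
    def batched_unpin(self, keys: list[str]) -> int:
        self.calls += 1  # simulated single batch RPC
        return len(keys)
```

A caller such as a storage manager can then invoke `backend.batched_unpin(keys)` uniformly, without checking which backend type it holds.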

* fix: revert batch RPC in batched_async_contains and add error handling in cleanup_memory_objs

- Replace batch_exists/batch_exists_and_pin with per-key contains() loop
  (batch RPC optimization will be handled in a separate PR)
- Add try/finally in cleanup_memory_objs to ensure ref_count_down runs
  even if batched_unpin fails
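
The try/finally guarantee in the second bullet can be sketched as follows. `FakeMemoryObj` and the function signature are hypothetical; only the ordering (unpin first, `ref_count_down` always) comes from the commit message.

```python
class FakeMemoryObj:
    """Hypothetical object with a simple reference counter."""

    def __init__(self) -> None:
        self.ref_count = 1

    def ref_count_down(self) -> None:
        self.ref_count -= 1


def cleanup_memory_objs(memory_objs, batched_unpin) -> None:
    try:
        batched_unpin(memory_objs)
    finally:
        # Runs even if batched_unpin raised, so references are not leaked.
        for obj in memory_objs:
            obj.ref_count_down()
```

The exception from `batched_unpin` still propagates to the caller; the `finally` block only ensures the reference counts are released first.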

* fix: add ref_count management to batched_submit_put_task

Match submit_put_task pattern: ref_count_up x2 before async RPC,
ref_count_down in _async_batch_store finally block.

* refactor: simplify ref_count and remove local pin in MaruBackend

- ref_count: single up only (no down), matching LocalCPUBackend pool pattern
  1(alloc) → 2(ref_up) → 1(SM ref_down) — pool reference retained
- remove memory_obj.pin() from all get paths (batched_get_blocking,
  batched_get_non_blocking) — server pin handles eviction protection

* revert: remove batched_unpin and revert to per-key unpin

Revert batched_unpin optimization to keep implementation simple.
The batch RPC optimization will be addressed in a follow-up PR.

* fix: add ref_count_down on async store failure and MLA worker_id fix in remove

- Add ref_count_down() in _async_store() finally block when store fails
- Add ref_count_down() for failed stores in _async_batched_store()
- Add MLA worker_id normalization in remove() for consistency

* feat: add batched_unpin to MaruBackend for single-RPC batch optimization
Align method names with updated MaruHandler API:
- exists_and_pin -> pin
- batch_exists_and_pin -> batch_pin
- unpin_kv -> unpin
- batch_unpin_kv -> batch_unpin
seohui-XCENA changed the title from "feat: Maru storage backend" to "feat: Maru storage backend to dev" on Mar 20, 2026
hyunyul-XCENA and others added 11 commits March 20, 2026 05:45
…intent

- Fix AttributeError: MaruHandler exposes pin(), not pin_kv()
- Add comment explaining intentional ref_count retention on successful store

Signed-off-by: hyunyul-XCENA <hyunyul.cho@xcena.com>
- Replace old connector config (remote_url, remote_serde) with
  maru_path and maru_pool_size top-level parameters
- Remove maru_operation_timeout (unused in MaruBackend)
- Update maru_timeout_ms default from 2000 to 5000

Signed-off-by: hyunyul-XCENA <hyunyul.cho@xcena.com>
MaruBackend manages its own CXL memory allocation. LocalCPUBackend
must be disabled to avoid allocator conflicts.

Signed-off-by: hyunyul-XCENA <hyunyul.cho@xcena.com>
- Add MLA worker_id_as0 skip logic to submit_put_task and
  batched_submit_put_task, matching RemoteBackend behavior
- Fix test mock/assert to use handler.pin() instead of handler.pin_kv()
  (pin_kv is internal RPC method, pin is public MaruHandler API)
- Add MLA skip tests for both put paths
- Clean up docstring for unused params
- Re-raise exceptions in _async_store and _async_batch_store after
  logging, so Future reflects actual store failure instead of silent
  success (matches RemoteBackend error propagation pattern)
- Fix blocking_store test helper to avoid mock recursion
Add memory_obj.pin() in batched_get_non_blocking() so that
cleanup_memory_objs()'s memory_obj.unpin() call doesn't cause
negative pin_count warnings. Sync retrieve paths are unaffected
as they use server-side unpin via lookup_unpin().
* refactor: simplify maru_pool_size config from str to float (GB)

Align with existing config pattern (max_local_cpu_size uses float GB).
Remove _parse_pool_size string parser in favor of simple GB-to-bytes
conversion. This eliminates silent fallback on invalid values.
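
A GB-to-bytes conversion of the kind described might look like this sketch. The helper name is hypothetical, and treating 1 GB as 1024**3 bytes is an assumption; the PR does not say whether binary or decimal gigabytes are used.

```python
def pool_size_gb_to_bytes(size_gb: float) -> int:
    """Convert a pool size in GB to bytes, rejecting invalid values."""
    if size_gb <= 0:
        # Fail loudly instead of silently falling back to a default.
        raise ValueError(f"pool size must be positive, got {size_gb}")
    return int(size_gb * 1024**3)  # assumption: binary gigabytes
```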

* fix: drain in-flight put tasks before closing MaruBackend

Wait for all pending async store tasks to complete before closing
the handler, consistent with PDBackend and GDSBackend patterns.
Prevents crash/data loss when _async_store is still running.
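
One way to drain pending work before shutdown is sketched below with `concurrent.futures`. `DrainingBackend` is hypothetical and uses a thread pool for simplicity; the real MaruBackend may track its put tasks differently (for example on an asyncio loop).

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait


class DrainingBackend:
    """Hypothetical backend that waits for in-flight stores on close()."""

    def __init__(self) -> None:
        self._executor = ThreadPoolExecutor(max_workers=2)
        self._lock = threading.Lock()
        self._in_flight: set[Future] = set()
        self.stored: list[str] = []
        self.closed = False

    def submit_put_task(self, key: str) -> Future:
        fut = self._executor.submit(self._store, key)
        with self._lock:
            self._in_flight.add(fut)
        # Registered after the add; fires immediately if already done.
        fut.add_done_callback(self._forget)
        return fut

    def _forget(self, fut: Future) -> None:
        with self._lock:
            self._in_flight.discard(fut)

    def _store(self, key: str) -> None:
        time.sleep(0.05)  # simulate a slow store
        self.stored.append(key)

    def close(self) -> None:
        with self._lock:
            pending = list(self._in_flight)
        wait(pending)  # block until every pending store has finished
        self._executor.shutdown(wait=True)
        self.closed = True
```

Calling `close()` right after several `submit_put_task()` calls returns only once all submitted keys have been stored.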
hyunyul-XCENA and others added 4 commits March 20, 2026 08:44
Verify that ref_count returns to pre-submit level and put_tasks
are cleaned up when handler.store / batch_store raises an exception.

4 participants