Refactor(umbp): introduce IUMBPClient interface for dual-scheme architecture #257
Open
isytwu wants to merge 62 commits into
Conversation
Collaborator
@isytwu Thanks for the PR, I have two questions:
Force-pushed 17c9725 to 38ae911
Force-pushed 718ef91 to 4cbb825
Force-pushed 223ef79 to b26edfc
…tecture

- Add IUMBPClient ABC and CreateUMBPClient() factory
- Implement StandaloneClient from old UMBPClient, remove all PoolClient code
- Extract shared I/O drivers (POSIX/io_uring/SPDK) to top-level io/ and spdk/
- Decouple umbp_core from umbp_common (no gRPC/Protobuf for standalone)
- Update pybind to use factory; sglang umbp_store.py unchanged
- Migrate tests to StandaloneClient

Made-with: Cursor
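For orientation, a minimal C++ sketch of the ABC + factory split this commit introduces. The method set and the UMBPConfig field shown are assumptions rather than the PR's real surface, and the runtime switch mirrors the later umbp_client_factory.cpp commit:

```cpp
#include <cstddef>
#include <memory>
#include <string>

struct UMBPConfig {
  bool distributed = false;  // runtime switch, no compile flags (field assumed)
};

class IUMBPClient {
 public:
  virtual ~IUMBPClient() = default;
  virtual bool Put(const std::string& key, const void* src, std::size_t size) = 0;
  virtual bool Get(const std::string& key, void* dst, std::size_t size) = 0;
};

// Client-local DRAM+SSD control flow (bodies elided to stubs here).
class StandaloneClient final : public IUMBPClient {
 public:
  bool Put(const std::string&, const void*, std::size_t) override { return true; }
  bool Get(const std::string&, void*, std::size_t) override { return true; }
};

// Master-led routing control flow (bodies elided to stubs here).
class DistributedClient final : public IUMBPClient {
 public:
  bool Put(const std::string&, const void*, std::size_t) override { return true; }
  bool Get(const std::string&, void*, std::size_t) override { return true; }
};

std::unique_ptr<IUMBPClient> CreateUMBPClient(const UMBPConfig& cfg) {
  // Two independent execution paths behind one interface, chosen at runtime.
  if (cfg.distributed) return std::make_unique<DistributedClient>();
  return std::make_unique<StandaloneClient>();
}
```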
The source code uses distributed/ but the test directory was still named pool/ (legacy). Align test directory naming with source layout. Made-with: Cursor
Disambiguate from top-level storage/ (shared I/O infrastructure) by renaming the standalone tier management layer to local/tiers/. Made-with: Cursor
Header + stub .cpp with all IUMBPClient methods returning false/empty. Constructor validates config.distributed is set. Added to umbp_common CMake target. Actual master-led control flow is TODO. Made-with: Cursor
- Fill DistributedClient with MasterClient/IOEngine members and constructor/destructor lifecycle skeleton
- Add Lookup RPC to proto/MasterClient/MasterServer (read-only existence check without access-count side-effects)
- Extract CreateUMBPClient factory into umbp_client_factory.cpp (runtime switch via config.distributed, no compile flags)
- pybind unconditionally links umbp_core + umbp_common

Made-with: Cursor
…tion

- DistributedClient wraps PoolClient (mmap DRAM pool + shared_mutex close guard)
- PoolClient: local memcpy path, BatchPutRemote/BatchGetRemote with parallel RDMA
- Proto: BatchRoutePut/Get/Finalize RPCs, depth field in FinalizeRequest
- Router: batch methods (single GetAliveClients call)
- Eviction: global LRU + depth-weighted tie-breaker + atomic lease + tier-aware
- Finalize: idempotent with key/location/tier validation, retry with backoff
- Cross-node fix: FinalizeAllocation uses target node_id, not requester
- RecordAccess: atomic fields + shared_lock for read-priority
- ParseLocationId: shared utility in types.h
- pybind: register_memory/deregister_memory bindings
- Tests: finalize idempotency, eviction, RDMA cross-node smoke (56 tests pass)

Made-with: Cursor
Replaces the per-buffer byte-granularity PoolAllocator (DRAM/HBM only) with a page-granularity PageBitmapAllocator on the master, enabling cross-buffer discrete allocation and fixing the long-standing #2 heartbeat-overwrites-tier_capacities bug.

Master side
- New include/umbp/distributed/page_bitmap_allocator.h: bitmap allocator with first-fit + cross-buffer fallback, idempotent Deallocate, caller-owned locking (no internal mutex).
- ClientRecord stores std::map<TierType, shared_ptr<PageBitmapAllocator>> for DRAM/HBM (the SSD path keeps the capacity-only PoolAllocator).
- Heartbeat is now per-tier owner-aware: the master is the source of truth for DRAM/HBM available_bytes (the client's reported value is ignored); SSD continues to be client-owned. total_bytes mismatches log a throttled WARN.
- AllocateForPut returns AllocResult{location_id, pages, dram_memory_descs, page_size}; the canonical location_id is "<buf>:p<idx>[,<idx>...][;<buf>:...]".
- New ClientRegistry::GetDramMemoryDescsForPages() and GetNodeDramPageSize() for master_server route handlers.
- RegisterClient accepts a per-node dram_page_size (0 falls back to ClientRegistryConfig::default_dram_page_size).

Proto
- New BufferMemoryDesc and PageLocation messages.
- RoutePut/RouteGet (and Batch* counterparts) carry location_id, pages, dram_memory_descs (deduplicated, ascending) and page_size.
- RegisterClientRequest carries dram_page_size.
- Old single-buffer fields (dram_memory_desc, allocated_offset, buffer_index) are reserved.

Client side
- PoolClientConfig.dram_page_size flows through MasterClient::RegisterSelf.
- New RemoteDramScatterWrite/RemoteDramScatterRead group pages by buffer_index and issue a single IOEngine BatchWrite/BatchRead (single-page is the trivial N=1, K=1 case).
- New EnsureBufferDescsCached hydrates peer.dram_memories[] from the Route response's dram_memory_descs list before RDMA, eliminating the chicken-and-egg lazy-fetch problem on cross-buffer allocations.
- Put/Get/BatchPut/BatchGet uniformly use the scatter-gather path.

Eviction & cleanup
- EvictionManager calls registry_.DeallocateForUnregister(node, location), which dispatches DRAM/HBM (PageBitmapAllocator::Deallocate) vs SSD (PoolAllocator::Deallocate) internally.

Tests (144/144 pass)
- New test_page_bitmap_allocator (33 cases) covers all three allocation strategies, idempotent free, location_id roundtrip, and edge cases.
- test_client_registry adds HeartbeatDoesNotOverwriteDramAvailable as a #2 regression test.
- test_cross_node_smoke gains a CrossNodeMultiPage suite covering Strategy 1/2/3 end-to-end with real RDMA.

Made-with: Cursor
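As a rough illustration of the allocator described above, here is a hedged C++ sketch of a page bitmap with cross-buffer gathering and the canonical location_id encoding. The class shape, per-page Deallocate granularity, and everything beyond the names quoted in the commit are assumptions:

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

class PageBitmapAllocator {
 public:
  // One bitmap per registered buffer.
  explicit PageBitmapAllocator(const std::vector<std::size_t>& pages_per_buffer) {
    for (std::size_t n : pages_per_buffer) used_.emplace_back(n, false);
  }

  // First-fit scan; because pages are gathered wherever they are free, a
  // request too large for any single buffer falls back to a cross-buffer
  // discrete allocation. Not thread-safe by design: the caller owns locking.
  std::optional<std::string> Allocate(std::size_t num_pages) {
    std::vector<std::pair<std::size_t, std::size_t>> picked;  // (buffer, page)
    for (std::size_t b = 0; b < used_.size() && picked.size() < num_pages; ++b)
      for (std::size_t p = 0; p < used_[b].size() && picked.size() < num_pages; ++p)
        if (!used_[b][p]) picked.emplace_back(b, p);
    if (picked.size() < num_pages) return std::nullopt;

    // Encode the canonical location_id: "<buf>:p<idx>[,<idx>...][;<buf>:...]".
    std::ostringstream id;
    std::size_t cur = SIZE_MAX;
    for (auto [b, p] : picked) {
      used_[b][p] = true;
      if (b != cur) { id << (cur == SIZE_MAX ? "" : ";") << b << ":p"; cur = b; }
      else { id << ","; }
      id << p;
    }
    return id.str();
  }

  // Idempotent: freeing an already-free page is a no-op. (The real API
  // presumably frees by parsed location_id rather than one page at a time.)
  void Deallocate(std::size_t buffer, std::size_t page) {
    if (buffer < used_.size() && page < used_[buffer].size())
      used_[buffer][page] = false;
  }

 private:
  std::vector<std::vector<bool>> used_;
};
```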
The master allocates ceil(size/page_size) pages for non-aligned Puts; the client used to reject any size != num_pages * page_size, failing every such request. Accept size in ((N-1)*ps, N*ps] and transfer only the real tail bytes (local memcpy + scatter RDMA). GetRemote/BatchGetRemote also require size == Location.size to prevent stale-tail leaks and silent truncation. Tests: 4 new partial-tail cases with sentinel-byte guards. Made-with: Cursor
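The acceptance rule reads naturally as a predicate. A minimal sketch; the real SizeMatchesAllocation (shared in types.h by a later commit in this PR) may differ:

```cpp
#include <cstddef>

// A Put of `size` bytes against an N-page allocation is valid iff
// (N-1)*page_size < size <= N*page_size; only the real tail bytes move.
inline bool SizeMatchesAllocation(std::size_t size, std::size_t num_pages,
                                  std::size_t page_size) {
  if (num_pages == 0 || size == 0 || page_size == 0) return false;
  return size > (num_pages - 1) * page_size && size <= num_pages * page_size;
}
```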
Adds a per-node dram_page_size override on UMBPDistributedConfig so sglang UMBPStore can align the master's PageBitmapAllocator page_size with the actual KV-page byte size (avoids partial-tail waste).

- common/config.h: new dram_page_size field on UMBPDistributedConfig (defaults to 2 MiB; 0 means fall back to the master's registry-wide default_dram_page_size).
- distributed_client.cpp: forward dc.dram_page_size to PoolClientConfig so it reaches the master via RegisterClient.
- pybind: expose dram_page_size on the UMBPDistributedConfig binding.

Made-with: Cursor
Replaces the per-key Lookup loop in DistributedClient::BatchExists / BatchExistsConsecutive with a single batched gRPC, eliminating 128 roundtrips per call in sglang's batch=128 probe path.

- proto: BatchLookup RPC + BatchLookupRequest/Response
- GlobalBlockIndex::BatchLookupExists takes one shared_lock for the whole batch (read-only, same no-side-effect semantics as Lookup)
- MasterServer handler + MasterClient/PoolClient wrappers
- tests: BatchLookupExistsMatchesSingleLookup + BatchLookupExistsHasNoMetricsSideEffects

Made-with: Cursor
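A hedged sketch of the single-lock batch lookup; the index's internal container and value type are assumptions:

```cpp
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

class GlobalBlockIndex {
 public:
  // One shared_lock covers the whole batch; read-only, so it keeps the same
  // no-side-effect semantics as single-key Lookup (no access-count bump).
  std::vector<bool> BatchLookupExists(const std::vector<std::string>& keys) const {
    std::shared_lock<std::shared_mutex> lock(mu_);
    std::vector<bool> out;
    out.reserve(keys.size());
    for (const auto& k : keys) out.push_back(blocks_.count(k) != 0);
    return out;
  }

 private:
  mutable std::shared_mutex mu_;
  std::unordered_map<std::string, int> blocks_;  // value type assumed
};
```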
- PoolClient public API drops the misleading Remote suffix
- dram_page_size default unified (0 on the client = delegate to the master)
- master_*/io_engine_* fields nested into UMBPMasterClientConfig / UMBPIoEngineConfig; a ToPoolClientConfig helper replaces 17 lines of constructor glue

Pybind is a hard cut; sglang updated in lockstep.

Made-with: Cursor
…rollback

The master auto-rolls back a pending allocation on FinalizeAllocation field mismatch and guards AllocateForPut invariants; PoolClient drops Abort on master-detected failures, keeping it only for remote RDMA scatter-write failures. SizeMatchesAllocation is shared in types.h. Adds a FinalizeAutoRollback test.
New BatchAbortAllocation RPC; PoolClient::BatchPut accumulates write-failed items and flushes them once at end-of-batch (worst case: N gRPCs collapse to 1). Semantics mirror BatchFinalizeAllocation (wire OK + per-entry vector<bool>). Adds macro-gated PoolClient counters (MORI_UMBP_OBS_COUNTERS, OFF in release) used by the new tests. Made-with: Cursor
When a BatchPut entry's src is not registered for zero-copy, surface the silent staging-path degradation at batch granularity (60s/PoolClient throttle). Substring-distinct from the per-call WARN inside RemoteDramScatterWrite. Closes distributed-known-issues #12. Made-with: Cursor
Both shutdown-sensitive RPCs (Heartbeat in HeartbeatLoop, UnregisterClient in UnregisterSelf) now carry kMasterRpcShutdownTimeoutMs=3s. Worst-case destructor wall time ~6s instead of unbounded. Closes distributed-known-issues #7. Made-with: Cursor
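For illustration, the standard gRPC way to cap such an RPC; only kMasterRpcShutdownTimeoutMs comes from the commit, the helper name is hypothetical:

```cpp
#include <chrono>
#include <grpcpp/grpcpp.h>

constexpr int kMasterRpcShutdownTimeoutMs = 3000;

// Caps the RPC at 3 s: a Heartbeat or UnregisterClient issued during
// teardown fails with DEADLINE_EXCEEDED instead of blocking forever, so two
// such RPCs bound the destructor's wall time at roughly 6 s.
inline void SetShutdownDeadline(grpc::ClientContext& ctx) {
  ctx.set_deadline(std::chrono::system_clock::now() +
                   std::chrono::milliseconds(kMasterRpcShutdownTimeoutMs));
}
```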
Adds two virtuals with no-op defaults (Standalone inherits, Distributed overrides with uintptr_t signature). Pybind drops dynamic_cast and binds the interface methods directly. Also adds null/zero-size guards in PoolClient::RegisterMemory + null-safe DeregisterMemory (R3 follow-up: IOEngine::RegisterMemory does not null-check). Closes distributed-known-issues #15. Note: ABI break in IUMBPClient vtable — dependents must rebuild. Made-with: Cursor
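A sketch of the no-op-default pattern; the exact names and signatures beyond the uintptr_t address are assumptions, and the rest of the interface is elided:

```cpp
#include <cstddef>
#include <cstdint>

class IUMBPClient {
 public:
  virtual ~IUMBPClient() = default;
  // No-op defaults: StandaloneClient inherits them unchanged, while
  // DistributedClient overrides and forwards to PoolClient/IOEngine,
  // guarding against null addresses and zero sizes first (since
  // IOEngine::RegisterMemory does not null-check).
  virtual bool RegisterMemory(std::uintptr_t /*addr*/, std::size_t /*size*/) {
    return true;
  }
  virtual bool DeregisterMemory(std::uintptr_t /*addr*/) { return true; }
  // ...remaining interface methods elided...
};
```

Adding virtuals changes the vtable layout, which is exactly the ABI break the commit warns dependents about.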
Replace hardcoded TTLs, heartbeat/reaper intervals, RPC shutdown deadlines, lease retry counts, and SPDK proxy poll/yield/stale values with UMBP_* env overrides. Defaults match prior behavior. Centralized parser in umbp/common/env_time.h (parse-once-with-WARN, function-local static caching). Master logs the resolved timing config at startup. Full env reference in src/umbp/doc/runtime-env-vars.md. Made-with: Cursor
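A hedged sketch of the parse-once-with-WARN accessor pattern attributed to umbp/common/env_time.h; the variable name, default, and WARN sink below are hypothetical:

```cpp
#include <chrono>
#include <cstdlib>
#include <iostream>

// Parses a millisecond value from the environment, warning once on bad input.
inline std::chrono::milliseconds ParseEnvMs(const char* name, long def_ms) {
  const char* raw = std::getenv(name);
  if (raw == nullptr) return std::chrono::milliseconds(def_ms);
  char* end = nullptr;
  long v = std::strtol(raw, &end, 10);
  if (end == raw || *end != '\0' || v < 0) {
    std::cerr << "WARN: invalid " << name << "=" << raw
              << ", using default " << def_ms << " ms\n";
    return std::chrono::milliseconds(def_ms);
  }
  return std::chrono::milliseconds(v);
}

// Each UMBP_* variable gets its own accessor: the function-local static
// caches exactly one parse (and at most one WARN) per variable.
// Variable name and default here are hypothetical.
inline std::chrono::milliseconds HeartbeatInterval() {
  static const auto v = ParseEnvMs("UMBP_HEARTBEAT_INTERVAL_MS", 1000);
  return v;
}
```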
Three-phase locking with shared_lock-paired index_ load and Phase 3 client re-check. Adds 3 unit tests; TSan + distributed regressions green. Made-with: Cursor
- 3-phase pipeline: per-peer snapshot+bucket -> single IOEngine::BatchWrite fire-and-return -> drain+map failures
- Cross-item SG fusion: items_per_pair = batch in single-peer single-buf
- LOCAL memcpy + staging fallback overlap with NIC RDMA in flight
- Two-layer exception guard drains in-flight on Phase 1 bad_alloc
- Adds virtual IssueBatchWrite test seam, 4 obs counters, fused unit tests, and a v2-only micro-bench
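The three phases condensed into a sketch; Item, WriteHandle, and IOEngine below are illustrative stand-ins, not the real mori types, and the exception guard and staging fallback are elided:

```cpp
#include <map>
#include <vector>

struct Item { int peer = 0; bool failed = false; };  // src/dst/size elided
struct WriteHandle { int peer = 0; };                // in-flight batch token
struct IOEngine {
  // Fire-and-return: issues the fused write and hands back a waitable handle.
  WriteHandle BatchWrite(int peer, const std::vector<Item*>&) { return {peer}; }
  bool Wait(const WriteHandle&) { return true; }     // drain one batch
};

void BatchPutV2(IOEngine& io, std::vector<Item>& items) {
  // Phase 1: snapshot + bucket per peer (cross-item SG fusion: one fused
  // request per peer rather than one per item).
  std::map<int, std::vector<Item*>> buckets;
  for (auto& it : items) buckets[it.peer].push_back(&it);

  // Phase 2: issue every fused BatchWrite before waiting on any of them;
  // LOCAL memcpy and the staging fallback would run here, overlapping the
  // NIC RDMA already in flight.
  std::vector<WriteHandle> inflight;
  for (auto& [peer, bucket] : buckets) inflight.push_back(io.BatchWrite(peer, bucket));

  // Phase 3: drain and map any failure back onto the originating items.
  for (const WriteHandle& h : inflight)
    if (!io.Wait(h))
      for (Item* it : buckets[h.peer]) it->failed = true;
}
```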
Mirror BatchPut v2 3-phase pipeline: per-peer fused BatchRead with fire-and-return + Wait, overlapping NIC RDMA with LOCAL memcpy and STG fallback. Adds IssueBatchRead test seam, 4 BatchGet* obs counters, fused unit tests, and a v2-only micro-bench.
- Adds mori_umbp_master_client_rpc_latency_seconds (Histogram) and _errors_total / _metrics_dropped_total counters
- Flushed through the existing ReportMetrics pipeline + new Grafana dashboard
- Replace per-observation histogram wire encoding with cumulative HistogramAggregate; ReportMetrics payload bounded by series cardinality, not QPS
- Add MetricsServer::observeAggregated for master-side per-bucket merge; /metrics text byte-equivalent to legacy
- Widen Observe to take bounds by const-ref; reuse static kBounds in RecordRpcLatency to drop per-RPC alloc
- Reserve MetricKey buffer + per-accumulator warned_mismatch dedup
- Direct cap-branch test + 2 aggregation tests via friend-based test fixture hook
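A sketch of the per-bucket cumulative merge idea behind observeAggregated; the HistogramAggregate layout is an assumption:

```cpp
#include <cstdint>
#include <vector>

struct HistogramAggregate {
  std::vector<std::uint64_t> bucket_counts;  // one counter per bound
  double sum = 0;
  std::uint64_t count = 0;
};

// Master-side merge: the wire payload scales with #series * #buckets
// (series cardinality), not with how many observations occurred (QPS).
void MergeAggregate(HistogramAggregate& into, const HistogramAggregate& delta) {
  if (into.bucket_counts.size() < delta.bucket_counts.size())
    into.bucket_counts.resize(delta.bucket_counts.size(), 0);
  for (std::size_t i = 0; i < delta.bucket_counts.size(); ++i)
    into.bucket_counts[i] += delta.bucket_counts[i];
  into.sum += delta.sum;
  into.count += delta.count;
}
```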
- Refactor the current design to decouple master & client allocation logic
- New pattern follows eventual consistency and master-as-advisor
Force-pushed a432c03 to 58fd9c9
Snapshot the metric maps under the lock, then format the body without holding it, so the RPC handlers' addCounter/setGauge/observeAggregated calls are not blocked by the scrape.
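The pattern in a minimal sketch, with class and member names assumed:

```cpp
#include <map>
#include <mutex>
#include <sstream>
#include <string>

class MetricsServer {
 public:
  std::string RenderMetrics() {
    std::map<std::string, double> snapshot;
    {
      std::lock_guard<std::mutex> lk(mu_);
      snapshot = counters_;  // copy under the lock...
    }
    // ...then format without holding it, so RPC-handler updates are
    // never blocked by a slow scrape.
    std::ostringstream body;
    for (const auto& [k, v] : snapshot) body << k << " " << v << "\n";
    return body.str();
  }

  void addCounter(const std::string& k, double v) {
    std::lock_guard<std::mutex> lk(mu_);
    counters_[k] += v;
  }

 private:
  std::mutex mu_;
  std::map<std::string, double> counters_;
};
```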
…r node

Match() previously overwrote each node's tier with whatever hash was processed last, so the returned NodeMatch.tier was non-deterministic when a node held the same block-set across multiple tiers. Aggregate into per-node Accumulators and pick the lowest TierType value (HBM < DRAM < SSD), with explicit handling so TierType::UNKNOWN (=0) never wins over a known tier.

Add two regression tests:
- MatchReturnsFastestTierForMixedNodeHits: HBM wins over DRAM/SSD on the same node
- MatchDoesNotLetUnknownOverrideKnownTier: UNKNOWN entries don't shadow the known DRAM/SSD entries on the same node
Replace the single best-tier return value on ExternalKvNodeMatch with
hashes_by_tier (map<TierType, vector<string>>) so callers can see the
actual per-tier distribution of cache hits on each node, not just the
fastest tier. The cost-aware scheduler needs per-tier counts to pick
the worker with the lowest data-movement cost; collapsing to best-tier
loses the L2/L3 distribution that drives that decision.
API surface:
- proto: ExternalKvNodeMatch.matched_hashes/tier removed in favour of
repeated TierHashes hashes_by_tier (per-tier {tier, hashes} buckets).
Wrapper message used because proto3 disallows map<int, repeated_*>.
- C++ structs: ExternalKvBlockIndex::NodeMatch,
  MasterClient::ExternalKvNodeMatch, IUMBPClient::ExternalKvMatch all expose
std::map<TierType, std::vector<std::string>> hashes_by_tier and a
MatchedHashCount() convenience method.
- Match() in external_kv_block_index aggregates per (node, tier) so
the same node holding hashes at multiple tiers reports each bucket
separately (tested).
- pybind11 exposes hashes_by_tier as a Python dict keyed by
UMBPTierType; matched_hash_count() returns the total.
Tests:
- Rewrites test_external_kv_block_index covering single tier,
per-tier splitting on the same node, UNKNOWN bucket, multi-node
grouping, register-overwrite, and concurrency (12 cases).
- Updates test_client_registry_external_kv assertions.
- Updates Python integration tests and demo script.
No backward-compatibility shims — callers must read hashes_by_tier
(or call matched_hash_count()) instead of matched_hashes/tier.
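A sketch of the resulting C++ surface, assuming a plain struct for NodeMatch; tier enum values beyond the HBM < DRAM < SSD ordering and UNKNOWN = 0 noted earlier are illustrative:

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

enum class TierType { UNKNOWN = 0, HBM = 1, DRAM = 2, SSD = 3 };

struct NodeMatch {
  std::string node_id;
  // Per-tier hit buckets: the cost-aware scheduler sees the full L2/L3
  // distribution instead of a collapsed best-tier value.
  std::map<TierType, std::vector<std::string>> hashes_by_tier;

  // Convenience total across tiers. (The next commit in this PR refines
  // this to count distinct hashes so multi-tier copies aren't double-counted.)
  std::size_t MatchedHashCount() const {
    std::size_t n = 0;
    for (const auto& [tier, hashes] : hashes_by_tier) n += hashes.size();
    return n;
  }
};
```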
The KV index used to track a single tier per (node, hash) pair — every
Register call overwrote the previous tier, and Match() returned only the
best tier each block was reported on. Physically a HiCache block can
live on GPU + CPU + Storage simultaneously (write_through creates a CPU
mirror while the GPU copy is still alive; backup adds an SSD copy on top),
so the old single-tier view collapsed real layout information that the
cost-aware scheduler needs to estimate per-tier fetch cost.
Switch the index to an additive model: each (node, hash) tracks a
std::set<TierType>; Register adds a tier without dropping existing ones;
Revoke takes a tier parameter and drops only that bucket. Match()
expands per-tier so a block on HBM+DRAM appears in both buckets, and
NodeMatch.MatchedHashCount() returns the *distinct* count (size of the
union) so callers don't double-count multi-tier hashes.
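A sketch of the additive model, with the container layout and method names assumed:

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>

enum class TierType { UNKNOWN = 0, HBM = 1, DRAM = 2, SSD = 3 };

class ExternalKvBlockIndex {
 public:
  // Additive: registering a tier never drops tiers already recorded for
  // the same (node, hash), so GPU + CPU + Storage copies coexist.
  void Register(const std::string& node, const std::string& hash, TierType t) {
    tiers_[{node, hash}].insert(t);
  }

  // Tier-specific revoke drops only that bucket; copies on other tiers
  // stay matchable.
  void Revoke(const std::string& node, const std::string& hash, TierType t) {
    auto it = tiers_.find({node, hash});
    if (it == tiers_.end()) return;
    it->second.erase(t);
    if (it->second.empty()) tiers_.erase(it);
  }

 private:
  std::map<std::pair<std::string, std::string>, std::set<TierType>> tiers_;
};
```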
API surface:
- proto: RevokeExternalKvBlocksRequest gains a tier field; new
RevokeAllExternalKvBlocksAtTier RPC for bulk-wiping a whole tier
(storage clear/detach, host pool reset).
- C++: ExternalKvBlockIndex now exposes Register (additive),
Unregister(tier-specific), UnregisterByNodeAtTier (bulk),
UnregisterByNode (all tiers). IUMBPClient / DistributedClient /
PoolClient / MasterClient signatures threaded with tier; new
RevokeAllExternalKvBlocksAtTier method.
- pybind: revoke_external_kv_blocks(hashes, tier),
revoke_all_external_kv_blocks_at_tier(tier).
ClientRegistry tracks tier on revoke and exposes
UnregisterExternalKvBlocksByTier so the master server can route the new
bulk RPC into the index.
Tests:
- Rewrites test_external_kv_block_index covering additive register,
per-tier revoke leaving other tiers intact, bulk-revoke at tier
semantics, multi-tier same-hash matching, and dedup in
MatchedHashCount() / GetKvCount() (17 cases).
- Updates test_client_registry_external_kv signatures.
- Adds Python integration tests for additive register, tier-specific
revoke, and bulk revoke at tier.
Docs (docs/api/umbp.rst): updates UMBPExternalKvNodeMatch field
description, methods table, and examples to reflect additive multi-tier
semantics.
Caveat: UnregisterByNodeAtTier is an O(N hashes) full scan under a
write lock. Only invoked from admin paths (clear/detach/reset), so it
should not contend with the report/match hot path; the perf comment
points at a reverse index as the optimization
to add if profiling shows contention at scale.
…tor/umbp-dual-scheme-abc
…efer leased pages on Clear
- master BatchRoutePut returns already_exists for keys in GlobalBlockIndex
- peer AllocateSlot adds the key and rejects when owned_ already has it
- allocator Commit keeps first-writer-wins idempotent as a race-window fallback
- BatchPut tracks PutEntryOutcome so dedup hits stay out of the bandwidth metric
Summary
- IUMBPClient ABC and CreateUMBPClient() factory for the dual-scheme UMBP architecture (Standalone / Distributed)
- StandaloneClient extracted from the old UMBPClient; all PoolClient code removed
- Shared I/O drivers extracted to top-level io/ and spdk/
- umbp_core decoupled from umbp_common — standalone builds no longer depend on gRPC/Protobuf
- sglang umbp_store.py requires zero modifications

Design
See src/umbp/doc/design-umbp-dual-scheme-refactor.md for the full design document.

Why ABC + Factory: Standalone (client-local DRAM+SSD) and Distributed (master-led routing, similar to Mooncake) have fundamentally different control flows — not "base + hooks", but two independent execution paths behind a shared interface.
Factory behavior: currently always returns StandaloneClient. config.distributed is reserved for the future DistributedClient (see src/umbp/doc/handoff-next-steps.md).

Test plan

- Build with BUILD_UMBP=ON (zero errors)
- test_umbp_local_client, test_umbp_e2e, test_follower_mode, test_prefix_aware_eviction
- --hicache-storage-backend umbp