Refactor: unify TaskArgs and bind DistOrchestrator (drop WorkerPayload from public API) #543
Merged
ChaoWao merged 1 commit into hw-native-sys:main on Apr 14, 2026
Conversation
Code Review
This pull request refactors the distributed runtime by introducing a unified, tag-driven dependency-inference system. It replaces the specialized task-argument types with a single TaskArgs class and moves task-submission logic into the DistOrchestrator, which now manages dependencies automatically based on tensor tags (INPUT, OUTPUT, INOUT, etc.). The DistWorker interface is simplified to provide an orchestrator handle, and internal dispatching now uses a streamlined WorkerPayload. Review feedback highlights two opportunities: deduplicating producers in the orchestrator without a linear scan, and avoiding potential narrowing-conversion warnings when calculating fan-in counts.
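The linear-scan concern from the review can be seen in miniature. Below is a hedged Python sketch — names like `collect_fanin_producers`, `fanin_producers`, and `tensormap` are illustrative, not the project's actual code — of deduplicating producers with a set so membership checks are O(1) instead of a scan over the accumulated list:

```python
# Hypothetical sketch of producer deduplication during dependency
# inference. A `seen` set gives O(1) duplicate detection; scanning
# the fanin_producers list itself would be O(n) per tensor.

def collect_fanin_producers(input_tensors, tensormap):
    """Collect unique producer slot ids for a consumer's INPUT/INOUT tensors."""
    fanin_producers = []   # ordered list handed to the slot
    seen = set()           # O(1) membership test replaces a linear scan
    for t in input_tensors:
        producer_id = tensormap.get(t)
        if producer_id is None or producer_id in seen:
            continue       # unknown tensor, or this producer is already wired
        seen.add(producer_id)
        fanin_producers.append(producer_id)
    return fanin_producers
```

The fan-in count is then `len(fanin_producers)`, which can be converted to the slot's counter type with an explicit cast to avoid the narrowing warning the review mentions.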
B + A2 step of the hierarchical-runtime plan (.claude/plans/HIERARCHICAL_RUNTIME_REFACTOR.md). Builds on hw-native-sys#536.

Unify TaskArgs (B-1):
- Rename storage template `TaskArgs<...>` -> `TaskArgsTpl<...>` so the unqualified name `TaskArgs` is free for the unified user-facing builder.
- Add `using TaskArgs = TaskArgsTpl<ContinuousTensor, uint64_t, 0, 0, TensorArgType>` — vector-backed + per-tensor TensorArgType tags.
- Add TaskArgsView, make_view(), task_args_blob_size, write_blob, read_blob, view_to_chip_storage for the dispatch / wire / L2 ABI edge.
- nanobind: drop DynamicTaskArgs / TaggedTaskArgs bindings, expose unified TaskArgs; extend TensorArgType with OUTPUT_EXISTING / NO_DEP.

Tag-driven submit (B):
- DistOrchestrator exposes submit_next_level / submit_next_level_group / submit_sub / submit_sub_group, each taking a TaskArgs (with tags).
- Tags drive dependency inference: INPUT/INOUT -> tensormap.lookup producer; OUTPUT/INOUT/OUTPUT_EXISTING -> tensormap.insert; NO_DEP -> skip.
- Drop `inputs=` / `outputs=` from the submit API; downstream consumers reference output tensors by their own data pointers.
- Shrink DistSubmitResult to {slot_id} only. Delete DistInputSpec / DistOutputSpec / DistSubmitOutput from both C++ and Python surfaces.

Slot storage and dispatch (B):
- DistTaskSlotState drops `payload` / `args_list<const void*>`; gains worker_type / callable_ptr / callable_id / config (ChipCallConfig) / chip_storage_list<ChipStorageTaskArgs>, built by the Orchestrator at submit.
- DistScheduler::dispatch_ready assembles a per-worker WorkerPayload from slot fields + chip_storage_list[i] and hands it to IWorker::run.
- WorkerPayload kept as an internal dispatch carrier (mailbox layout unchanged); not exposed to Python.

Worker / Orchestrator separation (A2):
- Delete DistWorker::submit / submit_group / scope_begin / scope_end entirely — those concepts belong on Orchestrator.
- Add DistWorker::get_orchestrator() accessor; nanobind exposes the C++ DistOrchestrator directly with submit_* (public) and _scope_begin / _scope_end (invoked only by the Python facade).
- Python Orchestrator becomes a thin wrapper over the bound C++ DistOrchestrator (no more WorkerPayload construction, no inputs/outputs kwargs).
- Python Worker.run() fetches the orchestrator handle once at init and runs scope_begin -> orch_fn -> scope_end -> drain inside one DAG.

Orchestrator.alloc for runtime-managed intermediates:
- DistOrchestrator::alloc(shape, dtype) -> ContinuousTensor. Mirrors L2's "task slot owns its output buffer" model: alloc creates a synthetic task slot in COMPLETED state that owns an mmap'd buffer; the buffer is munmap'd when the slot reaches CONSUMED (all downstream consumers done + scope ref released). Users tag the returned tensor as OUTPUT / INPUT in TaskArgs to wire deps naturally via the TensorMap — no separate alloc-lifecycle API needed.
- mmap(MAP_SHARED|MAP_ANONYMOUS) so forked child workers see the same virtual address.
- DistTaskSlotState gains alloc_bufs / alloc_sizes (empty for non-alloc slots); on_consumed munmap's them.

Orchestrator consume-lifecycle fixes (required by alloc):
- infer_deps now wires fanout on COMPLETED producers (previously skipped): the consumer doesn't wait on the producer (live_fanins not bumped) but is added to fanin_producers so its deferred try_consume keeps the producer alive until the consumer finishes. CONSUMED producers are still skipped (resources already freed).
- release_ref threshold changed from `>= total` to `>= total + 1` to match try_consume — prevents scope_end from prematurely consuming slots whose downstream consumers haven't finished. Total contributors = 1 (self try_consume or alloc's sim) + N (consumer deferreds) + 1 (scope_end) = total + 1.
- on_consumed is idempotent (CAS on state); both release paths can now hit the threshold concurrently without double-freeing alloc buffers. Returns bool (true iff this call performed the transition).
- active_tasks_ fetch_sub lives inside orchestrator.on_consumed (gated on the CAS win). A notify_consumed callback wired from DistWorker at init signals drain from both scheduler-driven and scope_end-driven paths.

Plumbing:
- Extract ChipCallConfig to its own header (chip_call_config.h) to break the circular include between dist_types.h and chip_worker.h.
- Rename runtime `Arg` base from TaskArgs<...> to TaskArgsTpl<...> in a2a3/aicpu_build_graph, a2a3/tensormap_and_ringbuffer, and a5/tensormap_and_ringbuffer.

Tests:
- C++: tests/ut/cpp/test_dist_orchestrator.cpp + test_dist_scheduler.cpp rewritten against the new TaskArgs-tag API.
- Python ut tests migrated: test_host_worker (TestSubmitResult replaces TestOutputAllocation; new TestOrchAlloc class), test_group_task (synthetic-tensor dep wiring).
- L3 ST tests (test_l3_dependency, test_l3_group) build TaskArgs with tags directly; scene_test._build_chip_task_args returns TaskArgs.
- test_task_interface.py — TestTaskArgs covers the merged surface.
- 105/105 Python ut pass on macOS.
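The tag rules above (INPUT/INOUT look up a producer, OUTPUT/INOUT/OUTPUT_EXISTING register the current slot as producer, NO_DEP is skipped) can be sketched as a standalone Python model. The enum values and the shape of `infer_deps` here are assumptions for illustration; only the tag names and the lookup/insert/skip semantics come from the description above:

```python
from enum import Enum

class TensorArgType(Enum):      # tag names taken from the PR text
    INPUT = 1
    OUTPUT = 2
    INOUT = 3
    OUTPUT_EXISTING = 4
    NO_DEP = 5

def infer_deps(args, tensormap, slot_id):
    """Toy tag-driven inference over (data_ptr, tag) pairs.
    `tensormap` maps a tensor's data pointer to the slot id that produces it."""
    producers = []
    for ptr, tag in args:
        if tag is TensorArgType.NO_DEP:
            continue                               # skip entirely
        if tag in (TensorArgType.INPUT, TensorArgType.INOUT):
            prod = tensormap.get(ptr)              # lookup producer
            if prod is not None and prod not in producers:
                producers.append(prod)
        if tag in (TensorArgType.OUTPUT, TensorArgType.INOUT,
                   TensorArgType.OUTPUT_EXISTING):
            tensormap[ptr] = slot_id               # insert: this slot produces ptr
    return producers
```

Note how INOUT takes both branches, which is what lets it express WaW-style chains on the same buffer.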
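The `>= total + 1` threshold arithmetic can be modelled in a few lines. This is a toy sketch of the contributor counting only (self + N consumer deferreds + scope_end), with a lock standing in for the CAS; names are illustrative, not the real implementation:

```python
import threading

class SlotRefs:
    """Toy model of the consume threshold: with N downstream consumers the
    contributors are 1 (self) + N (consumer deferreds) + 1 (scope_end),
    so the consumed transition must fire only at refs == total + 1, and a
    CAS-style flag keeps it idempotent when two release paths race."""

    def __init__(self, n_consumers):
        self.total = 1 + n_consumers   # self + consumer deferreds
        self.refs = 0
        self.consumed = False
        self._lock = threading.Lock()

    def release_ref(self):
        with self._lock:
            self.refs += 1
            if self.refs >= self.total + 1 and not self.consumed:
                self.consumed = True   # CAS win: free buffers exactly once
                return True            # this call performed the transition
            return False
```

With two consumers, only the fourth release (self, consumer, consumer, scope_end) performs the transition; a threshold of `>= total` would have let scope_end consume the slot while a consumer was still running.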
ChaoWao added a commit to ChaoWao/simpler-fork that referenced this pull request on Apr 15, 2026
Replace the per-call mmap placeholder in DistOrchestrator::alloc with a pre-allocated MAP_SHARED|MAP_ANONYMOUS heap region taken in DistWorker's ctor, and merge the slot ring with the heap region into a single DistRing (mirrors L2's PTO2TaskAllocator, Strict-2). Matches the plan's PR-H line and supersedes the hw-native-sys#543 alloc placeholder.

Key behaviour changes:
- DistRing now owns both the task-slot window (legacy DIST_TASK_WINDOW_SIZE = 128, tracked for PR-I removal) and a heap region whose size is a Worker ctor parameter (heap_ring_size, default 1 GiB). alloc(bytes) atomically returns {slot, heap_ptr, heap_end_offset} under one mutex; release(slot) advances a FIFO last_alive_ pointer, which in turn advances heap_tail_. No partial-failure rollback path.
- 1024 B alignment (DIST_HEAP_ALIGN, matches PTO2_PACKED_OUTPUT_ALIGN, Strict-3) on every heap slab.
- Heap mmap happens in the DistWorker ctor, before the Python Worker forks chip/sub-worker children, so children inherit the region at the same virtual address. The Python Worker now constructs DistWorker in _init_level3 (pre-fork) and wires the worker threads in _start_level3 (post-fork).
- OUTPUT with data==0 is auto-allocated from the HeapRing inside submit_impl (all OUTPUTs for one submit share a single allocator call). data!=0 OUTPUT and OUTPUT_EXISTING stay pure inserts; only INPUT / INOUT look up in infer_deps -- matches L2 pto_orchestrator.cpp step B. WaW dependencies (e.g. writing into an orch.alloc() buffer) are expressed with INOUT.
- Fork hygiene in the DistWorker ctor: setenv (overwrite=0) of OMP / OPENBLAS / MKL / BLIS thread-count knobs, KMP_DUPLICATE_LIB_OK=TRUE on macOS, and a single pthread_atfork handler registered once per process as a landing pad for Worker-owned locks.
- Back-pressure: alloc() spin-waits on a cv; after DIST_ALLOC_TIMEOUT_MS (default 10 s) it throws std::runtime_error so an exhausted heap or full slot window surfaces as a Python exception instead of silently deadlocking.
- DistTaskSlotState::alloc_bufs / alloc_sizes fields removed -- heap reclamation is implicit via last_alive_; no per-slot munmap runs.

Tests:
- cpput: test_dist_ring covers FIFO reclamation, heap wrap-around, timeout, shutdown, alignment. test_dist_orchestrator adds OutputAutoAllocsFromHeapRing, InoutWiresCreatorAsFanin, and OutputAndOutputExistingAreInsertOnly.
- pyut: test_alloc_dep_wires_via_tensormap migrated to INOUT so the alloc-slot is pinned as a WaW producer (plain OUTPUT would be a pure overwrite and a UAF under the real heap ring).

Docs:
- docs/orchestrator.md section 5 rewritten as the unified allocator, section 8b rewritten for HeapRing + tag semantics table, new section 8c for fork hygiene. Slot-window transitional note points to the follow-up.
- docs/roadmap.md moves the PR-H items to Landed.
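The FIFO heap-ring reclamation described above can be modelled single-threaded in a few lines of Python. This is a rough sketch under stated simplifications (no cv/timeout, no wrap-around; `ALIGN` stands in for DIST_HEAP_ALIGN, the other names are illustrative, not the DistRing code):

```python
ALIGN = 1024  # stands in for DIST_HEAP_ALIGN; assumes plain byte offsets

class HeapRing:
    """Toy model of FIFO reclamation: alloc() bumps the head, release()
    marks a slot done, and the tail only advances past the longest
    contiguous prefix of released slots (the last_alive_ idea)."""

    def __init__(self, size):
        self.size = size
        self.head = 0          # next free offset
        self.tail = 0          # everything before tail is reclaimed
        self.slots = []        # per-alloc [end_offset, released?] in order
        self.last_alive = 0    # index of the first slot not yet reclaimed

    def alloc(self, nbytes):
        nbytes = -(-nbytes // ALIGN) * ALIGN        # round up to ALIGN
        if self.head + nbytes - self.tail > self.size:
            raise RuntimeError("heap exhausted")    # real code spin-waits on a cv first
        self.head += nbytes
        self.slots.append([self.head, False])
        return len(self.slots) - 1                  # slot id

    def release(self, slot):
        self.slots[slot][1] = True
        # advance the tail past the contiguous prefix of released slots
        while self.last_alive < len(self.slots) and self.slots[self.last_alive][1]:
            self.tail = self.slots[self.last_alive][0]
            self.last_alive += 1
```

The point of the FIFO prefix rule: releasing a later slot while an earlier one is still alive frees no memory, which is why a long-lived early allocation can back-pressure the whole ring.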
ChaoWao added a commit that referenced this pull request on Apr 15, 2026
…ne (#560)
Summary
This is the B + A2 step of the hierarchical-runtime plan. Builds on #536.
B — Unified TaskArgs + tag-driven submit
- Merge `DynamicTaskArgs` + `TaggedTaskArgs` into one `TaskArgs` (vector-backed + per-tensor `TensorArgType` tags). The storage template is renamed `TaskArgs<...>` → `TaskArgsTpl<...>` so the unqualified name is free for the unified user-facing builder.
- Add `submit_next_level(callable, TaskArgs, config)` / `submit_next_level_group(...)` / `submit_sub(callable_id, TaskArgs)` / `submit_sub_group(...)`. Tags on the `TaskArgs` drive dependency inference (INPUT/INOUT → tensormap.lookup; OUTPUT/INOUT/OUTPUT_EXISTING → insert; NO_DEP → skip).
- Drop the `inputs=` / `outputs=` kwargs from submit; downstream consumers reference output tensors by their own data pointers.
- Shrink `DistSubmitResult` to `{slot_id}` only. Delete `DistInputSpec` / `DistOutputSpec` / `DistSubmitOutput`.
- Add `TaskArgsView`, `make_view()`, `task_args_blob_size()`, `write_blob()`, `read_blob()`, `view_to_chip_storage()` for the dispatch / wire / L2 ABI edge.

A2 — Worker / Orchestrator separation
- Delete `DistWorker::submit` / `submit_group` / `scope_begin` / `scope_end` entirely (those concepts belong to Orchestrator).
- Add a `DistWorker::get_orchestrator()` accessor; nanobind exposes `DistOrchestrator` directly with `submit_*` (public) and `_scope_begin` / `_scope_end` (invoked only from the Python facade / `Worker::run`).
- Python `Orchestrator` becomes a thin wrapper over the bound C++ `DistOrchestrator`, with no more `WorkerPayload` construction or `inputs=` / `outputs=` translation.
- `Worker.run()` fetches the orchestrator handle once at init and runs `_scope_begin → orch_fn → _scope_end → drain` inside one DAG execution.

Slot storage / dispatch
- `DistTaskSlotState` drops `payload` / `args_list<const void*>`; gains `worker_type` / `callable_ptr` / `callable_id` / `config (ChipCallConfig)` / `chip_storage_list<ChipStorageTaskArgs>` (built by Orchestrator at submit time via `view_to_chip_storage`).
- `DistScheduler::dispatch_ready` assembles a per-worker `WorkerPayload` from the slot fields + `chip_storage_list[i]` and hands it to `IWorker::run`.
- `WorkerPayload` is kept as an internal dispatch carrier; it is no longer part of the user-facing surface, and the mailbox layout in `DistChipProcess` / `DistSubWorker` is unchanged.
- A `notify_consumed` callback is wired from `DistWorker` into `DistOrchestrator` via `init()` so the `drain()` bookkeeping is automatic.

Plumbing
- Extract `ChipCallConfig` to `src/common/task_interface/chip_call_config.h` to break the circular include between `dist_types.h` and `chip_worker.h`.
- Rename the runtime `Arg` base from `TaskArgs<...>` to `TaskArgsTpl<...>` in a2a3/aicpu_build_graph, a2a3/tensormap_and_ringbuffer, and a5/tensormap_and_ringbuffer.

Test plan
- `pytest tests/ut/py/test_dist_worker tests/ut/py/test_task_interface.py` — 101/101 passed on macOS
- `ruff check` / `clang-format` / `pyright` — clean
- Suggested review focus: dependency inference (`infer_deps` in `dist_orchestrator.cpp`) and the slot-stored `chip_storage_list` lifetime
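Stepping back to the slot storage / dispatch split: the per-worker payload assembly can be modelled in a few lines of Python. The field shapes and `dispatch_ready` signature here are assumptions for illustration; only the type and field names come from the PR text:

```python
from dataclasses import dataclass

@dataclass
class DistTaskSlotState:
    worker_type: int
    callable_ptr: int
    callable_id: int
    config: dict                 # stands in for ChipCallConfig
    chip_storage_list: list      # one ChipStorageTaskArgs blob per worker

@dataclass
class WorkerPayload:             # internal dispatch carrier, not user-facing
    worker_type: int
    callable_ptr: int
    callable_id: int
    config: dict
    chip_storage: object

def dispatch_ready(slot):
    """Sketch of the dispatch step: fan one slot out into per-worker
    payloads built from the shared slot fields + chip_storage_list[i]."""
    return [
        WorkerPayload(slot.worker_type, slot.callable_ptr,
                      slot.callable_id, slot.config, cs)
        for cs in slot.chip_storage_list
    ]
```

The shape makes the ownership split visible: the Orchestrator bakes the per-worker storage at submit time, and the scheduler only copies shared fields plus one storage entry per worker at dispatch.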