Commit 2a53df4
Refactor: unify TaskArgs, bind DistOrchestrator, add orch.alloc (#543)
Unify TaskArgs:
- Rename the storage template `TaskArgs<...>` -> `TaskArgsTpl<...>` so the unqualified name `TaskArgs` is free for the unified user-facing builder.
- Add `using TaskArgs = TaskArgsTpl<ContinuousTensor, uint64_t, 0, 0, TensorArgType>` — vector-backed + per-tensor TensorArgType tags.
- Add TaskArgsView, make_view(), task_args_blob_size, write_blob, read_blob, view_to_chip_storage for the dispatch / wire / L2 ABI edge.
- nanobind: drop the DynamicTaskArgs / TaggedTaskArgs bindings, expose the unified TaskArgs; extend TensorArgType with OUTPUT_EXISTING / NO_DEP.

Tag-driven submit:
- DistOrchestrator exposes submit_next_level / submit_next_level_group / submit_sub / submit_sub_group, each taking a TaskArgs (with tags).
- Tags drive dependency inference: INPUT/INOUT -> tensormap.lookup producer; OUTPUT/INOUT/OUTPUT_EXISTING -> tensormap.insert; NO_DEP -> skip.
- Drop `inputs=` / `outputs=` from the submit API; downstream consumers reference output tensors by their own data pointers.
- Shrink DistSubmitResult to {slot_id} only. Delete DistInputSpec / DistOutputSpec / DistSubmitOutput from both the C++ and Python surfaces.

Slot storage and dispatch:
- DistTaskSlotState drops `payload` / `args_list<const void*>`; gains worker_type / callable_ptr / callable_id / config (ChipCallConfig) / chip_storage_list<ChipStorageTaskArgs>, built by the Orchestrator at submit.
- DistScheduler::dispatch_ready assembles a per-worker WorkerPayload from the slot fields + chip_storage_list[i] and hands it to IWorker::run.
- WorkerPayload is kept as an internal dispatch carrier (mailbox layout unchanged); it is not exposed to Python.

Worker / Orchestrator separation:
- Delete DistWorker::submit / submit_group / scope_begin / scope_end entirely — those concepts belong on the Orchestrator.
- Add a DistWorker::get_orchestrator() accessor; nanobind exposes the C++ DistOrchestrator directly with submit_* (public) and _scope_begin / _scope_end (invoked only by the Python facade).
- The Python Orchestrator becomes a thin wrapper over the bound C++ DistOrchestrator (no more WorkerPayload construction, no inputs/outputs kwargs).
- Python Worker.run() fetches the orchestrator handle once at init and runs scope_begin -> orch_fn -> scope_end -> drain inside one DAG.

Orchestrator.alloc for runtime-managed intermediates:
- DistOrchestrator::alloc(shape, dtype) -> ContinuousTensor. Mirrors L2's "task slot owns its output buffer" model: alloc creates a synthetic task slot in COMPLETED state that owns an mmap'd buffer; the buffer is munmap'd when the slot reaches CONSUMED (all downstream consumers done + scope ref released). Users tag the returned tensor as OUTPUT / INPUT in TaskArgs to wire deps naturally via the TensorMap — no separate alloc-lifecycle API needed.
- mmap(MAP_SHARED|MAP_ANONYMOUS) so forked child workers see the same virtual address.
- DistTaskSlotState gains alloc_bufs / alloc_sizes (empty for non-alloc slots); on_consumed munmap's them.

Orchestrator consume-lifecycle fixes (required by alloc):
- infer_deps now wires fanout on COMPLETED producers (previously skipped): the consumer doesn't wait on the producer (live_fanins is not bumped) but is added to fanin_producers, so its deferred try_consume keeps the producer alive until the consumer finishes. CONSUMED producers are still skipped (resources already freed).
- release_ref threshold changed from `>= total` to `>= total + 1` to match try_consume — prevents scope_end from prematurely consuming slots whose downstream consumers haven't finished. Total contributors = 1 (self try_consume, or alloc's simulated self-release) + N (consumer deferreds) + 1 (scope_end) = total + 1.
- on_consumed is idempotent (CAS on state); both release paths can now hit the threshold concurrently without double-freeing alloc buffers. Returns bool (true iff this call performed the transition).
- The active_tasks_ fetch_sub lives inside orchestrator.on_consumed (gated on the CAS win).
- A notify_consumed callback wired from DistWorker at init signals drain from both the scheduler-driven and scope_end-driven paths.

Plumbing:
- Extract ChipCallConfig to its own header (chip_call_config.h) to break the circular include between dist_types.h and chip_worker.h.
- Rename the runtime `Arg` base from TaskArgs<...> to TaskArgsTpl<...> in a2a3/aicpu_build_graph, a2a3/tensormap_and_ringbuffer, and a5/tensormap_and_ringbuffer.

Tests:
- C++ tests/ut/cpp/test_dist_orchestrator.cpp + test_dist_scheduler.cpp rewritten against the new TaskArgs-tag API.
- Python ut tests migrated: test_host_worker (TestSubmitResult replaces TestOutputAllocation; new TestOrchAlloc class), test_group_task (synthetic-tensor dep wiring).
- L3 ST tests (test_l3_dependency, test_l3_group) build TaskArgs with tags directly; scene_test._build_chip_task_args returns TaskArgs.
- test_task_interface.py — TestTaskArgs covers the merged surface.
- 105/105 Python ut pass on macOS.
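The `>= total + 1` release arithmetic above can be modeled with a toy counter. Class and method names here (`SlotSim`, `release`) are illustrative only, not the real C++ API; the real implementation uses atomic counters and a CAS on the slot state, which the `state` guard below stands in for:

```python
class SlotSim:
    """Toy model of the fanout-release threshold for one task slot."""

    def __init__(self, n_consumers):
        # fanout_total = N downstream consumers + 1 scope ref.
        self.fanout_total = n_consumers + 1
        self.fanout_released = 0
        self.state = "COMPLETED"
        self.freed = 0  # counts buffer frees; must end at exactly 1

    def release(self):
        """One contribution: self try_consume, a consumer deferred, or scope_end."""
        self.fanout_released += 1
        # Threshold is total + 1: self + N consumers + scope_end.
        if self.fanout_released >= self.fanout_total + 1 and self.state == "COMPLETED":
            self.state = "CONSUMED"  # the CAS makes this transition fire once
            self.freed += 1
            return True
        return False

slot = SlotSim(n_consumers=2)
assert not slot.release()  # self try_consume at completion
assert not slot.release()  # consumer 1 deferred try_consume
assert not slot.release()  # scope_end -- old `>= total` bug would free HERE
assert slot.release()      # consumer 2 deferred -- last contributor frees the slot
assert not slot.release()  # late duplicate is a no-op (idempotent transition)
assert slot.freed == 1
```

With the old `>= total` threshold, the third release (scope_end) would have consumed the slot while consumer 2 was still running, which is exactly the premature-free the fix prevents.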
1 parent 554bf89 commit 2a53df4

34 files changed

Lines changed: 1424 additions & 912 deletions

README.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -74,6 +74,7 @@ export ASCEND_HOME_PATH=/usr/local/Ascend/ascend-toolkit/latest
 | [Orchestrator](docs/orchestrator.md) | DAG submission internals: submit flow, TensorMap, Scope, Ring, task state machine |
 | [Scheduler](docs/scheduler.md) | DAG dispatch internals: wiring/ready/completion queues, dispatch loop |
 | [Worker Manager](docs/worker-manager.md) | Worker pool, WorkerThread, THREAD/PROCESS modes, fork + mailbox mechanics |
+| [Roadmap](docs/roadmap.md) | Hierarchical-runtime refactor — what has landed and what is still in flight |
 | [Getting Started](docs/getting-started.md) | Setup, prerequisites, build process, configuration |
 | [Developer Guide](docs/developer-guide.md) | Directory structure, role ownership, conventions |
 | [Testing Guide](docs/testing.md) | CI pipeline, test types, writing new tests |
```

docs/distributed_level_runtime.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1,5 +1,11 @@
 # Distributed Level Runtime — Level Model and Component Composition
 
+> **Status**: level model + high-level component split are accurate for
+> current code. Low-level details (e.g. `IWorker::run` signature,
+> `WorkerThread` unified mode) describe the target; see the
+> per-component docs for current vs target, or
+> [roadmap.md](roadmap.md) for the full landed-vs-planned breakdown.
+
 This document covers:
 
 - The **L0–L6 level model** (what each level represents)
```

docs/orchestrator.md

Lines changed: 118 additions & 13 deletions
````diff
@@ -1,5 +1,10 @@
 # Orchestrator — DAG Submission Internals
 
+> **Status**: describes the **target** design. Current code matches the
+> user-facing submit API and `alloc` surface; inline "Status:" notes flag
+> the few remaining divergences. See [roadmap.md](roadmap.md) for the
+> full landed-vs-planned breakdown.
+
 The Orchestrator is the **DAG builder**. It runs single-threaded on the user's
 thread (inside `Worker::run` between `scope_begin` and `drain`) and owns the
 three data structures that turn a sequence of `submit_*` calls into a scheduled
@@ -18,26 +23,40 @@ The user's orch fn receives an `Orchestrator*` as its first argument:
 ```cpp
 class Orchestrator {
  public:
-  SubmitResult submit_next_level(Callable cb, TaskArgs args, const CallConfig &config);
-  SubmitResult submit_next_level_group(Callable cb,
-                                       std::vector<TaskArgs> args_list,
-                                       const CallConfig &config);
-  SubmitResult submit_sub(Callable cb, TaskArgs args, const CallConfig &config);
-
- private:
-  friend class Worker;
+  // --- User-facing submit API (tags inside TaskArgs drive deps) ---
+  SubmitResult submit_next_level(uint64_t callable,
+                                 const TaskArgs &args,
+                                 const ChipCallConfig &config);
+  SubmitResult submit_next_level_group(uint64_t callable,
+                                       const std::vector<TaskArgs> &args_list,
+                                       const ChipCallConfig &config);
+  SubmitResult submit_sub(int32_t callable_id, const TaskArgs &args);
+  SubmitResult submit_sub_group(int32_t callable_id,
+                                const std::vector<TaskArgs> &args_list);
+
+  // --- Intermediate-buffer allocation (runtime-owned lifetime) ---
+  ContinuousTensor alloc(const std::vector<uint32_t> &shape, DataType dtype);
+
+  // --- Internal lifecycle (invoked by Worker::run only, bound as _scope_begin
+  // / _scope_end / _drain in the Python facade) ---
   void scope_begin();
   void scope_end();
   void drain();
-  // ... components: Ring, TensorMap, Scope, slot pool
+
+ private:
+  // ... components: Ring, TensorMap, Scope, slot pool, active_tasks_ counter
 };
 
-struct SubmitResult { TaskSlot slot_id; };
+struct SubmitResult { TaskSlot task_slot; };  // field is `task_slot` in current code
 ```
 
-`scope_begin` / `scope_end` / `drain` are not user-visible — they are invoked
-by `Worker::run` around the orch fn. See
-[task-flow.md](task-flow.md) §5 for the Worker::run wrapper.
+**Status**: `submit_sub` takes only `(callable_id, args)` — no `config`; SUB
+has no per-call config. The target design (plan §"Why L2 has no submit") allows
+callable IDs that may later unify with ChipCallable pointers; see PR-E.
+
+`scope_begin` / `scope_end` / `drain` are invoked from Python `Worker.run` via
+the `_scope_begin` / `_scope_end` / `_drain` bindings. They are not part of the
+user-facing orch-fn API.
 
 ---
 
@@ -389,6 +408,92 @@ State transitions are driven by atomic CAS operations:
 - Orch: FREE → PENDING/READY at submit time
 - Scheduler: READY → RUNNING → COMPLETED → CONSUMED during dispatch/completion
 
+### Fanout-release threshold
+
+Both paths that can trigger COMPLETED → CONSUMED (the scheduler's
+`try_consume` and the scope-end `release_ref`) use the same threshold:
+
+```cpp
+if (fanout_released >= fanout_total + 1 && state == COMPLETED) on_consumed(slot);
+```
+
+The `+1` accounts for the slot's own self-release contribution, which normal
+tasks emit from `on_task_complete` (`try_consume(slot)` self-call). Alloc
+slots (§8b) bypass the scheduler and pre-bump `fanout_released` to `1` at
+`alloc()` time to stand in for the self-release. Both paths use `on_consumed`,
+which CASes `state` from `COMPLETED` to `CONSUMED` to remain idempotent
+when both fire concurrently at the threshold.
+
+---
+
+## 8b. `alloc(shape, dtype)` — runtime-owned intermediate buffers
+
+Mirrors L2's "task slot owns its output buffer" model: `alloc` creates a
+synthetic task slot in `COMPLETED` state that owns an mmap'd buffer. The
+buffer is freed when the slot reaches `CONSUMED` — i.e. after all downstream
+consumers have completed and the scope ref has been released.
+
+```cpp
+ContinuousTensor Orchestrator::alloc(const std::vector<uint32_t> &shape, DataType dtype) {
+  // 1. mmap(MAP_SHARED|MAP_ANONYMOUS) a page-aligned region — visible to
+  //    forked child workers at the same virtual address.
+  void *buf = mmap(...);
+  // 2. Claim a task slot.
+  TaskSlot sid = ring_.alloc();
+  TaskSlotState &s = slots_[sid];
+  // 3. Record the buffer for the on_consumed munmap.
+  s.alloc_bufs.push_back(buf);
+  s.alloc_sizes.push_back(mmap_bytes);
+  // 4. Register as this slot's output so downstream `INPUT`-tagged tensors
+  //    with the same data ptr look up this slot as producer.
+  tensormap_.insert(reinterpret_cast<uint64_t>(buf), sid);
+  s.output_keys.push_back(reinterpret_cast<uint64_t>(buf));
+  // 5. No fanin — alloc has no work to wait on.
+  s.fanin_count = 0;
+  // 6. Initial fanout = scope_ref. Consumers that wire on this slot in
+  //    infer_deps bump fanout_total; this slot's CONSUMED transition waits
+  //    for all of them + scope_end.
+  s.fanout_total = (scope_.depth() > 0) ? 1 : 0;
+  if (s.fanout_total > 0) scope_.register_task(sid);
+  // 7. Sim the self-consume so the fanout-release threshold math aligns
+  //    with normal slots (see §8 Fanout-release threshold).
+  s.fanout_released = 1;
+  // 8. Straight to COMPLETED — no dispatch needed.
+  s.state = TaskState::COMPLETED;
+  active_tasks_++;
+  return ContinuousTensor{buf, shape, dtype};
+}
+```
+
+On `on_consumed`, in addition to the usual `tensormap.erase_task_outputs` and
+`ring.release(sid)`, the slot's `alloc_bufs` are `munmap`'d.
+
+### Consumer interaction
+
+`infer_deps` treats `COMPLETED` producers specially: it still wires the
+fanout edge (so the producer waits for the consumer before being consumed and
+freeing its buffer) but does not bump `live_fanins` (the consumer is
+immediately ready because the producer is already done).
+
+```cpp
+if (ps_state == TaskState::CONSUMED) continue;  // already gone
+ps.fanout_consumers.push_back(slot);
+ps.fanout_total++;
+s.fanin_producers.push_back(prod);
+if (ps_state != TaskState::COMPLETED) live_fanins++;  // wait only if not yet done
+```
+
+### Status — placeholder vs target (PR-H)
+
+The current implementation uses **per-alloc `mmap`** (one syscall per
+`alloc()` invocation). This is a placeholder. The target design (PR-H,
+"HeapRing") pre-allocates a single MAP_SHARED region at `Worker::init()`
+before any fork, bump-allocates from it, and reclaims via FIFO
+`last_alive` tracking — mirroring L2's `PTO2TaskAllocator`. Under the
+target design, `OUTPUT`-tagged tensors will be auto-allocated by the
+Orchestrator (no explicit `alloc` call), and `OUTPUT_EXISTING` will
+preserve the current "user-provided buffer" path.
+
 ---
 
 ## 9. Invariants
````
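The `MAP_SHARED | MAP_ANONYMOUS` choice in §8b can be demonstrated with a small POSIX-only Python sketch: a region mapped before `fork` is visible to the child at the same virtual address, so a forked worker can fill a buffer that the parent (and sibling consumers) later read. The runtime does this in C++; `buf` below is just a stand-in for an alloc'd intermediate, and `waitpid` stands in for the inferred dependency on the producer:

```python
import mmap
import os
import struct

# fileno=-1 on Unix gives an anonymous MAP_SHARED mapping -- the same
# combination orch.alloc uses.
buf = mmap.mmap(-1, 4096)

pid = os.fork()
if pid == 0:                              # child "worker" (the producer)
    buf[:8] = struct.pack("<d", 2.5)      # write into the shared page
    os._exit(0)                           # exit without cleanup side effects

os.waitpid(pid, 0)                        # "dependency": wait for the producer
value = struct.unpack("<d", buf[:8])[0]   # parent observes the child's write
buf.close()                               # stands in for the on_consumed munmap
```

A private (`MAP_PRIVATE`) mapping would copy-on-write in the child and the parent would never see the value, which is why the shared flag matters here.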

docs/roadmap.md

Lines changed: 127 additions & 0 deletions
```diff
@@ -0,0 +1,127 @@
+# Hierarchical Runtime — Roadmap
+
+The six per-component docs (`orchestrator.md`, `scheduler.md`,
+`worker-manager.md`, `task-flow.md`, `chip-level-arch.md`,
+`distributed_level_runtime.md`) describe the **target** design of the
+hierarchical runtime. This page tracks what has already landed vs. what is
+still in flight, so readers can tell which bits of the design are running
+today and which are planned.
+
+If you only read one file to understand "what will this look like when
+it's done", read the per-component doc. If you want to know "what do I
+get if I pip install `main` today", read this page.
+
+---
+
+## Landed
+
+### Schedule engine shape
+
+- **Component split** — `Orchestrator` (DAG builder) / `Scheduler` (DAG
+  executor) / `WorkerManager` + `WorkerThread` (execution layer) — lives
+  in `src/common/distributed/`.
+- **Level model** — L0–L6 as described in
+  [distributed_level_runtime.md](distributed_level_runtime.md) §1. L2
+  (single-chip) and L3 (composite over ChipWorker + SubWorker) are
+  implemented; L4+ recursion is not (see below).
+
+### User-facing API
+
+- **Unified `TaskArgs`** — vector-backed builder with per-tensor
+  `TensorArgType` tags (`INPUT` / `OUTPUT` / `INOUT` / `OUTPUT_EXISTING`
+  / `NO_DEP`). Replaces separate `TaggedTaskArgs` / `DynamicTaskArgs`.
+- **Tag-driven `submit_*` on `Orchestrator`** —
+  `submit_next_level` / `submit_next_level_group` / `submit_sub` /
+  `submit_sub_group`. No `inputs=`/`outputs=` kwargs; tags inside the
+  `TaskArgs` drive `tensormap.lookup`/`insert` automatically.
+- **`SubmitResult = {slot_id}`** — downstream consumers reference output
+  tensors by their own data pointers.
+- **`Worker` has no `submit`/`scope`/`drain`** — those concepts belong
+  to `Orchestrator` (accessed via `worker.get_orchestrator()`).
+  `Orchestrator._scope_begin` / `_scope_end` / `_drain` are invoked by
+  the Python `Worker.run` facade only.
+- **`orch.alloc(shape, dtype)`** — runtime-owned intermediate buffer
+  backed by `mmap(MAP_SHARED | MAP_ANONYMOUS)`. Lifetime follows a
+  synthetic task slot so the buffer is freed once all downstream
+  consumers have completed (see
+  [orchestrator.md](orchestrator.md) §8b).
+
+### Dispatch internals
+
+- `Scheduler` dispatches via a single ready queue into `WorkerManager`
+  pools (next-level + sub). The slot stores `chip_storage_list` (one
+  `ChipStorageTaskArgs` per group worker) that dispatch passes through
+  a `WorkerPayload` handed to `IWorker::run`.
+- `DistChipProcess` / `DistSubWorker` are separate classes today;
+  a unified `WorkerThread` with `THREAD | PROCESS` modes is not yet
+  implemented.
+
+---
+
+## In flight / not yet landed
+
+### PR-H: HeapRing + `OUTPUT` auto-alloc
+
+- Replace the current per-call `mmap` in `orch.alloc` with a single
+  pre-allocated `MAP_SHARED` region at `Worker.init()` (default 1 GB),
+  bump-allocated with FIFO reclamation (mirrors L2's
+  `PTO2TaskAllocator`).
+- The `OUTPUT` tag will auto-allocate from the ring;
+  `OUTPUT_EXISTING` keeps the "user-provided buffer" path.
+- Merge the slot ring + heap ring into one allocator
+  (matches L2-consistency audit Strict-2).
+- Fork-safety hygiene at `Worker.init()` (`setenv
+  OMP_NUM_THREADS=1` / `pthread_atfork` on runtime-owned locks).
+
+### PR-C: drop `WorkerPayload`, new `IWorker::run` signature
+
+- `IWorker::run(callable, TaskArgsView, config)` — no `WorkerPayload`
+  wrapper; the mailbox encodes a length-prefixed blob of `callable +
+  config + args` at dispatch.
+- The slot drops `chip_storage_list` and stores the `TaskArgs` itself.
+  The child assembles `ChipStorageTaskArgs` from the view at the L2 ABI
+  edge only.
+- Strict-1 (per-scope rings, 4 depth) lands here.
+
+### PR-D: WorkerThread unification + per-shape ready queues
+
+- Fold `DistChipProcess` / `DistSubWorker` into `WorkerThread` with
+  `Mode = THREAD | PROCESS`.
+- Strict-4: 3 ready queues (AIC / AIV / MIX) instead of a single queue.
+
+### PR-E: uniform `Worker.run` + callable registry unification
+
+- Python `Worker.run` drops the `if level==2` branch.
+- The callable registry moves fully into C++
+  (`unordered_map<uint64_t, nb::object>` owned by `Worker`) so
+  `ChipCallable` and Python `sub` callables share one lookup path.
+  This unblocks L4+ recursion.
+
+### PR-F: C++ `Worker::run(Task)` for L4+ recursion
+
+- C++ `Task { OrchFn orch; TaskArgs task_args; CallConfig config; }`
+  so a higher-level `Worker` can register a lower-level `Worker` as a
+  next-level child and dispatch via `IWorker::run`.
+
+### PR-G: drop the `Dist` prefix
+
+- Final rename sweep: `DistOrchestrator` → `Orchestrator`, files
+  `dist_*.{h,cpp}` → `*.{h,cpp}`.
+
+---
+
+## Behavioural notes on the current implementation
+
+- **`DistOrchestrator::release_ref` threshold is `>= total + 1`** (not
+  `>= total`). This matches `DistScheduler::try_consume` — the
+  `+1` accounts for the slot's own self-release contribution. Alloc
+  slots (synthetic, never dispatched) pre-bump `fanout_released` to
+  `1` in `alloc()` so the threshold math works for them too.
+  `on_consumed` uses a CAS on state to remain idempotent across the two
+  call paths (`release_ref` and `try_consume`).
+- **scene_test has two helper functions** —
+  `_build_chip_task_args` returns `ChipStorageTaskArgs` (POD, for the
+  current L2 path: `ChipWorker.run(callable, POD, config)`) and
+  `_build_l3_task_args` returns a tagged `TaskArgs` (for
+  `orch.submit_next_level`). PR-C will collapse these into one helper
+  when `ChipWorker::run` takes a `TaskArgsView`.
```
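The tag-driven dependency inference that replaces the `inputs=`/`outputs=` kwargs can be modeled as a dictionary keyed by the tensor's data pointer. Everything below (`infer_deps`, the integer "pointers") is an illustrative toy, not the real TensorMap API — it only shows the lookup/insert rules the docs describe:

```python
# Per-tensor tags, mirroring the TensorArgType names in the real API.
INPUT, OUTPUT, INOUT, OUTPUT_EXISTING, NO_DEP = range(5)

def infer_deps(tensormap, slot_id, tagged_ptrs):
    """Return the producer slots this task depends on; update the map.

    INPUT/INOUT -> tensormap lookup (consumer edge);
    OUTPUT/INOUT/OUTPUT_EXISTING -> tensormap insert; NO_DEP -> skipped.
    """
    producers = set()
    for ptr, tag in tagged_ptrs:
        if tag in (INPUT, INOUT) and ptr in tensormap:
            producers.add(tensormap[ptr])   # wire fanin to the producer
        if tag in (OUTPUT, INOUT, OUTPUT_EXISTING):
            tensormap[ptr] = slot_id        # this slot now produces ptr
    return producers

tm = {}
infer_deps(tm, 0, [(0x1000, OUTPUT)])       # task 0 produces buffer 0x1000
deps = infer_deps(tm, 1, [(0x1000, INPUT),  # task 1 consumes it ...
                          (0x2000, OUTPUT),
                          (0x3000, NO_DEP)])
assert deps == {0}                          # ... so it depends on task 0
assert tm[0x2000] == 1                      # and now produces 0x2000
```

This is also why `SubmitResult` can shrink to `{slot_id}`: the data pointer itself is the key downstream consumers use.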

docs/scheduler.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1,5 +1,11 @@
 # Scheduler — DAG Dispatch Internals
 
+> **Status**: target design. Current code dispatches via
+> `IWorker::run(const WorkerPayload&)` rather than `run(callable, view,
+> config)`; the per-worker-type ready-queue split (Strict-4) is not yet
+> implemented. See [roadmap.md](roadmap.md) for the full
+> landed-vs-planned breakdown.
+
 The Scheduler is the **DAG executor**: a dedicated C++ thread that consumes
 submitted slots, wires fanout edges, dispatches ready tasks to worker threads,
 and handles completion callbacks. It is the bridge between the Orchestrator
```
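The wire/ready/dispatch/complete cycle the Scheduler doc describes can be sketched as a toy single-queue dispatcher. Names here are illustrative, not the real `DistScheduler` API, and the real scheduler runs concurrently rather than in one loop:

```python
from collections import deque

def run_dag(deps, execute):
    """deps: {slot: set of producer slots}; execute(slot) runs the task."""
    live_fanins = {s: len(ps) for s, ps in deps.items()}
    consumers = {s: [] for s in deps}
    for s, ps in deps.items():
        for p in ps:
            consumers[p].append(s)      # wire the fanout edge
    ready = deque(s for s, n in live_fanins.items() if n == 0)
    order = []
    while ready:
        s = ready.popleft()             # dispatch: READY -> RUNNING
        execute(s)
        order.append(s)                 # completion: RUNNING -> COMPLETED
        for c in consumers[s]:          # completion callback releases fanins
            live_fanins[c] -= 1
            if live_fanins[c] == 0:
                ready.append(c)
    return order

# Diamond DAG: 0 feeds 1 and 2; 3 waits on both.
order = run_dag({0: set(), 1: {0}, 2: {0}, 3: {1, 2}}, execute=lambda s: None)
assert order[0] == 0 and order[-1] == 3   # producers precede consumers
```

The Strict-4 target replaces the single `ready` deque with three (AIC / AIV / MIX), but the wiring and release logic stay the same shape.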

docs/task-flow.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1,5 +1,11 @@
 # Task Flow — Callable / TaskArgs / CallConfig Pass-Through
 
+> **Status**: describes the **target** design. The unified `TaskArgs` +
+> tag-driven submit + Orchestrator-owned drain are landed; the
+> `IWorker::run(callable, view, config)` signature and length-prefixed
+> mailbox blob are not yet (target landing: PR-C). See
+> [roadmap.md](roadmap.md) for the full landed-vs-planned breakdown.
+
 This document specifies **what data flows through the hierarchical runtime and
 what shapes it takes at each stage**. It covers:
```

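The "length-prefixed blob of `callable + config + args`" idea targeted by PR-C can be sketched in a few lines. The field layout below is invented purely for illustration — the actual wire format will be whatever `write_blob` / `read_blob` define in C++ — but it shows why a length prefix lets the mailbox carry one contiguous, self-describing message:

```python
import struct

def pack_blob(callable_id: int, config: bytes, args: bytes) -> bytes:
    # Hypothetical layout: u64 callable id, u32 config length, u32 args
    # length, then the two variable-length payloads, all behind a u32
    # total-length prefix so the reader knows where the message ends.
    body = struct.pack("<QII", callable_id, len(config), len(args)) + config + args
    return struct.pack("<I", len(body)) + body

def unpack_blob(blob: bytes):
    (body_len,) = struct.unpack_from("<I", blob, 0)
    body = blob[4:4 + body_len]
    callable_id, cfg_len, args_len = struct.unpack_from("<QII", body, 0)
    off = struct.calcsize("<QII")
    config = body[off:off + cfg_len]
    args = body[off + cfg_len:off + cfg_len + args_len]
    return callable_id, config, args

blob = pack_blob(42, b"cfg", b"\x01\x02\x03")
assert unpack_blob(blob) == (42, b"cfg", b"\x01\x02\x03")
```

The round-trip property (`unpack(pack(x)) == x`) is the invariant the dispatch side and the worker side must agree on, regardless of the final field order.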
docs/worker-manager.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1,5 +1,11 @@
 # Worker Manager — Pool, Threading, and Dispatch Modes
 
+> **Status**: describes the **target** design. Current code still has
+> separate `DistChipProcess` / `DistSubWorker` classes (target: merged
+> into `WorkerThread` in PR-D) and passes `const WorkerPayload&` to
+> `IWorker::run` (target: replaced in PR-C). See
+> [roadmap.md](roadmap.md) for the full landed-vs-planned breakdown.
+
 `WorkerManager` and `WorkerThread` together implement the **execution layer**
 of a `Worker` engine. `WorkerManager` owns two pools of `WorkerThread`s (one
 for next-level workers, one for sub workers); each `WorkerThread` owns an
```
