From 3c47b50a3593394c0b6ab3b31309acd41c8aa5f6 Mon Sep 17 00:00:00 2001 From: wcwxy <26245345+ChaoWao@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:25:59 +0800 Subject: [PATCH] =?UTF-8?q?Refactor:=20WorkerType=20CHIP=E2=86=92NEXT=5FLE?= =?UTF-8?q?VEL,=20consolidate=20add=5Fworker=20API,=20level=20hygiene?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename WorkerType enum: CHIP→NEXT_LEVEL, remove DIST (only two kinds of sub-worker: NEXT_LEVEL and SUB) - Rename scheduler pools: chip_workers→next_level_workers, chip_threads_→next_level_threads_ - Simplify DistWorker::add_worker — clean NEXT_LEVEL/SUB binary split instead of the old CHIP+DIST grouping hack - Consolidate nanobind API: replace add_chip_worker/add_chip_worker_native/ add_chip_process with overloaded add_next_level_worker - Worker.register() now raises RuntimeError at L2 (was silently useless) - Extract L2 WorkerPayload unpacking into _run_l2_from_payload() helper Part of hierarchical runtime refactor (Steps 2-4). Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/distributed_level_runtime.md | 4 +- python/bindings/dist_worker_bind.h | 23 ++++---- python/simpler/worker.py | 34 ++++++----- src/common/distributed/dist_scheduler.cpp | 12 ++-- src/common/distributed/dist_scheduler.h | 6 +- src/common/distributed/dist_types.h | 7 +-- src/common/distributed/dist_worker.cpp | 4 +- src/common/distributed/dist_worker.h | 6 +- .../test_l3_dependency.py | 4 +- .../tensormap_and_ringbuffer/test_l3_group.py | 4 +- tests/ut/cpp/test_dist_orchestrator.cpp | 23 ++++---- tests/ut/cpp/test_dist_scheduler.cpp | 57 ++++++++++--------- .../py/test_dist_worker/test_host_worker.py | 2 +- 13 files changed, 95 insertions(+), 91 deletions(-) diff --git a/docs/distributed_level_runtime.md b/docs/distributed_level_runtime.md index 8a73dca7c..cff122582 100644 --- a/docs/distributed_level_runtime.md +++ b/docs/distributed_level_runtime.md @@ -228,7 +228,7 @@ def my_orch(w, args): payload.callable = chip_callable.buffer_ptr() payload.args = task_args.__ptr__() payload.block_dim = 24 - r = w.submit(WorkerType.CHIP, payload, outputs=[64]) + r = w.submit(WorkerType.NEXT_LEVEL, payload, outputs=[64]) # SubWorker task: runs Python callable, depends on chip output sub_p = WorkerPayload() @@ -254,7 +254,7 @@ def my_orch(w, args): args_list.append(a.__ptr__()) # 1 DAG node, 4 chips execute in parallel - w.submit(WorkerType.CHIP, payload, args_list=args_list, outputs=[out_size]) + w.submit(WorkerType.NEXT_LEVEL, payload, args_list=args_list, outputs=[out_size]) ``` ### Why It's Uniform diff --git a/python/bindings/dist_worker_bind.h b/python/bindings/dist_worker_bind.h index 0bf961343..667cfa496 100644 --- a/python/bindings/dist_worker_bind.h +++ b/python/bindings/dist_worker_bind.h @@ -36,10 +36,7 @@ namespace nb = nanobind; inline void bind_dist_worker(nb::module_ &m) { // --- WorkerType --- - nb::enum_(m, "WorkerType") - .value("CHIP", WorkerType::CHIP) - .value("SUB", WorkerType::SUB) - .value("DIST", WorkerType::DIST); + nb::enum_(m, "WorkerType").value("NEXT_LEVEL", WorkerType::NEXT_LEVEL).value("SUB", WorkerType::SUB); // --- TaskState --- nb::enum_(m, "TaskState") @@ -167,27 +164,27 @@ inline void bind_dist_worker(nb::module_ &m) { ) .def( - "add_chip_worker", + "add_next_level_worker", [](DistWorker &self, DistWorker &w) { - self.add_worker(WorkerType::CHIP, &w); + self.add_worker(WorkerType::NEXT_LEVEL, &w); }, - nb::arg("worker"), "Add a lower-level DistWorker as a CHIP sub-worker (for L4+)." + nb::arg("worker"), "Add a lower-level DistWorker as a NEXT_LEVEL sub-worker." ) .def( - "add_chip_worker_native", + "add_next_level_worker", [](DistWorker &self, ChipWorker &w) { - self.add_worker(WorkerType::CHIP, &w); + self.add_worker(WorkerType::NEXT_LEVEL, &w); }, - nb::arg("worker"), "Add a ChipWorker (_ChipWorker) as a CHIP sub-worker (for L3)." + nb::arg("worker"), "Add a ChipWorker as a NEXT_LEVEL sub-worker." ) .def( - "add_chip_process", + "add_next_level_worker", [](DistWorker &self, DistChipProcess &w) { - self.add_worker(WorkerType::CHIP, &w); + self.add_worker(WorkerType::NEXT_LEVEL, &w); }, - nb::arg("worker"), "Add a forked ChipProcess as a CHIP sub-worker (process-isolated)." + nb::arg("worker"), "Add a forked process as a NEXT_LEVEL sub-worker." ) .def( diff --git a/python/simpler/worker.py b/python/simpler/worker.py index d6c69b7e1..3e8d2e9aa 100644 --- a/python/simpler/worker.py +++ b/python/simpler/worker.py @@ -23,7 +23,7 @@ w.init() def my_orch(w, args): - r = w.submit(WorkerType.CHIP, chip_payload, inputs=[...], outputs=[64]) + r = w.submit(WorkerType.NEXT_LEVEL, chip_payload, inputs=[...], outputs=[64]) w.submit(WorkerType.SUB, sub_payload(cid), inputs=[r.outputs[0].ptr]) w.run(Task(orch=my_orch, args=my_args)) @@ -235,6 +235,8 @@ def __init__(self, level: int, **config) -> None: def register(self, fn: Callable) -> int: """Register a callable for SubWorker use. Must be called before init().""" + if self.level < 3: + raise RuntimeError("Worker.register() is only available at level 3+") if self._initialized: raise RuntimeError("Worker.register() must be called before init()") cid = len(self._callable_registry) @@ -365,7 +367,7 @@ def _start_level3(self) -> None: for shm in self._chip_shms: cp = DistChipProcess(_mailbox_addr(shm), self._l3_args_size) self._dist_chip_procs.append(cp) - dw.add_chip_process(cp) + dw.add_next_level_worker(cp) for shm in self._shms: sw = DistSubWorker(_mailbox_addr(shm)) @@ -391,19 +393,8 @@ def run(self, task_or_payload, args=None, **kwargs) -> None: if self.level == 2: assert self._chip_worker is not None if isinstance(task_or_payload, WorkerPayload): - from .task_interface import ChipCallConfig # noqa: PLC0415 - - config = ChipCallConfig() - config.block_dim = task_or_payload.block_dim - config.aicpu_thread_num = task_or_payload.aicpu_thread_num - config.enable_profiling = task_or_payload.enable_profiling - self._chip_worker.run( - task_or_payload.callable, # type: ignore[arg-type] - task_or_payload.args, - config, - ) + self._run_l2_from_payload(task_or_payload) else: - # run(callable, args, **kwargs) self._chip_worker.run(task_or_payload, args, **kwargs) else: self._start_level3() @@ -412,6 +403,21 @@ def run(self, task_or_payload, args=None, **kwargs) -> None: task.orch(self, task.args) self._dist_worker.drain() + def _run_l2_from_payload(self, payload: WorkerPayload) -> None: + """Unpack a WorkerPayload and forward to ChipWorker (L2 only).""" + from .task_interface import ChipCallConfig # noqa: PLC0415 + + assert self._chip_worker is not None + config = ChipCallConfig() + config.block_dim = payload.block_dim + config.aicpu_thread_num = payload.aicpu_thread_num + config.enable_profiling = payload.enable_profiling + self._chip_worker.run( + payload.callable, # type: ignore[arg-type] + payload.args, + config, + ) + # ------------------------------------------------------------------ # Orchestration API (called from inside orch functions at L3+) # ------------------------------------------------------------------ diff --git a/src/common/distributed/dist_scheduler.cpp b/src/common/distributed/dist_scheduler.cpp index 00aec76d2..9908c49e2 100644 --- a/src/common/distributed/dist_scheduler.cpp +++ b/src/common/distributed/dist_scheduler.cpp @@ -80,7 +80,7 @@ void DistScheduler::start(const Config &cfg) { threads.push_back(std::move(wt)); } }; - make_threads(cfg_.chip_workers, chip_threads_); + make_threads(cfg_.next_level_workers, next_level_threads_); make_threads(cfg_.sub_workers, sub_threads_); stop_requested_.store(false, std::memory_order_relaxed); @@ -95,11 +95,11 @@ void DistScheduler::stop() { if (sched_thread_.joinable()) sched_thread_.join(); - for (auto &wt : chip_threads_) + for (auto &wt : next_level_threads_) wt->stop(); for (auto &wt : sub_threads_) wt->stop(); - chip_threads_.clear(); + next_level_threads_.clear(); sub_threads_.clear(); running_.store(false, std::memory_order_release); @@ -157,7 +157,7 @@ void DistScheduler::run() { // Exit when stop requested and all workers idle if (stop_requested_.load(std::memory_order_acquire)) { bool any_busy = false; - for (auto &wt : chip_threads_) + for (auto &wt : next_level_threads_) if (!wt->idle()) { any_busy = true; break; @@ -268,7 +268,7 @@ void DistScheduler::dispatch_ready() { } WorkerThread *DistScheduler::pick_idle(WorkerType type) { - auto &threads = (type == WorkerType::CHIP) ? chip_threads_ : sub_threads_; + auto &threads = (type == WorkerType::NEXT_LEVEL) ? next_level_threads_ : sub_threads_; for (auto &wt : threads) { if (wt->idle()) return wt.get(); } @@ -276,7 +276,7 @@ WorkerThread *DistScheduler::pick_idle(WorkerType type) { } std::vector DistScheduler::pick_n_idle(WorkerType type, int n) { - auto &threads = (type == WorkerType::CHIP) ? chip_threads_ : sub_threads_; + auto &threads = (type == WorkerType::NEXT_LEVEL) ? next_level_threads_ : sub_threads_; std::vector result; result.reserve(n); for (auto &wt : threads) { diff --git a/src/common/distributed/dist_scheduler.h b/src/common/distributed/dist_scheduler.h index ebe396448..0374e539b 100644 --- a/src/common/distributed/dist_scheduler.h +++ b/src/common/distributed/dist_scheduler.h @@ -89,8 +89,8 @@ class DistScheduler { DistTaskSlotState *slots; int32_t num_slots; DistReadyQueue *ready_queue; - std::vector chip_workers; // WorkerType::CHIP - std::vector sub_workers; // WorkerType::SUB + std::vector next_level_workers; // WorkerType::NEXT_LEVEL + std::vector sub_workers; // WorkerType::SUB // Called when a task reaches CONSUMED (TensorMap cleanup + ring release). std::function on_consumed_cb; }; @@ -104,7 +104,7 @@ class DistScheduler { Config cfg_; // Per-worker threads - std::vector> chip_threads_; + std::vector> next_level_threads_; std::vector> sub_threads_; // Shared completion queue (WorkerThread → Scheduler) diff --git a/src/common/distributed/dist_types.h b/src/common/distributed/dist_types.h index a8ea03675..b3a9695ee 100644 --- a/src/common/distributed/dist_types.h +++ b/src/common/distributed/dist_types.h @@ -51,9 +51,8 @@ using DistTaskSlot = int32_t; // ============================================================================= enum class WorkerType : int32_t { - CHIP = 0, // ChipWorker: L2 hardware device - SUB = 1, // SubWorker: fork/shm Python function - DIST = 2, // DistWorker: lower-level node (L4+) + NEXT_LEVEL = 0, // Next-level Worker (L3→ChipWorker, L4→DistWorker(L3), …) + SUB = 1, // SubWorker: fork/shm Python function }; // ============================================================================= @@ -75,7 +74,7 @@ enum class TaskState : int32_t { struct WorkerPayload { DistTaskSlot task_slot = DIST_INVALID_SLOT; - WorkerType worker_type = WorkerType::CHIP; + WorkerType worker_type = WorkerType::NEXT_LEVEL; // --- ChipWorker fields (set in PR 2-2) --- const void *callable = nullptr; // ChipCallable buffer ptr diff --git a/src/common/distributed/dist_worker.cpp b/src/common/distributed/dist_worker.cpp index 4995c7dd4..fc66aa81d 100644 --- a/src/common/distributed/dist_worker.cpp +++ b/src/common/distributed/dist_worker.cpp @@ -24,7 +24,7 @@ DistWorker::~DistWorker() { void DistWorker::add_worker(WorkerType type, IWorker *worker) { if (initialized_) throw std::runtime_error("DistWorker: add_worker after init"); - if (type == WorkerType::CHIP || type == WorkerType::DIST) chip_workers_.push_back(worker); + if (type == WorkerType::NEXT_LEVEL) next_level_workers_.push_back(worker); else sub_workers_.push_back(worker); } @@ -38,7 +38,7 @@ void DistWorker::init() { cfg.slots = slots_.get(); cfg.num_slots = DIST_TASK_WINDOW_SIZE; cfg.ready_queue = &ready_queue_; - cfg.chip_workers = chip_workers_; + cfg.next_level_workers = next_level_workers_; cfg.sub_workers = sub_workers_; cfg.on_consumed_cb = [this](DistTaskSlot slot) { on_consumed(slot); diff --git a/src/common/distributed/dist_worker.h b/src/common/distributed/dist_worker.h index e6a321964..32530d22d 100644 --- a/src/common/distributed/dist_worker.h +++ b/src/common/distributed/dist_worker.h @@ -20,7 +20,7 @@ * Usage (L3 host worker, instantiated from Python via nanobind): * * DistWorker dw(level=3); - * dw.add_worker(WorkerType::CHIP, chip_worker_ptr); + * dw.add_worker(WorkerType::NEXT_LEVEL, chip_worker_ptr); * dw.add_worker(WorkerType::SUB, sub_worker_ptr); * dw.init(); * @@ -32,7 +32,7 @@ * dw.execute(); // blocks until all submitted tasks complete * * // When used as an IWorker by a higher-level DistWorker (L4+): - * parent.add_worker(WorkerType::DIST, &dw); + * parent.add_worker(WorkerType::NEXT_LEVEL, &dw); * // parent scheduler calls dw.dispatch() / dw.poll() */ @@ -107,7 +107,7 @@ class DistWorker : public IWorker { DistOrchestrator orchestrator_; DistScheduler scheduler_; - std::vector chip_workers_; + std::vector next_level_workers_; std::vector sub_workers_; // --- Drain support --- diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_dependency.py b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_dependency.py index e9ae45d4b..cace2ef64 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_dependency.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_dependency.py @@ -34,12 +34,12 @@ def run_dag(w, callables, task_args, config): callables.keep(chip_args) # prevent GC before drain chip_p = WorkerPayload() - chip_p.worker_type = WorkerType.CHIP + chip_p.worker_type = WorkerType.NEXT_LEVEL chip_p.callable = callables.vector_kernel.buffer_ptr() chip_p.args = chip_args.__ptr__() chip_p.block_dim = config.block_dim chip_p.aicpu_thread_num = config.aicpu_thread_num - chip_result = w.submit(WorkerType.CHIP, chip_p, inputs=[], outputs=[task_args.f.numel() * 4]) + chip_result = w.submit(WorkerType.NEXT_LEVEL, chip_p, inputs=[], outputs=[task_args.f.numel() * 4]) sub_p = WorkerPayload() sub_p.worker_type = WorkerType.SUB diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_group.py b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_group.py index dfe919256..84ecd2cc5 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_group.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_group.py @@ -42,11 +42,11 @@ def run_dag(w, callables, task_args, config): callables.keep(args0, args1) # prevent GC before drain chip_p = WorkerPayload() - chip_p.worker_type = WorkerType.CHIP + chip_p.worker_type = WorkerType.NEXT_LEVEL chip_p.callable = callables.vector_kernel.buffer_ptr() chip_p.block_dim = config.block_dim chip_p.aicpu_thread_num = config.aicpu_thread_num - group_result = w.submit(WorkerType.CHIP, chip_p, args_list=[args0.__ptr__(), args1.__ptr__()], outputs=[4]) + group_result = w.submit(WorkerType.NEXT_LEVEL, chip_p, args_list=[args0.__ptr__(), args1.__ptr__()], outputs=[4]) sub_p = WorkerPayload() sub_p.worker_type = WorkerType.SUB diff --git a/tests/ut/cpp/test_dist_orchestrator.cpp b/tests/ut/cpp/test_dist_orchestrator.cpp index 59066a67a..bd8456be1 100644 --- a/tests/ut/cpp/test_dist_orchestrator.cpp +++ b/tests/ut/cpp/test_dist_orchestrator.cpp @@ -39,11 +39,12 @@ struct OrchestratorFixture : public ::testing::Test { void TearDown() override { ring.shutdown(); } - // Submit a CHIP task with the given input/output specs. - DistSubmitResult submit_chip(const std::vector &inputs, const std::vector &outputs) { + // Submit a NEXT_LEVEL task with the given input/output specs. + DistSubmitResult + submit_next_level(const std::vector &inputs, const std::vector &outputs) { WorkerPayload p; - p.worker_type = WorkerType::CHIP; - return orch.submit(WorkerType::CHIP, p, inputs, outputs); + p.worker_type = WorkerType::NEXT_LEVEL; + return orch.submit(WorkerType::NEXT_LEVEL, p, inputs, outputs); } }; @@ -52,7 +53,7 @@ struct OrchestratorFixture : public ::testing::Test { // --------------------------------------------------------------------------- TEST_F(OrchestratorFixture, IndependentTaskIsImmediatelyReady) { - auto res = submit_chip({}, {{64}}); + auto res = submit_next_level({}, {{64}}); EXPECT_NE(res.task_slot, DIST_INVALID_SLOT); ASSERT_EQ(res.outputs.size(), 1u); EXPECT_NE(res.outputs[0].ptr, nullptr); @@ -65,14 +66,14 @@ TEST_F(OrchestratorFixture, IndependentTaskIsImmediatelyReady) { TEST_F(OrchestratorFixture, DependentTaskIsPending) { // Task A produces a buffer - auto a = submit_chip({}, {{128}}); + auto a = submit_next_level({}, {{128}}); DistTaskSlot a_slot; rq.try_pop(a_slot); // drain ready queue uint64_t a_out = reinterpret_cast(a.outputs[0].ptr); // Task B depends on A's output - auto b = submit_chip({{a_out}}, {{64}}); + auto b = submit_next_level({{a_out}}, {{64}}); EXPECT_EQ(slots[b.task_slot].state.load(), TaskState::PENDING); EXPECT_EQ(slots[b.task_slot].fanin_count, 1); @@ -81,7 +82,7 @@ TEST_F(OrchestratorFixture, DependentTaskIsPending) { } TEST_F(OrchestratorFixture, TensorMapTracksProducer) { - auto a = submit_chip({}, {{256}}); + auto a = submit_next_level({}, {{256}}); DistTaskSlot drain_slot; rq.try_pop(drain_slot); @@ -90,7 +91,7 @@ TEST_F(OrchestratorFixture, TensorMapTracksProducer) { } TEST_F(OrchestratorFixture, OnConsumedCleansUpTensorMap) { - auto a = submit_chip({}, {{64}}); + auto a = submit_next_level({}, {{64}}); DistTaskSlot slot; rq.try_pop(slot); @@ -107,7 +108,7 @@ TEST_F(OrchestratorFixture, OnConsumedCleansUpTensorMap) { TEST_F(OrchestratorFixture, ScopeRegistersAndReleasesRef) { orch.scope_begin(); - auto a = submit_chip({}, {{64}}); + auto a = submit_next_level({}, {{64}}); DistTaskSlot slot; rq.try_pop(slot); @@ -126,7 +127,7 @@ TEST_F(OrchestratorFixture, ScopeRegistersAndReleasesRef) { } TEST_F(OrchestratorFixture, MultipleOutputsAllocated) { - auto res = submit_chip({}, {{32}, {64}, {128}}); + auto res = submit_next_level({}, {{32}, {64}, {128}}); ASSERT_EQ(res.outputs.size(), 3u); EXPECT_EQ(res.outputs[0].size, 32u); EXPECT_EQ(res.outputs[1].size, 64u); diff --git a/tests/ut/cpp/test_dist_scheduler.cpp b/tests/ut/cpp/test_dist_scheduler.cpp index b3082747f..f27d31a56 100644 --- a/tests/ut/cpp/test_dist_scheduler.cpp +++ b/tests/ut/cpp/test_dist_scheduler.cpp @@ -93,7 +93,7 @@ struct SchedulerFixture : public ::testing::Test { DistScope scope; DistReadyQueue rq; DistOrchestrator orch; - MockWorker chip_worker; + MockWorker mock_worker; DistScheduler sched; std::vector consumed_slots; @@ -108,7 +108,7 @@ struct SchedulerFixture : public ::testing::Test { cfg.slots = slots.get(); cfg.num_slots = N; cfg.ready_queue = &rq; - cfg.chip_workers = {&chip_worker}; + cfg.next_level_workers = {&mock_worker}; cfg.on_consumed_cb = [this](DistTaskSlot s) { orch.on_consumed(s); std::lock_guard lk(consumed_mu); @@ -122,10 +122,11 @@ struct SchedulerFixture : public ::testing::Test { ring.shutdown(); } - DistSubmitResult submit_chip(const std::vector &inputs, const std::vector &outputs) { + DistSubmitResult + submit_next_level(const std::vector &inputs, const std::vector &outputs) { WorkerPayload p; - p.worker_type = WorkerType::CHIP; - return orch.submit(WorkerType::CHIP, p, inputs, outputs); + p.worker_type = WorkerType::NEXT_LEVEL; + return orch.submit(WorkerType::NEXT_LEVEL, p, inputs, outputs); } void wait_consumed(DistTaskSlot slot, int timeout_ms = 500) { @@ -147,40 +148,40 @@ struct SchedulerFixture : public ::testing::Test { // --------------------------------------------------------------------------- TEST_F(SchedulerFixture, IndependentTaskDispatchedAndConsumed) { - auto res = submit_chip({}, {{64}}); + auto res = submit_next_level({}, {{64}}); DistTaskSlot slot = res.task_slot; // WorkerThread calls MockWorker.run() — wait for it to start - chip_worker.wait_running(); - ASSERT_GE(chip_worker.dispatched_count(), 1); - EXPECT_EQ(chip_worker.dispatched[0].slot, slot); + mock_worker.wait_running(); + ASSERT_GE(mock_worker.dispatched_count(), 1); + EXPECT_EQ(mock_worker.dispatched[0].slot, slot); // Signal completion → WorkerThread pushes to completion_queue → Scheduler consumes - chip_worker.complete(); + mock_worker.complete(); wait_consumed(slot); } TEST_F(SchedulerFixture, DependentTaskDispatchedAfterProducerCompletes) { - auto a = submit_chip({}, {{128}}); + auto a = submit_next_level({}, {{128}}); uint64_t a_key = reinterpret_cast(a.outputs[0].ptr); - auto b = submit_chip({{a_key}}, {{64}}); + auto b = submit_next_level({{a_key}}, {{64}}); EXPECT_EQ(slots[b.task_slot].state.load(), TaskState::PENDING); // Complete A → B should become ready - chip_worker.wait_running(); - EXPECT_EQ(chip_worker.dispatched[0].slot, a.task_slot); - chip_worker.complete(); // A done + mock_worker.wait_running(); + EXPECT_EQ(mock_worker.dispatched[0].slot, a.task_slot); + mock_worker.complete(); // A done // Wait for B to be dispatched auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(300); - while (chip_worker.dispatched_count() < 2 && std::chrono::steady_clock::now() < deadline) { + while (mock_worker.dispatched_count() < 2 && std::chrono::steady_clock::now() < deadline) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - ASSERT_GE(chip_worker.dispatched_count(), 2); - EXPECT_EQ(chip_worker.dispatched[1].slot, b.task_slot); + ASSERT_GE(mock_worker.dispatched_count(), 2); + EXPECT_EQ(mock_worker.dispatched[1].slot, b.task_slot); - chip_worker.complete(); // B done + mock_worker.complete(); // B done wait_consumed(b.task_slot); } @@ -213,7 +214,7 @@ struct GroupSchedulerFixture : public ::testing::Test { cfg.slots = slots.get(); cfg.num_slots = N; cfg.ready_queue = &rq; - cfg.chip_workers = {&worker_a, &worker_b}; + cfg.next_level_workers = {&worker_a, &worker_b}; cfg.on_consumed_cb = [this](DistTaskSlot s) { orch.on_consumed(s); std::lock_guard lk(consumed_mu); @@ -247,10 +248,10 @@ TEST_F(GroupSchedulerFixture, GroupDispatchesToNWorkers) { int dummy_args_1 = 1; WorkerPayload p; - p.worker_type = WorkerType::CHIP; + p.worker_type = WorkerType::NEXT_LEVEL; std::vector args_list = {&dummy_args_0, &dummy_args_1}; - auto res = orch.submit_group(WorkerType::CHIP, p, args_list, {}, {{64}}); + auto res = orch.submit_group(WorkerType::NEXT_LEVEL, p, args_list, {}, {{64}}); DistTaskSlot slot = res.task_slot; // Both workers should receive dispatches @@ -274,9 +275,9 @@ TEST_F(GroupSchedulerFixture, GroupDispatchesToNWorkers) { TEST_F(GroupSchedulerFixture, GroupCompletesOnlyWhenAllDone) { int d0 = 0, d1 = 1; WorkerPayload p; - p.worker_type = WorkerType::CHIP; + p.worker_type = WorkerType::NEXT_LEVEL; - auto res = orch.submit_group(WorkerType::CHIP, p, {&d0, &d1}, {}, {}); + auto res = orch.submit_group(WorkerType::NEXT_LEVEL, p, {&d0, &d1}, {}, {}); DistTaskSlot slot = res.task_slot; worker_a.wait_running(); @@ -297,15 +298,15 @@ TEST_F(GroupSchedulerFixture, GroupDependencyChain) { // Task B depends on A's output — B stays PENDING until group A finishes. int d0 = 0, d1 = 1; WorkerPayload pa; - pa.worker_type = WorkerType::CHIP; + pa.worker_type = WorkerType::NEXT_LEVEL; - auto a = orch.submit_group(WorkerType::CHIP, pa, {&d0, &d1}, {}, {{128}}); + auto a = orch.submit_group(WorkerType::NEXT_LEVEL, pa, {&d0, &d1}, {}, {{128}}); uint64_t a_out = reinterpret_cast(a.outputs[0].ptr); // Submit B depending on A's output WorkerPayload pb; - pb.worker_type = WorkerType::CHIP; - auto b = orch.submit(WorkerType::CHIP, pb, {{a_out}}, {}); + pb.worker_type = WorkerType::NEXT_LEVEL; + auto b = orch.submit(WorkerType::NEXT_LEVEL, pb, {{a_out}}, {}); EXPECT_EQ(slots[b.task_slot].state.load(), TaskState::PENDING); // Complete group A diff --git a/tests/ut/py/test_dist_worker/test_host_worker.py b/tests/ut/py/test_dist_worker/test_host_worker.py index d0334a78f..6243ddf33 100644 --- a/tests/ut/py/test_dist_worker/test_host_worker.py +++ b/tests/ut/py/test_dist_worker/test_host_worker.py @@ -193,7 +193,7 @@ def orch(hw, _args): # no workers — submit with empty workers list isn't useful here; # instead verify that submit() allocates output buffers correctly # by using a SUB worker that immediately signals done - p.worker_type = WorkerType.CHIP # no CHIP workers — task stays RUNNING + p.worker_type = WorkerType.NEXT_LEVEL # no NEXT_LEVEL workers — task stays RUNNING # For output allocation test, just verify DistSubmitResult has outputs # We re-init with sub workers for a real execution test pass