diff --git a/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/golden.py b/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/golden.py new file mode 100644 index 00000000..89df9622 --- /dev/null +++ b/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/golden.py @@ -0,0 +1,7 @@ +from pathlib import Path +import sys + +_BASE = Path(__file__).resolve().parents[1] / "paged_attention" +sys.path.insert(0, str(_BASE)) + +from golden import ALL_CASES, ATOL, DEFAULT_CASE, RTOL, __outputs__, compute_golden, generate_inputs # noqa: E402,F401 diff --git a/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/kernel_config.py b/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/kernel_config.py new file mode 100644 index 00000000..534a84f1 --- /dev/null +++ b/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/kernel_config.py @@ -0,0 +1,72 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. 
+# ----------------------------------------------------------------------------------------------------------- +from pathlib import Path + +from task_interface import ArgDirection as D # pyright: ignore[reportAttributeAccessIssue] + +_ROOT = Path(__file__).parent +_PA_KERNELS = _ROOT.parent.parent / "paged_attention" / "kernels" + +ORCHESTRATION = { + "source": str(_ROOT / "orchestration" / "paged_attention_orch.cpp"), + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN, D.IN, D.IN, D.IN, D.IN, D.OUT], +} + +KERNELS = [ + { + "func_id": 0, + "name": "QK", + "source": str(_PA_KERNELS / "aic" / "aic_qk_matmul.cpp"), + "core_type": "aic", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 2, + "name": "PV", + "source": str(_PA_KERNELS / "aic" / "aic_pv_matmul.cpp"), + "core_type": "aic", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 4, + "name": "AIC_HUB", + "source": str(_PA_KERNELS / "aic" / "aic_hub.cpp"), + "core_type": "aic", + "signature": [], + }, + { + "func_id": 1, + "name": "SF", + "source": str(_PA_KERNELS / "aiv" / "aiv_softmax_prepare.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.OUT, D.OUT, D.OUT], + }, + { + "func_id": 3, + "name": "UP", + "source": str(_PA_KERNELS / "aiv" / "aiv_online_update.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.IN, D.IN, D.INOUT, D.INOUT, D.INOUT, D.INOUT], + }, + { + "func_id": 5, + "name": "AIV_HUB", + "source": str(_PA_KERNELS / "aiv" / "aiv_hub.cpp"), + "core_type": "aiv", + "signature": [], + }, +] + +RUNTIME_CONFIG = { + "runtime": "tensormap_and_ringbuffer", + "aicpu_thread_num": 4, + "orch_thread_num": 1, + "block_dim": 24, +} diff --git a/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/orchestration/paged_attention_orch.cpp b/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/orchestration/paged_attention_orch.cpp new file mode 100644 index 00000000..0416f861 --- /dev/null +++ 
b/examples/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/orchestration/paged_attention_orch.cpp @@ -0,0 +1,168 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + +#include +#include + +#include +#include + +#include "pto_orchestration_api.h" // NOLINT(build/include_subdir) + +#define FUNC_QK_MATMUL 0 +#define FUNC_SOFTMAX_PREPARE 1 +#define FUNC_PV_MATMUL 2 +#define FUNC_ONLINE_UPDATE 3 +#define FUNC_AIC_HUB 4 +#define FUNC_AIV_HUB 5 + +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig +aicpu_orchestration_config(const ChipStorageTaskArgs &orch_args) { + (void)orch_args; // NOLINT(readability/casting) + return PTO2OrchestrationConfig{ + .expected_arg_count = 7, + }; +} + +__attribute__((visibility("default"))) void +aicpu_orchestration_entry(const ChipStorageTaskArgs &orch_args, int orch_thread_num, int orch_thread_index) { + uint64_t batch = orch_args.tensor(0).shapes[0]; + uint64_t num_heads = orch_args.tensor(0).shapes[1]; + uint64_t head_dim = orch_args.tensor(0).shapes[2]; + DataType data_type = orch_args.tensor(0).dtype; + uint64_t block_size = orch_args.tensor(1).shapes[1]; + uint64_t block_num = orch_args.tensor(3).shapes[1]; + uint64_t scale_value = orch_args.scalar(0); + + uint64_t q_head_num = num_heads; + 
uint64_t q_tile = 16; + uint64_t q_loop = (q_head_num + q_tile - 1) / q_tile; + + uint64_t b_start = batch * orch_thread_index / orch_thread_num; + uint64_t b_end = batch * (orch_thread_index + 1) / orch_thread_num; + + void *query_ptr = orch_args.tensor(0).data_as(); + void *kc_ptr = orch_args.tensor(1).data_as(); + void *vc_ptr = orch_args.tensor(2).data_as(); + void *out_ptr = orch_args.tensor(5).data_as(); + + uint64_t total_blocks_count = orch_args.tensor(1).shapes[0]; + uint64_t kv_total_rows = total_blocks_count * block_size; + + uint32_t query_shapes[2] = {static_cast(batch * num_heads), static_cast(head_dim)}; + uint32_t key_cache_shapes[2] = {static_cast(kv_total_rows), static_cast(head_dim)}; + uint32_t value_cache_shapes[2] = {static_cast(kv_total_rows), static_cast(head_dim)}; + uint32_t out_shapes[2] = {static_cast(batch * num_heads), static_cast(head_dim)}; + Tensor query = make_tensor_external(query_ptr, query_shapes, 2, data_type); + Tensor key_cache = make_tensor_external(kc_ptr, key_cache_shapes, 2, data_type); + Tensor value_cache = make_tensor_external(vc_ptr, value_cache_shapes, 2, data_type); + Tensor out = make_tensor_external(out_ptr, out_shapes, 2, DataType::FLOAT32); + + int *host_block_table = orch_args.tensor(3).data_as(); + int *host_context_lens = orch_args.tensor(4).data_as(); + + uint32_t tile2d_shapes[2] = {static_cast(q_tile), static_cast(head_dim)}; + uint32_t scalar_shapes[1] = {static_cast(q_tile)}; + uint32_t sij_shapes[2] = {static_cast(q_tile), static_cast(block_size)}; + TensorCreateInfo tile2d_ci(tile2d_shapes, 2, DataType::FLOAT32); + TensorCreateInfo scalar_ci(scalar_shapes, 1, DataType::FLOAT32); + TensorCreateInfo sij_ci(sij_shapes, 2, DataType::FLOAT32); + TensorCreateInfo pij_f16_ci(sij_shapes, 2, data_type); + + for (uint64_t b_idx = b_start; b_idx < b_end; b_idx++) { + uint64_t cur_seq = host_context_lens[b_idx]; + uint64_t bn_this_batch = (cur_seq + block_size - 1) / block_size; + for (uint64_t q_idx = 0; q_idx < 
q_loop; q_idx++) { + PTO2_SCOPE() { + uint32_t cur_offset = static_cast(b_idx * q_head_num + q_idx * q_tile); + uint32_t qi_offsets[2] = {cur_offset, 0}; + uint32_t out_view_offsets[2] = {cur_offset, 0}; + Tensor qi = query.view(tile2d_shapes, qi_offsets); + Tensor out_view = out.view(tile2d_shapes, out_view_offsets); + + Arg params_inplace; + params_inplace.add_output(tile2d_ci); + params_inplace.add_output(scalar_ci); + params_inplace.add_output(scalar_ci); + TaskOutputTensors hub_outs = pto2_rt_submit_aiv_task(FUNC_AIV_HUB, params_inplace); + const Tensor &oi = hub_outs.get_ref(0); + const Tensor &li_update = hub_outs.get_ref(1); + const Tensor &mi_update = hub_outs.get_ref(2); + + for (uint64_t bn = 0; bn < bn_this_batch; bn++) { + uint64_t cur_block_idx = host_block_table[b_idx * block_num + bn]; + uint64_t valid_len = std::min(block_size, cur_seq - bn * block_size); + uint32_t kv_shapes[2] = {static_cast(block_size), static_cast(head_dim)}; + uint32_t kv_offsets[2] = {static_cast(cur_block_idx * block_size), 0}; + Tensor kj = key_cache.view(kv_shapes, kv_offsets); + Tensor vj = value_cache.view(kv_shapes, kv_offsets); + + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + Arg params_qk; + params_qk.add_input(qi); + params_qk.add_input(kj); + params_qk.add_output(sij_ci); + PTO2ManualSubmitResult qk_outs = pto2_rt_submit_aic_task_manual(FUNC_QK_MATMUL, params_qk); + const Tensor &sij = qk_outs.outputs.get_ref(0); + + uint32_t sij_valid_shapes[2] = { + static_cast(q_tile), static_cast(valid_len) + }; + uint32_t sij_valid_offsets[2] = {0, 0}; + Tensor sij_valid = sij.view(sij_valid_shapes, sij_valid_offsets); + + Arg params_sf; + params_sf.add_input(sij_valid); + params_sf.add_output(pij_f16_ci); + params_sf.add_output(scalar_ci); + params_sf.add_output(scalar_ci); + params_sf.add_scalar(scale_value); + PTO2ManualSubmitResult sf_outs = pto2_rt_submit_aiv_task_manual_with_deps( + FUNC_SOFTMAX_PREPARE, params_sf, {qk_outs.task_id} + ); + const Tensor &pij_f16 = 
sf_outs.outputs.get_ref(0); + const Tensor &mi = sf_outs.outputs.get_ref(1); + const Tensor &li = sf_outs.outputs.get_ref(2); + + Arg params_pv; + params_pv.add_input(pij_f16); + params_pv.add_input(vj); + params_pv.add_output(tile2d_ci); + PTO2ManualSubmitResult pv_outs = + pto2_rt_submit_aic_task_manual_with_deps(FUNC_PV_MATMUL, params_pv, {sf_outs.task_id}); + const Tensor &oi_tmp = pv_outs.outputs.get_ref(0); + + uint64_t is_first = (bn == 0) ? 1 : 0; + uint64_t is_last = (bn == bn_this_batch - 1) ? 1 : 0; + + Arg params_up; + params_up.add_input(mi); + params_up.add_input(li); + params_up.add_input(oi_tmp); + params_up.add_inout(mi_update); + params_up.add_inout(li_update); + params_up.add_inout(oi); + params_up.add_inout(out_view); + params_up.add_scalar(is_first); + params_up.add_scalar(is_last); + PTO2ManualSubmitResult up_outs = pto2_rt_submit_aiv_task_manual_with_deps( + FUNC_ONLINE_UPDATE, params_up, {sf_outs.task_id, pv_outs.task_id} + ); + } + } + } + } + } +} + +} // extern "C" diff --git a/examples/scripts/run_example.py b/examples/scripts/run_example.py index 89ab8419..76ae16f1 100644 --- a/examples/scripts/run_example.py +++ b/examples/scripts/run_example.py @@ -21,12 +21,12 @@ Examples: # Run hardware example (requires Ascend device) - python examples/scripts/run_example.py -k examples/host_build_graph/vector_example/kernels \ - -g examples/host_build_graph/vector_example/golden.py + python examples/scripts/run_example.py -k examples/a2a3/host_build_graph/vector_example/kernels \ + -g examples/a2a3/host_build_graph/vector_example/golden.py # Run simulation example (no hardware required) - python examples/scripts/run_example.py -k examples/host_build_graph/vector_example/kernels \ - -g examples/host_build_graph/vector_example/golden.py \ + python examples/scripts/run_example.py -k examples/a2a3/host_build_graph/vector_example/kernels \ + -g examples/a2a3/host_build_graph/vector_example/golden.py \ -p a2a3sim # Run with specific device diff --git 
a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 97afd6a4..18f73926 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -2364,6 +2364,17 @@ int32_t AicpuExecutor::run(Runtime *runtime) { } } + if (rt != nullptr) { + void* sm = runtime->get_pto2_gm_sm_ptr(); + if (sm != nullptr) { + int32_t orch_err = static_cast(sm)->orch_error_code.load(std::memory_order_acquire); + if (orch_err != PTO2_ERROR_NONE) { + DEV_ERROR("Thread %d: Exiting with orchestrator error code=%d", thread_idx, orch_err); + return -1; + } + } + } + return 0; } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h index cf752ef2..045880e0 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h @@ -31,9 +31,11 @@ #include #include +#include #include // Type headers needed by orchestration +#include "pto_task_id.h" // PTO2TaskId // NOLINT(build/include_subdir) #include "pto_submit_types.h" // MixedKernels, INVALID_KERNEL_ID, subtask slots // NOLINT(build/include_subdir) #include "pto_types.h" // Arg, TaskOutputTensors, TensorArgType // NOLINT(build/include_subdir) #include "task_args.h" // ChipStorageTaskArgs, ContinuousTensor // NOLINT(build/include_subdir) @@ -84,6 +86,16 @@ inline Tensor from_tensor_arg(const ContinuousTensor &t, bool manual_dep = false // Ops Table and Opaque Runtime // ============================================================================= +enum class PTO2ScopeMode : uint8_t { + AUTO = 0, + MANUAL = 1, +}; + +struct PTO2ManualSubmitResult { + PTO2TaskId task_id; + TaskOutputTensors outputs; +}; + /** * Forward declaration — the 
orchestration sees PTO2Runtime as a partial * struct whose first field is the ops pointer. The full definition @@ -115,7 +127,13 @@ void pto2_framework_bind_runtime(PTO2Runtime *rt); */ typedef struct PTO2RuntimeOps { TaskOutputTensors (*submit_task)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); - void (*scope_begin)(PTO2Runtime *rt); + PTO2ManualSubmitResult (*submit_task_manual)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); + PTO2ManualSubmitResult (*submit_task_manual_with_deps)( + PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, const PTO2TaskId explicit_producer_ids[], + int32_t explicit_producer_count + ); + void (*add_dependency)(PTO2Runtime *rt, PTO2TaskId producer, PTO2TaskId consumer); + void (*scope_begin)(PTO2Runtime *rt, PTO2ScopeMode mode); void (*scope_end)(PTO2Runtime *rt); void (*orchestration_done)(PTO2Runtime *rt); bool (*is_fatal)(PTO2Runtime *rt); @@ -179,12 +197,25 @@ static inline TaskOutputTensors alloc_tensors(const CIs &...cis) { always_assert(!args.has_error && "alloc_tensors failed to construct output-only Arg"); return alloc_tensors(args); } - static inline TaskOutputTensors pto2_rt_submit_task(const MixedKernels &mixed_kernels, const Arg &args) { PTO2Runtime *rt = pto2_current_runtime(); return rt->ops->submit_task(rt, mixed_kernels, args); } +static inline PTO2ManualSubmitResult pto2_rt_submit_task_manual(const MixedKernels &mixed_kernels, const Arg &args) { + PTO2Runtime *rt = pto2_current_runtime(); + return rt->ops->submit_task_manual(rt, mixed_kernels, args); +} + +static inline PTO2ManualSubmitResult pto2_rt_submit_task_manual_with_deps( + const MixedKernels &mixed_kernels, const Arg &args, std::initializer_list explicit_producers +) { + PTO2Runtime *rt = pto2_current_runtime(); + return rt->ops->submit_task_manual_with_deps( + rt, mixed_kernels, args, explicit_producers.begin(), static_cast(explicit_producers.size()) + ); +} + /** * Convenience wrapper: submit 
an AIC-only task. */ @@ -205,9 +236,50 @@ static inline TaskOutputTensors pto2_rt_submit_aiv_task(int32_t kernel_id, const return rt->ops->submit_task(rt, mk, args); } -static inline void pto2_rt_scope_begin() { +static inline PTO2ManualSubmitResult pto2_rt_submit_aic_task_manual(int32_t kernel_id, const Arg &args) { + PTO2Runtime *rt = pto2_current_runtime(); + MixedKernels mk; + mk.aic_kernel_id = kernel_id; + return rt->ops->submit_task_manual(rt, mk, args); +} + +static inline PTO2ManualSubmitResult pto2_rt_submit_aic_task_manual_with_deps( + int32_t kernel_id, const Arg &args, std::initializer_list explicit_producers +) { + PTO2Runtime *rt = pto2_current_runtime(); + MixedKernels mk; + mk.aic_kernel_id = kernel_id; + return rt->ops->submit_task_manual_with_deps( + rt, mk, args, explicit_producers.begin(), static_cast(explicit_producers.size()) + ); +} + +static inline PTO2ManualSubmitResult pto2_rt_submit_aiv_task_manual(int32_t kernel_id, const Arg &args) { + PTO2Runtime *rt = pto2_current_runtime(); + MixedKernels mk; + mk.aiv0_kernel_id = kernel_id; + return rt->ops->submit_task_manual(rt, mk, args); +} + +static inline PTO2ManualSubmitResult pto2_rt_submit_aiv_task_manual_with_deps( + int32_t kernel_id, const Arg &args, std::initializer_list explicit_producers +) { + PTO2Runtime *rt = pto2_current_runtime(); + MixedKernels mk; + mk.aiv0_kernel_id = kernel_id; + return rt->ops->submit_task_manual_with_deps( + rt, mk, args, explicit_producers.begin(), static_cast(explicit_producers.size()) + ); +} + +static inline void pto2_rt_add_dependency(PTO2TaskId producer, PTO2TaskId consumer) { + PTO2Runtime *rt = pto2_current_runtime(); + rt->ops->add_dependency(rt, producer, consumer); +} + +static inline void pto2_rt_scope_begin(PTO2ScopeMode mode = PTO2ScopeMode::AUTO) { PTO2Runtime *rt = pto2_current_runtime(); - rt->ops->scope_begin(rt); + rt->ops->scope_begin(rt, mode); } static inline void pto2_rt_scope_end() { @@ -300,9 +372,9 @@ static inline void 
set_tensor_data(const Tensor &tensor, uint32_t ndims, const u */ class PTO2ScopeGuard { public: // NOLINT(whitespace/indent) - PTO2ScopeGuard() : + explicit PTO2ScopeGuard(PTO2ScopeMode mode = PTO2ScopeMode::AUTO) : rt_(pto2_current_runtime()) { - rt_->ops->scope_begin(rt_); + rt_->ops->scope_begin(rt_, mode); } ~PTO2ScopeGuard() { rt_->ops->scope_end(rt_); } @@ -313,7 +385,8 @@ class PTO2ScopeGuard { #define _PTO2_CONCATENATE_IMPL(x, y) x##y #define _PTO2_CONCATENATE(x, y) _PTO2_CONCATENATE_IMPL(x, y) -#define PTO2_SCOPE_GUARD() [[maybe_unused]] PTO2ScopeGuard _PTO2_CONCATENATE(scope_guard_, __COUNTER__) +#define PTO2_SCOPE_GUARD(...) \ + [[maybe_unused]] PTO2ScopeGuard _PTO2_CONCATENATE(scope_guard_, __COUNTER__) { __VA_ARGS__ } /** * Scoped block macro: @@ -321,7 +394,7 @@ class PTO2ScopeGuard { * pto2_rt_submit_task(...); * } */ -#define PTO2_SCOPE() if (PTO2_SCOPE_GUARD(); true) +#define PTO2_SCOPE(...) if (PTO2_SCOPE_GUARD(__VA_ARGS__); true) // ============================================================================= // Orchestration Config diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index defc1ec4..19d4af9b 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -150,6 +150,9 @@ struct PTO2FaninBuilder { } }; +static int32_t current_manual_scope_begin(const PTO2OrchestratorState *orch); +static int32_t find_current_manual_scope_task_index(const PTO2OrchestratorState *orch, PTO2TaskId task_id); + static bool pto2_append_fanin_or_fail( PTO2OrchestratorState *orch, PTO2TaskId task_id, int32_t tensor_arg_index, TensorArgType ptype, PTO2TaskSlotState *prod_state, PTO2FaninBuilder *fanin_builder, PTO2SchedulerState *sched, PTO2RingFlowControl &fc, @@ -195,7 +198,96 @@ static bool pto2_append_fanin_or_fail( return true; } -static void 
scope_tasks_push(PTO2OrchestratorState *orch, PTO2TaskSlotState *task_slot_state); +static bool pto2_append_manual_explicit_fanins_to_payload_or_fail( + PTO2OrchestratorState *orch, PTO2TaskId consumer_id, PTO2TaskPayload *consumer_payload, + PTO2TaskSlotState *consumer_slot_state, const PTO2TaskId explicit_producer_ids[], int32_t explicit_producer_count +) { + if (explicit_producer_count <= 0 || explicit_producer_ids == nullptr) { + return true; + } + + if (consumer_payload->manual_explicit_fanin_count == 0) { + consumer_payload->manual_explicit_fanin_begin = consumer_payload->fanin_actual_count; + } + + for (int32_t i = 0; i < explicit_producer_count; i++) { + PTO2TaskId producer_id = explicit_producer_ids[i]; + if (!producer_id.is_valid()) { + LOG_ERROR("manual submit explicit dependency requires valid producer task ids"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return false; + } + if (producer_id == consumer_id) { + LOG_ERROR("manual submit explicit dependency does not allow self-dependency"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return false; + } + int32_t producer_idx = find_current_manual_scope_task_index(orch, producer_id); + if (producer_idx < 0) { + LOG_ERROR("manual submit explicit dependency requires producers from the current manual scope"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return false; + } + PTO2TaskSlotState *producer_slot_state = orch->scope_tasks[current_manual_scope_begin(orch) + producer_idx]; + bool duplicate = false; + pto2_for_each_fanin_slot_state(*consumer_payload, [&](PTO2TaskSlotState *fanin_slot_state) { + if (fanin_slot_state == producer_slot_state) { + duplicate = true; + return false; + } + return true; + }); + if (duplicate) { + continue; + } + if 
(consumer_payload->fanin_actual_count >= PTO2_MAX_INPUTS) { + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Dependency Overflow Detected!"); + LOG_ERROR("========================================"); + LOG_ERROR("Task requires more than PTO2_MAX_INPUTS unique fanin dependencies."); + LOG_ERROR(" consumer_id.raw: %" PRIu64, consumer_id.raw); + LOG_ERROR(" fanin_count: %d / %d", consumer_payload->fanin_actual_count + 1, PTO2_MAX_INPUTS); + LOG_ERROR(" reason: manual submit explicit dependency"); + LOG_ERROR("========================================"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_DEPENDENCY_OVERFLOW, std::memory_order_release); + orch->fatal = true; + return false; + } + int32_t current_fanin_count = consumer_payload->fanin_actual_count; + if (current_fanin_count < PTO2_FANIN_INLINE_CAP) { + consumer_payload->fanin_inline_slot_states[current_fanin_count] = producer_slot_state; + } else { + PTO2FaninPool *spill_pool = (consumer_payload->fanin_spill_pool != nullptr) + ? 
consumer_payload->fanin_spill_pool + : &orch->rings[consumer_slot_state->ring_id].fanin_pool; + PTO2FaninPool &fanin_pool = *spill_pool; + auto &fc = orch->sm_handle->header->rings[consumer_slot_state->ring_id].fc; + fanin_pool.ensure_space(*orch->scheduler, fc, consumer_slot_state->ring_id, 1); + int32_t spill_idx = fanin_pool.top; + PTO2FaninSpillEntry *entry = fanin_pool.alloc(); + if (entry == nullptr) { + orch->fatal = true; + return false; + } + if (current_fanin_count == PTO2_FANIN_INLINE_CAP) { + consumer_payload->fanin_spill_start = spill_idx; + consumer_payload->fanin_spill_pool = spill_pool; + } else if (consumer_payload->fanin_spill_pool == nullptr) { + consumer_payload->fanin_spill_pool = spill_pool; + } + entry->slot_state = producer_slot_state; + } + consumer_payload->fanin_actual_count = current_fanin_count + 1; + consumer_payload->manual_explicit_fanin_count += 1; + consumer_slot_state->fanin_count += 1; + } + return true; +} + +static bool scope_tasks_push(PTO2OrchestratorState *orch, PTO2TaskSlotState *task_slot_state); struct PTO2OutputLayout { uint64_t offsets[MAX_TENSOR_ARGS] = {}; @@ -345,7 +437,6 @@ bool pto2_orchestrator_init( sm_handle->task_descriptors[r], sm_handle->header->rings[r].task_window_size, &fc.current_task_index, &fc.last_task_alive, ring_heap_base, heap_size, &sm_handle->header->orch_error_code ); - size_t fanin_pool_bytes = PTO2_ALIGN_UP(static_cast(dep_pool_capacity) * sizeof(PTO2FaninSpillEntry), PTO2_ALIGN_SIZE); PTO2FaninSpillEntry *fanin_entries = @@ -407,6 +498,8 @@ bool pto2_orchestrator_init( orch->scope_tasks_capacity = init_cap; orch->scope_stack_top = -1; orch->scope_stack_capacity = max_depth; + orch->manual_scope_active = false; + orch->manual_scope_needs_dep_pool_repair = false; return true; } @@ -435,26 +528,171 @@ void pto2_orchestrator_set_scheduler(PTO2OrchestratorState *orch, PTO2SchedulerS // Scope Management // ============================================================================= -static void 
scope_tasks_push(PTO2OrchestratorState *orch, PTO2TaskSlotState *task_slot_state) { +static bool scope_tasks_push(PTO2OrchestratorState *orch, PTO2TaskSlotState *task_slot_state) { + if (orch->fatal) { + return false; + } if (orch->scope_tasks_size >= orch->scope_tasks_capacity) { int32_t new_cap = orch->scope_tasks_capacity * 2; PTO2TaskSlotState **new_buf = reinterpret_cast(realloc(orch->scope_tasks, new_cap * sizeof(PTO2TaskSlotState *))); - assert(new_buf && "Failed to grow scope task buffer"); + if (new_buf == nullptr) { + LOG_ERROR("Failed to grow scope task buffer to %d entries", new_cap); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_OUT_OF_MEMORY, std::memory_order_release); + orch->fatal = true; + return false; + } orch->scope_tasks = new_buf; orch->scope_tasks_capacity = new_cap; } orch->scope_tasks[orch->scope_tasks_size++] = task_slot_state; + return true; +} + +static bool in_manual_scope(const PTO2OrchestratorState *orch) { + return orch->manual_scope_active; +} + +static int32_t current_manual_scope_begin(const PTO2OrchestratorState *orch) { + return orch->scope_begins[orch->scope_stack_top]; +} + +static int32_t find_current_manual_scope_task_index(const PTO2OrchestratorState *orch, PTO2TaskId task_id) { + if (!in_manual_scope(orch) || !task_id.is_valid()) { + return -1; + } + + int32_t begin = current_manual_scope_begin(orch); + int32_t count = orch->scope_tasks_size - begin; + if (count <= 0) { + return -1; + } + + PTO2TaskSlotState *first_slot_state = orch->scope_tasks[begin]; + if (first_slot_state == nullptr) { + return -1; + } + + PTO2TaskId first_task_id = first_slot_state->task->task_id; + if (first_task_id.ring() != task_id.ring()) { + return -1; + } + + uint32_t first_local = first_task_id.local(); + uint32_t task_local = task_id.local(); + if (task_local < first_local) { + return -1; + } + + uint32_t delta = task_local - first_local; + if (delta >= static_cast(count)) { + return -1; + } + + PTO2TaskSlotState *candidate = 
orch->scope_tasks[begin + static_cast(delta)]; + if (candidate != nullptr && candidate->task->task_id == task_id) { + return static_cast(delta); + } + return -1; +} + +static bool task_owned_by_current_manual_scope(const PTO2OrchestratorState *orch, PTO2TaskId task_id) { + return find_current_manual_scope_task_index(orch, task_id) >= 0; } -void pto2_scope_begin(PTO2OrchestratorState *orch) { +static bool link_manual_scope_explicit_edges(PTO2OrchestratorState *orch, int32_t begin, int32_t count) { + if (orch->scheduler == nullptr || count <= 0) { + return true; + } + + int32_t total_explicit_edges = 0; + for (int32_t task_idx = 0; task_idx < count; task_idx++) { + PTO2TaskSlotState *consumer_slot_state = orch->scope_tasks[begin + task_idx]; + PTO2TaskPayload *consumer_payload = consumer_slot_state->payload; + if (consumer_payload->fanin_actual_count > PTO2_MAX_INPUTS) { + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Dependency Overflow Detected!"); + LOG_ERROR("========================================"); + LOG_ERROR("Task requires more than PTO2_MAX_INPUTS unique fanin dependencies."); + LOG_ERROR(" task_id.raw: %" PRIu64, consumer_slot_state->task->task_id.raw); + LOG_ERROR(" fanin_count: %d / %d", consumer_payload->fanin_actual_count, PTO2_MAX_INPUTS); + LOG_ERROR(" reason: manual dependency bookkeeping"); + LOG_ERROR("This is a runtime dependency-tracking limit."); + LOG_ERROR("========================================"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_DEPENDENCY_OVERFLOW, std::memory_order_release); + orch->fatal = true; + return false; + } + total_explicit_edges += consumer_payload->manual_explicit_fanin_count; + } + + if (total_explicit_edges == 0) { + return true; + } + + uint8_t ring_id = orch->scope_tasks[begin]->ring_id; + auto &dep_pool = orch->rings[ring_id].dep_pool; + auto &fc = orch->sm_handle->header->rings[ring_id].fc; + dep_pool.ensure_space(*orch->scheduler, fc, ring_id, 
total_explicit_edges); + + int32_t dep_pool_mark_prefix = 0; + for (int32_t task_idx = 0; task_idx < count; task_idx++) { + PTO2TaskSlotState *consumer_slot_state = orch->scope_tasks[begin + task_idx]; + PTO2TaskPayload *consumer_payload = consumer_slot_state->payload; + + bool fanout_ok = pto2_for_each_fanin_slot_state_range( + *consumer_payload, consumer_payload->manual_explicit_fanin_begin, + consumer_payload->manual_explicit_fanin_count, + [&](PTO2TaskSlotState *producer_slot_state) { + always_assert( + producer_slot_state->ring_id == consumer_slot_state->ring_id && + "manual explicit dependencies must stay within one ring" + ); + producer_slot_state->fanout_count += 1; + producer_slot_state->fanout_head = + dep_pool.prepend(producer_slot_state->fanout_head, consumer_slot_state); + if (producer_slot_state->fanout_head == nullptr) { + orch->fatal = true; + return false; + } + return true; + } + ); + if (!fanout_ok) { + return false; + } + + if (consumer_slot_state->dep_pool_mark < dep_pool.top) { + consumer_slot_state->dep_pool_mark = dep_pool.top; + } + if (consumer_slot_state->dep_pool_mark < dep_pool_mark_prefix) { + consumer_slot_state->dep_pool_mark = dep_pool_mark_prefix; + } else { + dep_pool_mark_prefix = consumer_slot_state->dep_pool_mark; + } + } + return true; +} + +void pto2_scope_begin(PTO2OrchestratorState *orch, PTO2ScopeMode mode) { if (orch->fatal) { return; } assert(orch->scope_stack_top < static_cast(orch->scope_stack_capacity - 1) && "Scope stack overflow"); + if (in_manual_scope(orch)) { + LOG_ERROR( + "nested PTO2_SCOPE(PTO2ScopeMode::MANUAL) is not supported in v1; manual scope inside manual scope is not supported" + ); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return; + } + ++orch->scope_stack_top; orch->scope_begins[orch->scope_stack_top] = orch->scope_tasks_size; + orch->manual_scope_active = (mode == PTO2ScopeMode::MANUAL); + 
orch->manual_scope_needs_dep_pool_repair = false; } void pto2_scope_end(PTO2OrchestratorState *orch) { @@ -467,15 +705,41 @@ void pto2_scope_end(PTO2OrchestratorState *orch) { uint64_t _se0 = get_sys_cnt_aicpu(); #endif - int32_t begin = orch->scope_begins[orch->scope_stack_top--]; + int32_t top = orch->scope_stack_top; + int32_t begin = orch->scope_begins[top]; int32_t count = orch->scope_tasks_size - begin; + bool manual_scope = orch->manual_scope_active; + + if (!manual_scope) { + orch->scope_stack_top--; + orch->manual_scope_active = false; + orch->manual_scope_needs_dep_pool_repair = false; + + if (orch->scheduler && count > 0) { + orch->scheduler->on_scope_end(&orch->scope_tasks[begin], count); + } + + orch->scope_tasks_size = begin; + +#if PTO2_ORCH_PROFILING + uint64_t _se1 = get_sys_cnt_aicpu(); + g_orch_scope_end_cycle += (_se1 - _se0); +#endif + return; + } if (orch->scheduler && count > 0) { - orch->scheduler->on_scope_end(&orch->scope_tasks[begin], count); + if (!link_manual_scope_explicit_edges(orch, begin, count)) { + return; + } + orch->scheduler->publish_manual_scope_tasks_and_end_scope(&orch->scope_tasks[begin], count); } // Rewind the task buffer — these entries are no longer needed orch->scope_tasks_size = begin; + orch->scope_stack_top--; + orch->manual_scope_active = false; + orch->manual_scope_needs_dep_pool_repair = false; #if PTO2_ORCH_PROFILING uint64_t _se1 = get_sys_cnt_aicpu(); @@ -487,8 +751,12 @@ void pto2_scope_end(PTO2OrchestratorState *orch) { // ============================================================================= // Task Submission // ============================================================================= -TaskOutputTensors -pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args) { +template +static TaskOutputTensors pto2_submit_mixed_task_impl( + PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args, + PTO2TaskId *submitted_task_id = nullptr, 
const PTO2TaskId explicit_producer_ids[] = nullptr, + int32_t explicit_producer_count = 0 +) { CYCLE_COUNT_START(); TaskOutputTensors result; @@ -520,7 +788,7 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke always_assert(block_num >= 1 && "block_num must be >= 1"); // Normalize single-AIV tasks: if only aiv1 is set (no aic, no aiv0), move - // it to the aiv0 slot. This guarantees the dispatch path can always use + // it to the aiv0 slot. This guarantees the dispatch path can always use // PTO2SubtaskSlot::AIV0 for single-AIV shapes without inspecting active_mask. // Mixed tasks (AIC+AIV) keep their original AIV identity so the correct // hardware channel (AIV0→AIC vs AIV1→AIC) is used at dispatch time. @@ -534,6 +802,20 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke active_mask = pto2_mixed_kernels_to_active_mask(normalized); } + // Submission without an open scope is illegal. + always_assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope"); + + if constexpr (!kManualSubmit) { + (void)explicit_producer_ids; + (void)explicit_producer_count; + if (in_manual_scope(orch)) { + LOG_ERROR("PTO2_SCOPE(PTO2ScopeMode::MANUAL) requires pto2_rt_submit_*_manual task APIs"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return result; + } + } + // Encode require_sync_start into active_mask bit 3 (only meaningful for tasks with block_num > 1) if (block_num > 1 && args.launch_spec.require_sync_start()) { // Deadlock check: block_num >= total available slots of the required type. 
@@ -562,9 +844,47 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke int32_t slot = prepared.alloc_result.slot; PTO2FaninBuilder fanin_builder; - fanin_builder.count = 0; - fanin_builder.spill_start = 0; fanin_builder.spill_pool = &orch->rings[ring_id].fanin_pool; + uint64_t manual_owner_retention_mask = 0; + uint64_t manual_lookup_mask = 0; + uint64_t manual_insert_mask = 0; + bool needs_tensormap_sync = true; + if constexpr (kManualSubmit) { + needs_tensormap_sync = false; + for (int i = 0; i < args.tensor_count(); i++) { + TensorArgType ptype = args.tag(i); + if (ptype == TensorArgType::OUTPUT) { + continue; + } + + const Tensor *tensor = args.tensor(i).ptr; + PTO2TaskId owner = tensor->owner_task_id; + if (tensor->manual_dep && !owner.is_valid()) { + continue; + } + + if (task_owned_by_current_manual_scope(orch, owner)) { + continue; + } + + if (owner.is_valid() && sched != nullptr) { + manual_owner_retention_mask |= static_cast(1ULL << i); + } + + bool needs_lookup = (ptype == TensorArgType::INPUT || ptype == TensorArgType::INOUT) && !tensor->manual_dep; + bool needs_insert = + (ptype == TensorArgType::INOUT || ptype == TensorArgType::OUTPUT_EXISTING) && !tensor->manual_dep; + if (needs_lookup) { + manual_lookup_mask |= static_cast(1ULL << i); + } + if (needs_insert) { + manual_insert_mask |= static_cast(1ULL << i); + } + if (needs_lookup || needs_insert) { + needs_tensormap_sync = true; + } + } + } CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC, task_id.raw); @@ -576,63 +896,109 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke #endif // === STEP 2: Sync TensorMap validity and optional cleanup === - // Read current last_task_alive from shared memory for this ring int32_t sm_last_task_alive = fc.last_task_alive.load(std::memory_order_acquire); - orch->tensor_map.sync_tensormap(ring_id, sm_last_task_alive); + if (!kManualSubmit || needs_tensormap_sync) { + 
orch->tensor_map.sync_tensormap(ring_id, sm_last_task_alive); + } - if (sched) { - orch->rings[ring_id].dep_pool.reclaim(*sched, ring_id, sm_last_task_alive); + if constexpr (!kManualSubmit) { + if (sched) { + orch->rings[ring_id].dep_pool.reclaim(*sched, ring_id, sm_last_task_alive); + } } CYCLE_COUNT_LAP_RECORD(g_orch_sync_cycle, AicpuPhaseId::ORCH_SYNC, task_id.raw); // === STEP 3: Lookup inputs + materialize runtime-created outputs === - for (int i = 0; i < args.tensor_count(); i++) { - TensorArgType ptype = args.tag(i); - if (ptype == TensorArgType::OUTPUT) { - // Runtime-created OUTPUT tensors are not looked up in the TensorMap since they have no dependencies. - continue; - } + if constexpr (!kManualSubmit) { + for (int i = 0; i < args.tensor_count(); i++) { + TensorArgType ptype = args.tag(i); + if (ptype == TensorArgType::OUTPUT) { + continue; + } - const Tensor *tensor = args.tensor(i).ptr; + const Tensor *tensor = args.tensor(i).ptr; + PTO2TaskId owner = tensor->owner_task_id; + if (owner.is_valid() && sched != nullptr) { + PTO2TaskSlotState *prod_state = + &sched->ring_sched_states[owner.ring()].get_slot_state_by_task_id(owner.local()); + if (!pto2_append_fanin_or_fail( + orch, task_id, i, ptype, prod_state, &fanin_builder, sched, fc, ring_id, "creator retention" + )) { + return result; + } + } - // Step A: creator retention — all existing tensors extend their creator lifetime. - PTO2TaskId owner = tensor->owner_task_id; - if (owner.is_valid() && sched != nullptr) { - PTO2TaskSlotState *prod_state = - &sched->ring_sched_states[owner.ring()].get_slot_state_by_task_id(owner.local()); - if (!pto2_append_fanin_or_fail( - orch, task_id, i, ptype, prod_state, &fanin_builder, sched, fc, ring_id, "creator retention" - )) { - return result; + if (ptype != TensorArgType::INPUT && ptype != TensorArgType::INOUT) { + continue; + } + if (tensor->manual_dep) { + continue; } - } - // Step B: only INPUT/INOUT need modifier dependency lookup. 
- if (ptype != TensorArgType::INPUT && ptype != TensorArgType::INOUT) { - continue; - } - if (tensor->manual_dep) { - continue; + PTO2LookupResult lookup_result; + orch->tensor_map.lookup(*tensor, lookup_result); + + for (int r = 0; r < lookup_result.count; r++) { + PTO2TensorMapEntry &entry = *lookup_result.entries[r].entry; + auto overlap_status = lookup_result.entries[r].overlap_status; + auto prod_ring = entry.producer_task_id.ring(); + auto prod_local = entry.producer_task_id.local(); + PTO2TaskSlotState *prod_state = + &sched->ring_sched_states[prod_ring].get_slot_state_by_task_id(prod_local); + if (!pto2_append_fanin_or_fail( + orch, task_id, i, ptype, prod_state, &fanin_builder, sched, fc, ring_id, "overlap lookup" + )) { + return result; + } + if (ptype == TensorArgType::INOUT && overlap_status == OverlapStatus::COVERED) { + orch->tensor_map.remove_entry(entry); + } + } } + } else { + uint64_t manual_step3_mask = manual_owner_retention_mask | manual_lookup_mask; + while (manual_step3_mask != 0) { + int i = __builtin_ctzll(manual_step3_mask); + uint64_t arg_mask = static_cast(1ULL << i); + manual_step3_mask &= (manual_step3_mask - 1); - PTO2LookupResult lookup_result; - orch->tensor_map.lookup(*tensor, lookup_result); + TensorArgType ptype = args.tag(i); + const Tensor *tensor = args.tensor(i).ptr; + if ((manual_owner_retention_mask & arg_mask) != 0) { + PTO2TaskId owner = tensor->owner_task_id; + PTO2TaskSlotState *prod_state = + &sched->ring_sched_states[owner.ring()].get_slot_state_by_task_id(owner.local()); + if (!pto2_append_fanin_or_fail( + orch, task_id, i, ptype, prod_state, &fanin_builder, sched, fc, ring_id, "creator retention" + )) { + return result; + } + } - for (int r = 0; r < lookup_result.count; r++) { - PTO2TensorMapEntry &entry = *lookup_result.entries[r].entry; - auto overlap_status = lookup_result.entries[r].overlap_status; - auto prod_ring = entry.producer_task_id.ring(); - auto prod_local = entry.producer_task_id.local(); - 
PTO2TaskSlotState *prod_state = &sched->ring_sched_states[prod_ring].get_slot_state_by_task_id(prod_local); - if (!pto2_append_fanin_or_fail( - orch, task_id, i, ptype, prod_state, &fanin_builder, sched, fc, ring_id, "overlap lookup" - )) { - return result; + if ((manual_lookup_mask & arg_mask) == 0) { + continue; } - if (ptype == TensorArgType::INOUT && overlap_status == OverlapStatus::COVERED) { - orch->tensor_map.remove_entry(entry); + + PTO2LookupResult lookup_result; + orch->tensor_map.lookup(*tensor, lookup_result); + + for (int r = 0; r < lookup_result.count; r++) { + PTO2TensorMapEntry &entry = *lookup_result.entries[r].entry; + auto overlap_status = lookup_result.entries[r].overlap_status; + auto prod_ring = entry.producer_task_id.ring(); + auto prod_local = entry.producer_task_id.local(); + PTO2TaskSlotState *prod_state = + &sched->ring_sched_states[prod_ring].get_slot_state_by_task_id(prod_local); + if (!pto2_append_fanin_or_fail( + orch, task_id, i, ptype, prod_state, &fanin_builder, sched, fc, ring_id, "overlap lookup" + )) { + return result; + } + if (ptype == TensorArgType::INOUT && overlap_status == OverlapStatus::COVERED) { + orch->tensor_map.remove_entry(entry); + } } } } @@ -640,15 +1006,20 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP, task_id.raw); // === STEP 4: Register outputs/inouts in TensorMap (must be separate from lookup) === - { + if constexpr (!kManualSubmit) { for (int i = 0; i < args.tensor_count(); i++) { TensorArgType ptype = args.tag(i); - if (ptype == TensorArgType::INOUT || ptype == TensorArgType::OUTPUT_EXISTING) { - if (!args.tensor(i).ptr->manual_dep) { - orch->tensor_map.insert(*args.tensor(i).ptr, task_id); - } + if ((ptype == TensorArgType::INOUT || ptype == TensorArgType::OUTPUT_EXISTING) && + !args.tensor(i).ptr->manual_dep) { + orch->tensor_map.insert(*args.tensor(i).ptr, task_id); } } + } else { + while 
(manual_insert_mask != 0) { + int i = __builtin_ctzll(manual_insert_mask); + manual_insert_mask &= (manual_insert_mask - 1); + orch->tensor_map.insert(*args.tensor(i).ptr, task_id); + } } CYCLE_COUNT_LAP_RECORD(g_orch_insert_cycle, AicpuPhaseId::ORCH_INSERT, task_id.raw); @@ -705,55 +1076,121 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke cur_slot_state.block_num = block_num; cur_slot_state.next_block_idx = 0; - auto &dep_pool = orch->rings[ring_id].dep_pool; - int32_t fanin_count = fanin_builder.count; - int32_t inline_count = std::min(fanin_count, PTO2_FANIN_INLINE_CAP); - int32_t spill_count = fanin_count - inline_count; - dep_pool.ensure_space(*sched, fc, ring_id, fanin_count + 1); - - int32_t early_finished = 0; - cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early - payload->fanin_actual_count = fanin_count; - payload->fanin_spill_start = (spill_count > 0) ? fanin_builder.spill_start : 0; - payload->fanin_spill_pool = (spill_count > 0) ? fanin_builder.spill_pool : nullptr; - for (int i = 0; i < inline_count; i++) { - payload->fanin_inline_slot_states[i] = fanin_builder.inline_slots[i]; - } - pto2_for_each_fanin_slot_state(*payload, [&](PTO2TaskSlotState *producer_slot) { - PTO2TaskSlotState &producer_slot_state = *producer_slot; + if constexpr (kManualSubmit) { + int32_t fanin_count = fanin_builder.count; + int32_t inline_count = std::min(fanin_count, PTO2_FANIN_INLINE_CAP); + int32_t spill_count = fanin_count - inline_count; + payload->fanin_actual_count = fanin_count; + payload->fanin_spill_start = (spill_count > 0) ? fanin_builder.spill_start : 0; + payload->fanin_spill_pool = (spill_count > 0) ? 
fanin_builder.spill_pool : nullptr; + payload->manual_explicit_fanin_begin = fanin_count; + payload->manual_explicit_fanin_count = 0; + for (int i = 0; i < inline_count; i++) { + payload->fanin_inline_slot_states[i] = fanin_builder.inline_slots[i]; + } + + auto &dep_pool = orch->rings[ring_id].dep_pool; + dep_pool.ensure_space(*sched, fc, ring_id, fanin_count); + + int32_t early_finished = 0; + bool fanout_ok = fanin_builder.for_each([&](PTO2TaskSlotState *producer_slot) { + PTO2TaskSlotState &producer_slot_state = *producer_slot; #if PTO2_ORCH_PROFILING - pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); + pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); #else - pto2_fanout_lock(producer_slot_state); + pto2_fanout_lock(producer_slot_state); #endif - producer_slot_state.fanout_count += 1; - int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire); - if (prod_state >= PTO2_TASK_COMPLETED) { - early_finished++; - } else { - producer_slot_state.fanout_head = dep_pool.prepend(producer_slot_state.fanout_head, &cur_slot_state); + producer_slot_state.fanout_count += 1; + int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire); + if (prod_state >= PTO2_TASK_COMPLETED) { + early_finished++; + } else { + producer_slot_state.fanout_head = + dep_pool.prepend(producer_slot_state.fanout_head, &cur_slot_state); + if (producer_slot_state.fanout_head == nullptr) { + pto2_fanout_unlock(producer_slot_state); + orch->fatal = true; + return false; + } + } + pto2_fanout_unlock(producer_slot_state); + return true; + }); + if (!fanout_ok) { + return result; + } + cur_slot_state.fanin_count = fanin_count + 1; + if (early_finished > 0) { + cur_slot_state.fanin_refcount.fetch_add(early_finished, std::memory_order_acq_rel); + } + cur_slot_state.dep_pool_mark = dep_pool.top; + if (!pto2_append_manual_explicit_fanins_to_payload_or_fail( + orch, task_id, 
payload, &cur_slot_state, explicit_producer_ids, explicit_producer_count + )) { + return result; } - pto2_fanout_unlock(producer_slot_state); - }); - // Combined release: merge early_finished batch with the +1 init release - // into a single atomic fetch_add (saves one acq_rel cache-line bounce per task). - int32_t initial_refcount = early_finished + 1; // +1 for the init release - int32_t new_rc = - cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel) + initial_refcount; - if (new_rc >= fanin_count + 1) { - PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask); - sched->ready_queues[static_cast(shape)].push(&cur_slot_state); - } - // Record dep pool watermark in local slot state (used by tail reclamation) - cur_slot_state.dep_pool_mark = orch->rings[ring_id].dep_pool.top; #if PTO2_ORCH_PROFILING - // Per producer: fetch_add(fanout_count) + load(task_state) + store(unlock) = 3 atomics - // Lock atomics (loads + CAS) are counted inside pto2_fanout_lock - g_orch_fanin_atomic_count += fanin_count * 3; - if (early_finished > 0) { - g_orch_fanin_atomic_count += 1; // fanin_refcount.fetch_add - } + g_orch_fanin_atomic_count += fanin_count * 3; + if (early_finished > 0) { + g_orch_fanin_atomic_count += 1; // fanin_refcount.fetch_add + } #endif + } else { + auto &dep_pool = orch->rings[ring_id].dep_pool; + int32_t fanin_count = fanin_builder.count; + int32_t inline_count = std::min(fanin_count, PTO2_FANIN_INLINE_CAP); + int32_t spill_count = fanin_count - inline_count; + dep_pool.ensure_space(*sched, fc, ring_id, fanin_count + 1); + + int32_t early_finished = 0; + cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early + payload->fanin_actual_count = fanin_count; + payload->fanin_spill_start = (spill_count > 0) ? fanin_builder.spill_start : 0; + payload->fanin_spill_pool = (spill_count > 0) ? 
fanin_builder.spill_pool : nullptr; + payload->manual_explicit_fanin_begin = fanin_count; + payload->manual_explicit_fanin_count = 0; + for (int i = 0; i < inline_count; i++) { + payload->fanin_inline_slot_states[i] = fanin_builder.inline_slots[i]; + } + pto2_for_each_fanin_slot_state(*payload, [&](PTO2TaskSlotState *producer_slot) { + PTO2TaskSlotState &producer_slot_state = *producer_slot; +#if PTO2_ORCH_PROFILING + pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); +#else + pto2_fanout_lock(producer_slot_state); +#endif + // Normal path: prepend consumer to producer's fanout list + producer_slot_state.fanout_count += 1; + int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire); + if (prod_state >= PTO2_TASK_COMPLETED) { + // Early return optimization: if producer already completed, we can skip adding dependency and + // directly decrement fanin_count + early_finished++; + } else { + producer_slot_state.fanout_head = + dep_pool.prepend(producer_slot_state.fanout_head, &cur_slot_state); + } + pto2_fanout_unlock(producer_slot_state); + return true; + }); + int32_t initial_refcount = early_finished + 1; // +1 for the init release + int32_t new_rc = + cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel) + initial_refcount; + if (new_rc >= fanin_count + 1) { + PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask); + sched->ready_queues[static_cast(shape)].push(&cur_slot_state); + } + // Record dep pool watermark in local slot state (used by tail reclamation) + cur_slot_state.dep_pool_mark = orch->rings[ring_id].dep_pool.top; +#if PTO2_ORCH_PROFILING + // Per producer: fetch_add(fanout_count) + load(task_state) + store(unlock) = 3 atomics + // Lock atomics (loads + CAS) are counted inside pto2_fanout_lock + g_orch_fanin_atomic_count += fanin_count * 3; + if (early_finished > 0) { + g_orch_fanin_atomic_count += 1; // fanin_refcount.fetch_add + } +#endif + } } 
CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN, task_id.raw); @@ -765,6 +1202,9 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke #endif g_orch_submit_idx++; #endif + if (submitted_task_id != nullptr) { + *submitted_task_id = task_id; + } return result; } @@ -821,6 +1261,8 @@ TaskOutputTensors pto2_alloc_tensors(PTO2OrchestratorState *orch, const Arg &arg payload->fanin_actual_count = 0; payload->fanin_spill_start = 0; payload->fanin_spill_pool = nullptr; + payload->manual_explicit_fanin_begin = 0; + payload->manual_explicit_fanin_count = 0; for (int32_t i = 0; i < args.tensor_count(); i++) { payload->tensors[i].owner_task_id = prepared.task_id; } @@ -853,6 +1295,148 @@ TaskOutputTensors pto2_alloc_tensors(PTO2OrchestratorState *orch, const Arg &arg return outputs; } +TaskOutputTensors +pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args) { + if (in_manual_scope(orch)) { + LOG_ERROR("PTO2_SCOPE(PTO2ScopeMode::MANUAL) requires pto2_rt_submit_*_manual task APIs"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return {}; + } + return pto2_submit_mixed_task_impl(orch, mixed_kernels, args); +} + +PTO2ManualSubmitResult +pto2_submit_mixed_task_manual(PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args) { + PTO2ManualSubmitResult result{}; + if (!in_manual_scope(orch)) { + LOG_ERROR("manual submit APIs require PTO2_SCOPE(PTO2ScopeMode::MANUAL)"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return result; + } + PTO2TaskId task_id = PTO2TaskId::invalid(); + TaskOutputTensors outputs = pto2_submit_mixed_task_impl(orch, mixed_kernels, args, &task_id); + if (orch->fatal || !task_id.is_valid()) { + return result; + } + result.task_id = task_id; + result.outputs = outputs; + return 
result; +} + +PTO2ManualSubmitResult pto2_submit_mixed_task_manual_with_deps( + PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args, + const PTO2TaskId explicit_producer_ids[], int32_t explicit_producer_count +) { + PTO2ManualSubmitResult result{}; + if (!in_manual_scope(orch)) { + LOG_ERROR("manual submit APIs require PTO2_SCOPE(PTO2ScopeMode::MANUAL)"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return result; + } + PTO2TaskId task_id = PTO2TaskId::invalid(); + TaskOutputTensors outputs = pto2_submit_mixed_task_impl( + orch, mixed_kernels, args, &task_id, explicit_producer_ids, explicit_producer_count + ); + if (orch->fatal || !task_id.is_valid()) { + return result; + } + result.task_id = task_id; + result.outputs = outputs; + return result; +} + +void pto2_add_dependency(PTO2OrchestratorState *orch, PTO2TaskId producer_id, PTO2TaskId consumer_id) { + if (orch->fatal) { + return; + } + + if (!in_manual_scope(orch)) { + LOG_ERROR("pto2_rt_add_dependency is only valid inside PTO2_SCOPE(PTO2ScopeMode::MANUAL)"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return; + } + if (producer_id == consumer_id) { + LOG_ERROR("add_dependency does not allow self-dependency"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return; + } + int32_t producer_idx = find_current_manual_scope_task_index(orch, producer_id); + int32_t consumer_idx = find_current_manual_scope_task_index(orch, consumer_id); + if (producer_idx < 0 || consumer_idx < 0) { + LOG_ERROR("add_dependency requires producer and consumer to belong to the current manual scope"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch->fatal = true; + return; + } + + PTO2TaskSlotState 
*producer_slot_state = orch->scope_tasks[current_manual_scope_begin(orch) + producer_idx]; + PTO2TaskSlotState *consumer_slot_state = orch->scope_tasks[current_manual_scope_begin(orch) + consumer_idx]; + PTO2TaskPayload *consumer_payload = consumer_slot_state->payload; + + bool duplicate = false; + pto2_for_each_fanin_slot_state(*consumer_payload, [&](PTO2TaskSlotState *fanin_slot_state) { + if (fanin_slot_state == producer_slot_state) { + duplicate = true; + return false; + } + return true; + }); + if (duplicate) { + return; + } + + if (consumer_payload->fanin_actual_count >= PTO2_MAX_INPUTS) { + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Dependency Overflow Detected!"); + LOG_ERROR("========================================"); + LOG_ERROR("Task requires more than PTO2_MAX_INPUTS unique fanin dependencies."); + LOG_ERROR(" consumer_id.raw: %" PRIu64, consumer_id.raw); + LOG_ERROR(" fanin_count: %d / %d", consumer_payload->fanin_actual_count + 1, PTO2_MAX_INPUTS); + LOG_ERROR(" reason: explicit add_dependency"); + LOG_ERROR("========================================"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_DEPENDENCY_OVERFLOW, std::memory_order_release); + orch->fatal = true; + return; + } + + int32_t current_fanin_count = consumer_payload->fanin_actual_count; + if (consumer_payload->manual_explicit_fanin_count == 0) { + consumer_payload->manual_explicit_fanin_begin = current_fanin_count; + } + if (current_fanin_count < PTO2_FANIN_INLINE_CAP) { + consumer_payload->fanin_inline_slot_states[current_fanin_count] = producer_slot_state; + } else { + PTO2FaninPool *spill_pool = (consumer_payload->fanin_spill_pool != nullptr) + ? 
consumer_payload->fanin_spill_pool + : &orch->rings[consumer_slot_state->ring_id].fanin_pool; + PTO2FaninPool &fanin_pool = *spill_pool; + auto &fc = orch->sm_handle->header->rings[consumer_slot_state->ring_id].fc; + fanin_pool.ensure_space(*orch->scheduler, fc, consumer_slot_state->ring_id, 1); + int32_t spill_idx = fanin_pool.top; + PTO2FaninSpillEntry *entry = fanin_pool.alloc(); + if (entry == nullptr) { + orch->fatal = true; + return; + } + if (current_fanin_count == PTO2_FANIN_INLINE_CAP) { + consumer_payload->fanin_spill_start = spill_idx; + consumer_payload->fanin_spill_pool = spill_pool; + } else if (consumer_payload->fanin_spill_pool == nullptr) { + consumer_payload->fanin_spill_pool = spill_pool; + } + entry->slot_state = producer_slot_state; + } + + consumer_payload->fanin_actual_count = current_fanin_count + 1; + consumer_payload->manual_explicit_fanin_count += 1; + consumer_slot_state->fanin_count += 1; +} + // ============================================================================= // Flow Control // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index 9db96eaa..0c8e2041 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -63,8 +63,10 @@ struct PTO2OrchestratorState { int32_t scope_tasks_size; // Number of task IDs currently in the buffer int32_t scope_tasks_capacity; // Allocated capacity of scope_tasks int32_t *scope_begins; // scope_begins[i] = start index of scope i in scope_tasks - int32_t scope_stack_top; // Current top of stack (-1 = no scope open) - uint64_t scope_stack_capacity; // Max nesting depth (PTO2_MAX_SCOPE_DEPTH) + int32_t scope_stack_top; // Current top of stack (-1 = no scope open) + uint64_t scope_stack_capacity; // Max nesting depth 
(PTO2_MAX_SCOPE_DEPTH) + bool manual_scope_active{false}; + bool manual_scope_needs_dep_pool_repair{false}; // === SCHEDULER REFERENCE === // Note: In simulated mode, orchestrator and scheduler share address space @@ -93,7 +95,6 @@ struct PTO2OrchestratorState { // The executor adds this count into its completed_tasks_ progress counter // after orchestration finishes so shutdown/profiling totals remain closed. int64_t inline_completed_tasks{0}; - // === STATISTICS === #if PTO2_PROFILING int64_t tasks_submitted; @@ -151,7 +152,7 @@ void pto2_orchestrator_set_scheduler(PTO2OrchestratorState *orch, PTO2SchedulerS * Tasks submitted while this scope is at the top of the stack are * owned by it and have their fanout_count initialized to 1. */ -void pto2_scope_begin(PTO2OrchestratorState *orch); +void pto2_scope_begin(PTO2OrchestratorState *orch, PTO2ScopeMode mode = PTO2ScopeMode::AUTO); /** * End current scope @@ -190,6 +191,14 @@ pto2_submit_mixed_task(PTO2OrchestratorState *orch, const MixedKernels &mixed_ke * task id for scope lifetime and future creator-retention dependencies. 
*/ TaskOutputTensors pto2_alloc_tensors(PTO2OrchestratorState *orch, const Arg &args); +PTO2ManualSubmitResult +pto2_submit_mixed_task_manual(PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args); +PTO2ManualSubmitResult pto2_submit_mixed_task_manual_with_deps( + PTO2OrchestratorState *orch, const MixedKernels &mixed_kernels, const Arg &args, + const PTO2TaskId explicit_producer_ids[], int32_t explicit_producer_count +); + +void pto2_add_dependency(PTO2OrchestratorState *orch, PTO2TaskId producer, PTO2TaskId consumer); // ============================================================================= // Flow Control diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h index beb634ad..492fdead 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h @@ -521,6 +521,41 @@ inline PTO2FaninForEachReturn pto2_for_each_fanin_slot_state(const PTO2TaskP ); } +template +inline PTO2FaninForEachReturn +pto2_for_each_fanin_slot_state_range(const PTO2TaskPayload &payload, int32_t begin, int32_t count, Fn &&fn) { + if (count <= 0) { + return true; + } + always_assert(begin >= 0 && begin + count <= payload.fanin_actual_count); + + int32_t inline_count = std::min(payload.fanin_actual_count, PTO2_FANIN_INLINE_CAP); + int32_t inline_begin = std::min(begin, inline_count); + int32_t inline_end = std::min(begin + count, inline_count); + for (int32_t i = inline_begin; i < inline_end; i++) { + if (!fn(payload.fanin_inline_slot_states[i])) { + return false; + } + } + + int32_t spill_begin = std::max(begin, inline_count); + int32_t spill_end = begin + count; + if (spill_begin >= spill_end) { + return true; + } + + PTO2FaninPool *pool = payload.fanin_spill_pool; + always_assert(pool != nullptr && "fanin spill pool must exist when iterating spilled fanins"); + for (int32_t i 
= spill_begin; i < spill_end; i++) { + int32_t spill_offset = i - inline_count; + int32_t spill_idx = (payload.fanin_spill_start + spill_offset) % pool->capacity; + if (!fn(pool->base[spill_idx].slot_state)) { + return false; + } + } + return true; +} + // ============================================================================= // Dependency List Pool // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp index 8085ed63..c57935a5 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp @@ -45,7 +45,26 @@ static TaskOutputTensors alloc_tensors_impl(PTO2Runtime *rt, const Arg &args) { return pto2_alloc_tensors(&rt->orchestrator, args); } -void pto2_rt_scope_begin(PTO2Runtime *rt) { pto2_scope_begin(&rt->orchestrator); } +PTO2ManualSubmitResult pto2_rt_submit_task_manual(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args) { + return pto2_submit_mixed_task_manual(&rt->orchestrator, mixed_kernels, args); +} + +PTO2ManualSubmitResult pto2_rt_submit_task_manual_with_deps( + PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, const PTO2TaskId explicit_producer_ids[], + int32_t explicit_producer_count +) { + return pto2_submit_mixed_task_manual_with_deps( + &rt->orchestrator, mixed_kernels, args, explicit_producer_ids, explicit_producer_count + ); +} + +void pto2_rt_add_dependency(PTO2Runtime *rt, PTO2TaskId producer, PTO2TaskId consumer) { + pto2_add_dependency(&rt->orchestrator, producer, consumer); +} + +void pto2_rt_scope_begin(PTO2Runtime *rt, PTO2ScopeMode mode) { + pto2_scope_begin(&rt->orchestrator, mode); +} void pto2_rt_scope_end(PTO2Runtime *rt) { pto2_scope_end(&rt->orchestrator); } @@ -53,6 +72,20 @@ void pto2_rt_orchestration_done(PTO2Runtime *rt) { 
pto2_orchestrator_done(&rt->o static bool is_fatal_impl(PTO2Runtime *rt) { return rt->orchestrator.fatal; } +static bool in_manual_scope_runtime(PTO2Runtime *rt) { + return rt->orchestrator.manual_scope_active; +} + +static void fail_manual_tensor_access(PTO2Runtime *rt, const char *caller) { + PTO2OrchestratorState &orch = rt->orchestrator; + orch.sm_handle->header->orch_error_code.store(PTO2_ERROR_INVALID_ARGS, std::memory_order_release); + orch.fatal = true; + unified_log_error( + caller, + "blocking tensor data access is not supported inside PTO2_SCOPE(PTO2ScopeMode::MANUAL); exit the manual scope first" + ); +} + // Wait for all producers of this tensor to be safe for data access. // Checks owner metadata (lifecycle anchor) and OverlapMap (modifier writers). // For reads: wait until each producer COMPLETED (done writing). @@ -137,6 +170,10 @@ static bool wait_for_tensor_ready(PTO2Runtime *rt, const Tensor &tensor, bool wa MAYBE_UNINITIALIZED_END uint64_t pto2_get_tensor_data(PTO2Runtime *rt, const Tensor &tensor, uint32_t ndims, const uint32_t indices[]) { + if (in_manual_scope_runtime(rt)) { + fail_manual_tensor_access(rt, __FUNCTION__); + return 0; + } if (tensor.buffer.addr == 0) { unified_log_error( __FUNCTION__, "get_tensor_data: buffer not allocated (addr=0). " @@ -160,6 +197,10 @@ uint64_t pto2_get_tensor_data(PTO2Runtime *rt, const Tensor &tensor, uint32_t nd void pto2_set_tensor_data( PTO2Runtime *rt, const Tensor &tensor, uint32_t ndims, const uint32_t indices[], uint64_t value ) { + if (in_manual_scope_runtime(rt)) { + fail_manual_tensor_access(rt, __FUNCTION__); + return; + } if (tensor.buffer.addr == 0) { unified_log_error( __FUNCTION__, "set_tensor_data: buffer not allocated (addr=0). 
" @@ -181,6 +222,9 @@ void pto2_set_tensor_data( static const PTO2RuntimeOps s_runtime_ops = { .submit_task = submit_task_impl, + .submit_task_manual = pto2_rt_submit_task_manual, + .submit_task_manual_with_deps = pto2_rt_submit_task_manual_with_deps, + .add_dependency = pto2_rt_add_dependency, .scope_begin = pto2_rt_scope_begin, .scope_end = pto2_rt_scope_end, .orchestration_done = pto2_rt_orchestration_done, diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h index 779b7514..cd900b9f 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h @@ -67,7 +67,13 @@ typedef struct PTO2Runtime PTO2Runtime; // forward declare for ops signatures struct PTO2RuntimeOps { TaskOutputTensors (*submit_task)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); - void (*scope_begin)(PTO2Runtime *rt); + PTO2ManualSubmitResult (*submit_task_manual)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); + PTO2ManualSubmitResult (*submit_task_manual_with_deps)( + PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, const PTO2TaskId explicit_producer_ids[], + int32_t explicit_producer_count + ); + void (*add_dependency)(PTO2Runtime *rt, PTO2TaskId producer, PTO2TaskId consumer); + void (*scope_begin)(PTO2Runtime *rt, PTO2ScopeMode mode); void (*scope_end)(PTO2Runtime *rt); void (*orchestration_done)(PTO2Runtime *rt); bool (*is_fatal)(PTO2Runtime *rt); @@ -176,7 +182,7 @@ void pto2_runtime_set_mode(PTO2Runtime *rt, PTO2RuntimeMode mode); * bounded by the scope. When scope_end() is called, the scope * releases its reference to all enclosed tasks. 
*/ -void pto2_rt_scope_begin(PTO2Runtime *rt); +void pto2_rt_scope_begin(PTO2Runtime *rt, PTO2ScopeMode mode = PTO2ScopeMode::AUTO); /** * End current scope @@ -186,6 +192,14 @@ void pto2_rt_scope_begin(PTO2Runtime *rt); */ void pto2_rt_scope_end(PTO2Runtime *rt); +PTO2ManualSubmitResult pto2_rt_submit_task_manual(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); +PTO2ManualSubmitResult pto2_rt_submit_task_manual_with_deps( + PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, const PTO2TaskId explicit_producer_ids[], + int32_t explicit_producer_count +); + +void pto2_rt_add_dependency(PTO2Runtime *rt, PTO2TaskId producer, PTO2TaskId consumer); + /** * Mark orchestration as complete * diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index 247f09fe..7f0c006b 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -80,6 +80,7 @@ #define PTO2_ERROR_DEP_POOL_OVERFLOW 4 #define PTO2_ERROR_INVALID_ARGS 5 // Arg construction error (invalid args) #define PTO2_ERROR_DEPENDENCY_OVERFLOW 6 // Too many unique fanin dependencies for one task +#define PTO2_ERROR_OUT_OF_MEMORY 7 // Runtime metadata buffer growth failed // Scheduler errors (100+): detected in scheduler threads #define PTO2_ERROR_SCHEDULER_TIMEOUT 100 @@ -135,6 +136,20 @@ constexpr uint64_t PTO2_TENSOR_DATA_TIMEOUT_CYCLES = 15 * 1000 * 1000 * 1000ULL; * TaskId: defined in pto_task_id.h (included above). 
*/ +// ============================================================================= +// Manual Scope Types +// ============================================================================= + +enum class PTO2ScopeMode : uint8_t { + AUTO = 0, + MANUAL = 1, +}; + +struct PTO2ManualSubmitResult { + PTO2TaskId task_id; + TaskOutputTensors outputs; +}; + // ============================================================================= // Worker Types // ============================================================================= @@ -362,6 +377,8 @@ struct PTO2TaskPayload { int32_t fanin_actual_count{0}; // Actual fanin count (without the +1 redundance) int32_t fanin_spill_start{0}; // Linear start index in fanin spill pool (0 = no spill) PTO2FaninPool *fanin_spill_pool{nullptr}; + int32_t manual_explicit_fanin_begin{0}; // First explicit manual edge appended after implicit fanins + int32_t manual_explicit_fanin_count{0}; // Number of explicit manual edges appended to the tail PTO2TaskSlotState *fanin_inline_slot_states[PTO2_FANIN_INLINE_CAP]; // === Cache lines 3-34 (2048B) — tensors (alignas(64) forces alignment) === Tensor tensors[MAX_TENSOR_ARGS]; @@ -398,7 +415,6 @@ struct PTO2TaskPayload { ); result.materialize_output(tensors[i]); } - tensors[i].update_start_offset(); } // Round up to cache line boundary. Both arrays are 1024B so no overrun. // Eliminates branches; extra bytes within the same CL have zero additional cost. @@ -408,7 +424,7 @@ struct PTO2TaskPayload { // PTO2TaskPayload layout verification (offsetof requires complete type). 
static_assert( - offsetof(PTO2TaskPayload, fanin_inline_slot_states) == 24, "inline fanin array must follow spill metadata" + offsetof(PTO2TaskPayload, fanin_inline_slot_states) == 32, "inline fanin array must follow spill metadata" ); static_assert(offsetof(PTO2TaskPayload, tensors) == 192, "tensors must start at byte 192 (cache line 3)"); static_assert( diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index 0c3f5a0f..bd451a22 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -653,6 +653,44 @@ struct PTO2SchedulerState { #endif } + void publish_manual_scope_tasks(PTO2TaskSlotState **task_slot_states, int32_t count) { + for (int32_t i = 0; i < count; i++) { + PTO2TaskSlotState &slot_state = *task_slot_states[i]; + int32_t new_rc = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; + if (new_rc >= slot_state.fanin_count) { + PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state.active_mask); + ready_queues[static_cast(shape)].push(&slot_state); + } + } + } + + void publish_manual_scope_tasks_and_end_scope(PTO2TaskSlotState **task_slot_states, int32_t count) { +#if PTO2_ORCH_PROFILING + extern uint64_t g_orch_scope_end_atomic_count; +#endif + if (count > 0) __builtin_prefetch(task_slot_states[0], 1, 0); + for (int32_t i = 0; i < count; i++) { + if (i + 1 < count) __builtin_prefetch(task_slot_states[i + 1], 1, 0); + PTO2TaskSlotState &slot_state = *task_slot_states[i]; + int32_t new_rc = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; +#if PTO2_ORCH_PROFILING + g_orch_scope_end_atomic_count += 1; // fanin_refcount.fetch_add +#endif + if (new_rc >= slot_state.fanin_count) { + PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state.active_mask); +#if PTO2_ORCH_PROFILING + g_orch_scope_end_atomic_count += 1; // 
ready queue push lock/CAS path +#endif + ready_queues[static_cast(shape)].push(&slot_state); + } +#if PTO2_ORCH_PROFILING + release_producer(slot_state, g_orch_scope_end_atomic_count); +#else + release_producer(slot_state); +#endif + } + } + /** * Subtask completion: atomic counter model. * Called when a single subtask (AIC, AIV0, or AIV1) finishes on any block. diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h index 926bf667..731b16b3 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h @@ -58,7 +58,7 @@ struct Segment { * * Layout (64B) is aligned with Tensor cacheline 1 so that * init_from_create_info() can copy the entire cacheline with a single memcpy, - * then overwrite buffer/owner metadata and refresh start_offset later. + * then overwrite buffer/owner metadata while keeping start_offset at zero. * * Arg::add_output() stores a pointer to this object, so the original * must remain valid (not a temporary) until after the submit call. @@ -101,7 +101,7 @@ class alignas(64) TensorCreateInfo { // --- Bytes [0, 32): TensorCreateInfo-only fields --- // These occupy the same positions as Tensor::buffer, Tensor::owner_task_id, // and Tensor::start_offset. The runtime overwrites owner metadata after the - // memcpy and refreshes start_offset during payload materialization. + // memcpy and keeps start_offset at zero for fresh contiguous outputs. uint64_t initial_value; bool has_initial_value; uint8_t __pad1__[7]; @@ -210,6 +210,7 @@ struct alignas(64) Tensor { } } owner_task_id = PTO2TaskId::invalid(); + update_start_offset(); } void init(const Tensor &other) { @@ -264,6 +265,7 @@ struct alignas(64) Tensor { } is_all_offset_zero = all_zero; owner_task_id = other.owner_task_id; + update_start_offset(); } /// Compute 1D flat element offset from multi-dimensional indices. 
@@ -288,6 +290,7 @@ struct alignas(64) Tensor { memcpy(this, &ci, 64); buffer = {reinterpret_cast(addr), buffer_size}; owner_task_id = PTO2TaskId::invalid(); // caller (orchestrator) overwrites with actual task_id + start_offset = 0; if (ci.has_initial_value) { fill_initial_value(ci.initial_value); } diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/golden.py b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/golden.py new file mode 100644 index 00000000..0f19662d --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/golden.py @@ -0,0 +1,35 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. 
+# ----------------------------------------------------------------------------------------------------------- + +import ctypes + +import torch + + +ALL_CASES = { + "NestedManualScope": {"mode": 1}, + "ManualGetTensorData": {"mode": 2}, + "ManualSetTensorData": {"mode": 3}, + "ManualSelfDependency": {"mode": 4}, +} + +DEFAULT_CASE = "NestedManualScope" +__outputs__ = ["tensor"] + + +def generate_inputs(params: dict) -> list: + tensor = torch.arange(16, dtype=torch.float32) + return [ + ("tensor", tensor), + ("mode", ctypes.c_uint64(params["mode"])), + ] + + +def compute_golden(tensors: dict, params: dict) -> None: + del tensors, params diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/kernels/kernel_config.py b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/kernels/kernel_config.py new file mode 100644 index 00000000..358bf59f --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/kernels/kernel_config.py @@ -0,0 +1,36 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. 
+# ----------------------------------------------------------------------------------------------------------- + +from pathlib import Path + +from task_interface import ArgDirection as D # pyright: ignore[reportAttributeAccessIssue] + +_KERNELS_ROOT = Path(__file__).parent +_SCALAR_DATA_ROOT = _KERNELS_ROOT.parents[1] / "scalar_data_test" / "kernels" + +ORCHESTRATION = { + "source": str(_KERNELS_ROOT / "orchestration" / "manual_scope_guard_orch.cpp"), + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN], +} + +KERNELS = [ + { + "func_id": 0, + "source": str(_SCALAR_DATA_ROOT / "aiv" / "kernel_noop.cpp"), + "core_type": "aiv", + "signature": [], + }, +] + +RUNTIME_CONFIG = { + "runtime": "tensormap_and_ringbuffer", + "aicpu_thread_num": 4, + "block_dim": 3, +} diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/kernels/orchestration/manual_scope_guard_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/kernels/orchestration/manual_scope_guard_orch.cpp new file mode 100644 index 00000000..f9a37ddd --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_guard_negative/kernels/orchestration/manual_scope_guard_orch.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. 
+ * ----------------------------------------------------------------------------------------------------------- + */ + +#include + +#include "pto_orchestration_api.h" // NOLINT(build/include_subdir) + +#define FUNC_NOOP 0 + +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig aicpu_orchestration_config( + const ChipStorageTaskArgs &orch_args) { + (void)orch_args; // NOLINT(readability/casting) + return PTO2OrchestrationConfig{ + .expected_arg_count = 2, + }; +} + +__attribute__((visibility("default"))) void aicpu_orchestration_entry( + const ChipStorageTaskArgs &orch_args, int orch_thread_num, int orch_thread_index) { + (void)orch_thread_num; // NOLINT(readability/casting) + (void)orch_thread_index; // NOLINT(readability/casting) + + Tensor tensor = from_tensor_arg(orch_args.tensor(0)); + uint64_t mode = orch_args.scalar(0); + uint32_t idx[1] = {0}; + + switch (mode) { + case 1: + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + PTO2_SCOPE(PTO2ScopeMode::MANUAL) {} + } + break; + case 2: + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + (void)get_tensor_data(tensor, 1, idx); // NOLINT(readability/casting) + } + break; + case 3: + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { set_tensor_data(tensor, 1, idx, 1.0f); } + break; + case 4: + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + PTO2TaskId invalid = PTO2TaskId::invalid(); + pto2_rt_add_dependency(invalid, invalid); + } + break; + default: + PTO2_SCOPE() {} + break; + } +} +} diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/golden.py b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/golden.py new file mode 100644 index 00000000..01910280 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/golden.py @@ -0,0 +1,46 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). 
+# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- + +import torch + + +__outputs__ = ["out", "result", "check"] + +RTOL = 1e-5 +ATOL = 1e-5 + + +def generate_inputs(params: dict) -> list: + del params + size = 128 * 128 + a = torch.full((size,), 1.0, dtype=torch.float32) + b = torch.full((size,), 2.0, dtype=torch.float32) + out = torch.zeros(size, dtype=torch.float32) + result = torch.zeros(size, dtype=torch.float32) + check = torch.zeros(4, dtype=torch.float32) + return [ + ("a", a), + ("b", b), + ("out", out), + ("result", result), + ("check", check), + ] + + +def compute_golden(tensors: dict, params: dict) -> None: + del params + out = torch.as_tensor(tensors["out"]) + result = torch.as_tensor(tensors["result"]) + check = torch.as_tensor(tensors["check"]) + + out.fill_(5.0) + result.fill_(7.0) + check[0] = 5.0 + check[1] = 7.0 + check[2] = 5.0 diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/kernels/kernel_config.py b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/kernels/kernel_config.py new file mode 100644 index 00000000..81bbb546 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/kernels/kernel_config.py @@ -0,0 +1,36 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. 
You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- + +from pathlib import Path + +from task_interface import ArgDirection as D # pyright: ignore[reportAttributeAccessIssue] + +_KERNELS_ROOT = Path(__file__).parent +_SCALAR_DATA_ROOT = _KERNELS_ROOT.parents[1] / "scalar_data_test" / "kernels" + +ORCHESTRATION = { + "source": str(_KERNELS_ROOT / "orchestration" / "manual_scope_outer_multiwrite_orch.cpp"), + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN, D.IN, D.OUT, D.OUT, D.OUT], +} + +KERNELS = [ + { + "func_id": 0, + "source": str(_SCALAR_DATA_ROOT / "aiv" / "kernel_add.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.IN, D.OUT], + }, +] + +RUNTIME_CONFIG = { + "runtime": "tensormap_and_ringbuffer", + "aicpu_thread_num": 4, + "block_dim": 3, +} diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/kernels/orchestration/manual_scope_outer_multiwrite_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/kernels/orchestration/manual_scope_outer_multiwrite_orch.cpp new file mode 100644 index 00000000..7d363f9b --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/manual_scope_outer_multiwrite/kernels/orchestration/manual_scope_outer_multiwrite_orch.cpp @@ -0,0 +1,94 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. 
You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + +#include + +#include "pto_orchestration_api.h" // NOLINT(build/include_subdir) + +#define FUNC_ADD 0 + +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig aicpu_orchestration_config( + const ChipStorageTaskArgs &orch_args) { + (void)orch_args; // NOLINT(readability/casting) + return PTO2OrchestrationConfig{ + .expected_arg_count = 5, + }; +} + +__attribute__((visibility("default"))) void aicpu_orchestration_entry( + const ChipStorageTaskArgs &orch_args, int orch_thread_num, int orch_thread_index) { + (void)orch_thread_num; // NOLINT(readability/casting) + (void)orch_thread_index; // NOLINT(readability/casting) + + Tensor ext_a = from_tensor_arg(orch_args.tensor(0)); + Tensor ext_b = from_tensor_arg(orch_args.tensor(1)); + Tensor ext_out = from_tensor_arg(orch_args.tensor(2)); + Tensor ext_result = from_tensor_arg(orch_args.tensor(3)); + Tensor ext_check = from_tensor_arg(orch_args.tensor(4)); + + uint32_t size = orch_args.tensor(0).shapes[0]; + uint32_t inter_shapes[1] = {size}; + TensorCreateInfo inter_ci(inter_shapes, 1, DataType::FLOAT32); + + PTO2_SCOPE() { + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + Arg tmp0_args; + tmp0_args.add_input(ext_a); + tmp0_args.add_input(ext_a); + tmp0_args.add_output(inter_ci); + PTO2ManualSubmitResult tmp0 = pto2_rt_submit_aiv_task_manual(FUNC_ADD, tmp0_args); + + Arg write0_args; + write0_args.add_input(tmp0.outputs.get_ref(0)); + write0_args.add_input(ext_a); + write0_args.add_output(ext_out); + PTO2ManualSubmitResult write0 
= pto2_rt_submit_aiv_task_manual(FUNC_ADD, write0_args); + pto2_rt_add_dependency(tmp0.task_id, write0.task_id); + + Arg tmp1_args; + tmp1_args.add_input(ext_b); + tmp1_args.add_input(ext_b); + tmp1_args.add_output(inter_ci); + PTO2ManualSubmitResult tmp1 = pto2_rt_submit_aiv_task_manual(FUNC_ADD, tmp1_args); + + Arg write1_args; + write1_args.add_input(tmp1.outputs.get_ref(0)); + write1_args.add_input(ext_a); + write1_args.add_output(ext_out); + PTO2ManualSubmitResult write1 = pto2_rt_submit_aiv_task_manual(FUNC_ADD, write1_args); + pto2_rt_add_dependency(tmp1.task_id, write1.task_id); + pto2_rt_add_dependency(write0.task_id, write1.task_id); + } + + Arg consumer_args; + consumer_args.add_input(ext_out); + consumer_args.add_input(ext_b); + consumer_args.add_output(ext_result); + pto2_rt_submit_aiv_task(FUNC_ADD, consumer_args); + + uint32_t idx0[1] = {0}; + uint32_t idx100[1] = {100}; + + float out0 = get_tensor_data(ext_out, 1, idx0); + float result0 = get_tensor_data(ext_result, 1, idx0); + float out100 = get_tensor_data(ext_out, 1, idx100); + + idx0[0] = 0; + set_tensor_data(ext_check, 1, idx0, out0); + idx0[0] = 1; + set_tensor_data(ext_check, 1, idx0, result0); + idx0[0] = 2; + set_tensor_data(ext_check, 1, idx0, out100); + } +} +} diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/golden.py b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/golden.py new file mode 100644 index 00000000..89df9622 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/golden.py @@ -0,0 +1,7 @@ +from pathlib import Path +import sys + +_BASE = Path(__file__).resolve().parents[1] / "paged_attention" +sys.path.insert(0, str(_BASE)) + +from golden import ALL_CASES, ATOL, DEFAULT_CASE, RTOL, __outputs__, compute_golden, generate_inputs # noqa: E402,F401 diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/aic/aic_hub.cpp 
b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/aic/aic_hub.cpp new file mode 100644 index 00000000..0b3062f1 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/aic/aic_hub.cpp @@ -0,0 +1,14 @@ +#include +#include + +using namespace pto; + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +extern "C" __aicore__ void kernel_entry(__gm__ int64_t *args) { (void)args; } diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/aiv/aiv_hub.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/aiv/aiv_hub.cpp new file mode 100644 index 00000000..0b3062f1 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/aiv/aiv_hub.cpp @@ -0,0 +1,14 @@ +#include +#include + +using namespace pto; + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +extern "C" __aicore__ void kernel_entry(__gm__ int64_t *args) { (void)args; } diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/kernel_config.py b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/kernel_config.py new file mode 100644 index 00000000..80765faa --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/kernel_config.py @@ -0,0 +1,71 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. 
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from pathlib import Path + +from task_interface import ArgDirection as D # pyright: ignore[reportAttributeAccessIssue] + +_ROOT = Path(__file__).parent +_PA_KERNELS = _ROOT.parent.parent / "paged_attention" / "kernels" + +ORCHESTRATION = { + "source": str(_ROOT / "orchestration" / "paged_attention_orch.cpp"), + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN, D.IN, D.IN, D.IN, D.IN, D.OUT], +} + +KERNELS = [ + { + "func_id": 0, + "name": "QK", + "source": str(_PA_KERNELS / "aic" / "aic_qk_matmul.cpp"), + "core_type": "aic", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 2, + "name": "PV", + "source": str(_PA_KERNELS / "aic" / "aic_pv_matmul.cpp"), + "core_type": "aic", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 4, + "name": "AIC_HUB", + "source": str(_ROOT / "aic" / "aic_hub.cpp"), + "core_type": "aic", + "signature": [], + }, + { + "func_id": 1, + "name": "SF", + "source": str(_PA_KERNELS / "aiv" / "aiv_softmax_prepare.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.OUT, D.OUT, D.OUT], + }, + { + "func_id": 3, + "name": "UP", + "source": str(_PA_KERNELS / "aiv" / "aiv_online_update.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.IN, D.IN, D.INOUT, D.INOUT, D.INOUT, D.INOUT], + }, + { + "func_id": 5, + "name": "AIV_HUB", + "source": str(_ROOT / "aiv" / "aiv_hub.cpp"), + "core_type": "aiv", + "signature": [], + }, +] + +RUNTIME_CONFIG = { + "runtime": "tensormap_and_ringbuffer", + "aicpu_thread_num": 4, + "block_dim": 24, +} diff --git 
a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/orchestration/paged_attention_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/orchestration/paged_attention_orch.cpp new file mode 100644 index 00000000..50a9c2d2 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_partial_manual/kernels/orchestration/paged_attention_orch.cpp @@ -0,0 +1,246 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. 
+ * ----------------------------------------------------------------------------------------------------------- + */ + +#include +#include +#include + +#include "pto_orchestration_api.h" // NOLINT(build/include_subdir) + +#define FUNC_QK_MATMUL 0 +#define FUNC_SOFTMAX_PREPARE 1 +#define FUNC_PV_MATMUL 2 +#define FUNC_ONLINE_UPDATE 3 +#define FUNC_AIC_HUB 4 +#define FUNC_AIV_HUB 5 +constexpr uint64_t PLATFORM_PROF_SYS_CNT_FREQ = 50000000; // 50 MHz + +inline double cycles_to_us(uint64_t cycles) { + return (static_cast(cycles) / PLATFORM_PROF_SYS_CNT_FREQ) * 1000000.0; +} + +inline uint64_t get_sys_cnt_aicpu() { + uint64_t ticks; + asm volatile("mrs %0, cntvct_el0" : "=r"(ticks)); + return ticks; +} + +#define CYCLE_COUNT_START() uint64_t _t0 = get_sys_cnt_aicpu(), _t1 +#define CYCLE_COUNT_LAP(acc) \ + do { \ + _t1 = get_sys_cnt_aicpu(); \ + acc += (_t1 - _t0); \ + _t0 = _t1; \ + } while (0) +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig +aicpu_orchestration_config(const ChipStorageTaskArgs &orch_args) { + (void)orch_args; // NOLINT(readability/casting) + return PTO2OrchestrationConfig{ + .expected_arg_count = 7, + }; +} + +__attribute__((visibility("default"))) void +aicpu_orchestration_entry(const ChipStorageTaskArgs &orch_args, int orch_thread_num, int orch_thread_index) { + (void)orch_thread_num; // NOLINT(readability/casting) + (void)orch_thread_index; // NOLINT(readability/casting) + uint64_t prof_submit_manual = 0; + uint64_t prof_add_dependency = 0; + uint64_t prof_manual_scope_close = 0; + uint64_t prof_view = 0; + int prof_submit_manual_count = 0; + int prof_add_dependency_count = 0; + int prof_manual_scope_close_count = 0; + CYCLE_COUNT_START(); + + uint64_t batch = orch_args.tensor(0).shapes[0]; + uint64_t num_heads = orch_args.tensor(0).shapes[1]; + uint64_t head_dim = orch_args.tensor(0).shapes[2]; + DataType data_type = orch_args.tensor(0).dtype; + uint64_t block_size = orch_args.tensor(1).shapes[1]; + uint64_t 
block_num = orch_args.tensor(3).shapes[1]; + uint64_t scale_value = orch_args.scalar(0); + + uint64_t q_head_num = num_heads; + uint64_t q_tile = std::min(num_heads, 128UL); + uint64_t q_loop = (q_head_num + q_tile - 1) / q_tile; + + void *query_ptr = orch_args.tensor(0).data_as(); + void *kc_ptr = orch_args.tensor(1).data_as(); + void *vc_ptr = orch_args.tensor(2).data_as(); + void *out_ptr = orch_args.tensor(5).data_as(); + + uint64_t total_blocks_count = orch_args.tensor(1).shapes[0]; + + uint32_t query_shapes[2] = {static_cast(batch * num_heads), static_cast(head_dim)}; + uint32_t key_cache_shapes[2] = { + static_cast(total_blocks_count * block_size), static_cast(head_dim) + }; + uint32_t value_cache_shapes[2] = { + static_cast(total_blocks_count * block_size), static_cast(head_dim) + }; + uint32_t out_shapes[2] = {static_cast(batch * num_heads), static_cast(head_dim)}; + Tensor query = make_tensor_external(query_ptr, query_shapes, 2, data_type, true); + Tensor key_cache = make_tensor_external(kc_ptr, key_cache_shapes, 2, data_type, true); + Tensor value_cache = make_tensor_external(vc_ptr, value_cache_shapes, 2, data_type, true); + Tensor out = make_tensor_external(out_ptr, out_shapes, 2, DataType::FLOAT32, true); + + int *host_block_table = orch_args.tensor(3).data_as(); + int *host_context_lens = orch_args.tensor(4).data_as(); + + uint32_t tile2d_shapes[2] = {static_cast(q_tile), static_cast(head_dim)}; + uint32_t scalar_shapes[1] = {static_cast(q_tile)}; + uint32_t sij_shapes[2] = {static_cast(q_tile), static_cast(block_size)}; + TensorCreateInfo tile2d_ci(tile2d_shapes, 2, DataType::FLOAT32); + TensorCreateInfo scalar_ci(scalar_shapes, 1, DataType::FLOAT32); + TensorCreateInfo sij_ci(sij_shapes, 2, DataType::FLOAT32); + TensorCreateInfo pij_f16_ci(sij_shapes, 2, data_type); + + for (uint64_t b_idx = 0; b_idx < batch; b_idx++) { + uint64_t cur_seq = host_context_lens[b_idx]; + uint64_t bn_this_batch = (cur_seq + block_size - 1) / block_size; + for (uint64_t 
q_idx = 0; q_idx < q_loop; q_idx++) { + PTO2_SCOPE() { + uint64_t cur_offset = b_idx * q_head_num + q_idx * q_tile; + + uint32_t qi_offsets[2] = {static_cast(cur_offset), 0}; + uint32_t out_view_offsets[2] = {static_cast(cur_offset), 0}; + Tensor qi = query.view(tile2d_shapes, qi_offsets, true); + Tensor out_view = out.view(tile2d_shapes, out_view_offsets, true); + CYCLE_COUNT_LAP(prof_view); + + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + Arg params_inplace; + params_inplace.add_output(tile2d_ci); + params_inplace.add_output(scalar_ci); + params_inplace.add_output(scalar_ci); + PTO2ManualSubmitResult hub_outs = pto2_rt_submit_aiv_task_manual(FUNC_AIV_HUB, params_inplace); + prof_submit_manual_count++; + CYCLE_COUNT_LAP(prof_submit_manual); + const Tensor &oi = hub_outs.outputs.get_ref(0); + const Tensor &li_update = hub_outs.outputs.get_ref(1); + const Tensor &mi_update = hub_outs.outputs.get_ref(2); + PTO2TaskId prev_update_task = hub_outs.task_id; + + for (uint64_t bn = 0; bn < bn_this_batch; bn++) { + uint64_t cur_block_idx = host_block_table[b_idx * block_num + bn]; + uint64_t valid_len = std::min(block_size, cur_seq - bn * block_size); + + uint32_t kv_shapes[2] = { + static_cast(block_size), static_cast(head_dim) + }; + uint32_t kv_offsets[2] = {static_cast(cur_block_idx * block_size), 0}; + Tensor kj = key_cache.view(kv_shapes, kv_offsets, true); + Tensor vj = value_cache.view(kv_shapes, kv_offsets, true); + CYCLE_COUNT_LAP(prof_view); + + Arg params_qk; + params_qk.add_input(qi); + params_qk.add_input(kj); + params_qk.add_output(sij_ci); + PTO2ManualSubmitResult qk_outs = pto2_rt_submit_aic_task_manual(FUNC_QK_MATMUL, params_qk); + prof_submit_manual_count++; + CYCLE_COUNT_LAP(prof_submit_manual); + const Tensor &sij = qk_outs.outputs.get_ref(0); + + uint32_t sij_valid_shapes[2] = { + static_cast(q_tile), static_cast(valid_len) + }; + uint32_t sij_valid_offsets[2] = {0, 0}; + Tensor sij_valid = sij.view(sij_valid_shapes, sij_valid_offsets); + 
CYCLE_COUNT_LAP(prof_view); + + Arg params_sf; + params_sf.add_input(sij_valid); + params_sf.add_output(pij_f16_ci); + params_sf.add_output(scalar_ci); + params_sf.add_output(scalar_ci); + params_sf.add_scalar(scale_value); + PTO2ManualSubmitResult sf_outs = pto2_rt_submit_aiv_task_manual_with_deps( + FUNC_SOFTMAX_PREPARE, params_sf, {qk_outs.task_id} + ); + prof_submit_manual_count++; + CYCLE_COUNT_LAP(prof_submit_manual); + const Tensor &pij_f16 = sf_outs.outputs.get_ref(0); + const Tensor &mi = sf_outs.outputs.get_ref(1); + const Tensor &li = sf_outs.outputs.get_ref(2); + + Arg params_pv; + params_pv.add_input(pij_f16); + params_pv.add_input(vj); + params_pv.add_output(tile2d_ci); + PTO2ManualSubmitResult pv_outs = + pto2_rt_submit_aic_task_manual_with_deps(FUNC_PV_MATMUL, params_pv, {sf_outs.task_id}); + prof_submit_manual_count++; + CYCLE_COUNT_LAP(prof_submit_manual); + const Tensor &oi_tmp = pv_outs.outputs.get_ref(0); + + uint64_t is_first = (bn == 0) ? 1 : 0; + uint64_t is_last = (bn == bn_this_batch - 1) ? 
1 : 0; + + Arg params_up; + params_up.add_input(mi); + params_up.add_input(li); + params_up.add_input(oi_tmp); + params_up.add_inout(mi_update); + params_up.add_inout(li_update); + params_up.add_inout(oi); + params_up.add_inout(out_view); + params_up.add_scalar(is_first); + params_up.add_scalar(is_last); + PTO2ManualSubmitResult up_outs = pto2_rt_submit_aiv_task_manual_with_deps( + FUNC_ONLINE_UPDATE, params_up, {sf_outs.task_id, pv_outs.task_id, prev_update_task} + ); + prof_submit_manual_count++; + CYCLE_COUNT_LAP(prof_submit_manual); + prev_update_task = up_outs.task_id; + } + } + prof_manual_scope_close_count++; + CYCLE_COUNT_LAP(prof_manual_scope_close); + } + } + } + uint64_t total = prof_submit_manual + prof_add_dependency + prof_manual_scope_close + prof_view; + LOG_ALWAYS( + "=== PartialManual Orch Profiling: submits=%d add_dep=%d total=%.3fus ===", + prof_submit_manual_count, + prof_add_dependency_count, + cycles_to_us(total) + ); + if (total > 0) { + LOG_ALWAYS( + " submit_manual : %7.3fus (%5.1f%%)", + cycles_to_us(prof_submit_manual), + prof_submit_manual * 100.0 / total + ); + LOG_ALWAYS( + " add_dependency : %7.3fus (%5.1f%%)", + cycles_to_us(prof_add_dependency), + prof_add_dependency * 100.0 / total + ); + LOG_ALWAYS( + " scope_close(x%d): %7.3fus (%5.1f%%)", + prof_manual_scope_close_count, + cycles_to_us(prof_manual_scope_close), + prof_manual_scope_close * 100.0 / total + ); + LOG_ALWAYS( + " tensor_view : %7.3fus (%5.1f%%)", + cycles_to_us(prof_view), + prof_view * 100.0 / total + ); + } +} + +} // extern "C" diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/golden.py b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/golden.py new file mode 100644 index 00000000..d3f3c1ac --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/golden.py @@ -0,0 +1,8 @@ +from pathlib import Path +import sys + +_BASE = Path(__file__).resolve().parents[1] 
/ "paged_attention_unroll" +sys.path.insert(0, str(_BASE)) + +from golden import ALL_CASES, ATOL, DEFAULT_CASE, RTOL, __outputs__, generate_inputs # noqa: E402,F401 +from paged_attention_golden import compute_golden, run_golden_test # noqa: E402,F401 diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/aic/aic_hub.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/aic/aic_hub.cpp new file mode 100644 index 00000000..0b3062f1 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/aic/aic_hub.cpp @@ -0,0 +1,14 @@ +#include +#include + +using namespace pto; + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +extern "C" __aicore__ void kernel_entry(__gm__ int64_t *args) { (void)args; } diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/aiv/aiv_hub.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/aiv/aiv_hub.cpp new file mode 100644 index 00000000..0b3062f1 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/aiv/aiv_hub.cpp @@ -0,0 +1,14 @@ +#include +#include + +using namespace pto; + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +extern "C" __aicore__ void kernel_entry(__gm__ int64_t *args) { (void)args; } diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/kernel_config.py b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/kernel_config.py new file mode 100644 index 00000000..52b62431 --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/kernel_config.py @@ -0,0 +1,72 @@ +# Copyright (c) PyPTO Contributors. 
+# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from pathlib import Path + +from task_interface import ArgDirection as D # pyright: ignore[reportAttributeAccessIssue] + +_ROOT = Path(__file__).parent +_PA_KERNELS = _ROOT.parent.parent / "paged_attention_unroll" / "kernels" + +ORCHESTRATION = { + "source": str(_ROOT / "orchestration" / "paged_attention_orch.cpp"), + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN, D.IN, D.IN, D.IN, D.IN, D.OUT], +} + +KERNELS = [ + { + "func_id": 0, + "name": "QK", + "source": str(_PA_KERNELS / "aic" / "aic_qk_matmul.cpp"), + "core_type": "aic", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 2, + "name": "PV", + "source": str(_PA_KERNELS / "aic" / "aic_pv_matmul.cpp"), + "core_type": "aic", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 4, + "name": "AIC_HUB", + "source": str(_ROOT / "aic" / "aic_hub.cpp"), + "core_type": "aic", + "signature": [], + }, + { + "func_id": 1, + "name": "SF", + "source": str(_PA_KERNELS / "aiv" / "aiv_softmax_prepare.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.OUT, D.OUT, D.OUT], + }, + { + "func_id": 3, + "name": "UP", + "source": str(_PA_KERNELS / "aiv" / "aiv_online_update.cpp"), + "core_type": "aiv", + "signature": [D.IN, D.IN, D.IN, D.INOUT, D.INOUT, D.INOUT, D.INOUT], + }, + { + "func_id": 5, + "name": 
"AIV_HUB", + "source": str(_ROOT / "aiv" / "aiv_hub.cpp"), + "core_type": "aiv", + "signature": [], + }, +] + +RUNTIME_CONFIG = { + "runtime": "tensormap_and_ringbuffer", + "aicpu_thread_num": 4, + "orch_thread_num": 1, + "block_dim": 24, +} diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/orchestration/paged_attention_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/orchestration/paged_attention_orch.cpp new file mode 100644 index 00000000..cfe6666e --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll_partial_manual/kernels/orchestration/paged_attention_orch.cpp @@ -0,0 +1,186 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. 
+ * ----------------------------------------------------------------------------------------------------------- + */ + +#include +#include + +#include "pto_orchestration_api.h" // NOLINT(build/include_subdir) + +#define N_UNROLL 64 + +#define FUNC_QK_MATMUL 0 +#define FUNC_SOFTMAX_PREPARE 1 +#define FUNC_PV_MATMUL 2 +#define FUNC_ONLINE_UPDATE 3 +#define FUNC_AIC_HUB 4 +#define FUNC_AIV_HUB 5 + +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig +aicpu_orchestration_config(const ChipStorageTaskArgs &orch_args) { + (void)orch_args; // NOLINT(readability/casting) + return PTO2OrchestrationConfig{ + .expected_arg_count = 7, + }; +} + +__attribute__((visibility("default"))) void +aicpu_orchestration_entry(const ChipStorageTaskArgs &orch_args, int orch_thread_num, int orch_thread_index) { + (void)orch_thread_num; // NOLINT(readability/casting) + (void)orch_thread_index; // NOLINT(readability/casting) + + uint64_t batch = orch_args.tensor(0).shapes[0]; + uint64_t num_heads = orch_args.tensor(0).shapes[1]; + uint64_t head_dim = orch_args.tensor(0).shapes[2]; + DataType data_type = orch_args.tensor(0).dtype; + uint64_t block_size = orch_args.tensor(1).shapes[1]; + uint64_t block_num = orch_args.tensor(3).shapes[1]; + uint64_t scale_value = orch_args.scalar(0); + + uint64_t q_head_num = num_heads; + uint64_t q_tile = std::min(num_heads, 128UL); + uint64_t q_loop = (q_head_num + q_tile - 1) / q_tile; + + void *query_ptr = orch_args.tensor(0).data_as(); + void *kc_ptr = orch_args.tensor(1).data_as(); + void *vc_ptr = orch_args.tensor(2).data_as(); + void *out_ptr = orch_args.tensor(5).data_as(); + + uint64_t total_blocks_count = orch_args.tensor(1).shapes[0]; + + uint32_t query_shapes[2] = {static_cast(batch * num_heads), static_cast(head_dim)}; + uint32_t key_cache_shapes[2] = { + static_cast(total_blocks_count * block_size), static_cast(head_dim) + }; + uint32_t value_cache_shapes[2] = { + static_cast(total_blocks_count * block_size), 
static_cast(head_dim) + }; + uint32_t out_shapes[2] = {static_cast(batch * num_heads), static_cast(head_dim)}; + Tensor query = make_tensor_external(query_ptr, query_shapes, 2, data_type, false); + Tensor key_cache = make_tensor_external(kc_ptr, key_cache_shapes, 2, data_type, false); + Tensor value_cache = make_tensor_external(vc_ptr, value_cache_shapes, 2, data_type, false); + Tensor out = make_tensor_external(out_ptr, out_shapes, 2, DataType::FLOAT32); + + uint32_t bt_shapes[2] = {static_cast(batch), static_cast(block_num)}; + Tensor block_table = + make_tensor_external(orch_args.tensor(3).data_as(), bt_shapes, 2, DataType::INT32, false); + uint32_t cl_shapes[1] = {static_cast(batch)}; + Tensor context_lens = + make_tensor_external(orch_args.tensor(4).data_as(), cl_shapes, 1, DataType::INT32, false); + + uint32_t oi_shapes[2] = {static_cast(q_tile), static_cast(head_dim)}; + uint32_t li_shapes[1] = {static_cast(q_tile)}; + TensorCreateInfo tile2d_ci(oi_shapes, 2, DataType::FLOAT32); + TensorCreateInfo scalar_noinit_ci(li_shapes, 1, DataType::FLOAT32, false); + TensorCreateInfo scalar_ci(li_shapes, 1, DataType::FLOAT32); + + for (uint64_t b_idx = 0; b_idx < batch; b_idx++) { + uint32_t cl_idx[1] = {static_cast(b_idx)}; + uint64_t cur_seq = static_cast(get_tensor_data(context_lens, 1, cl_idx)); + uint64_t bn_this_batch = (cur_seq + block_size - 1) / block_size; + + for (uint64_t q_idx = 0; q_idx < q_loop; q_idx++) { + PTO2_SCOPE() { + uint64_t cur_offset = b_idx * q_head_num + q_idx * q_tile; + + uint32_t qi_shapes[2] = {static_cast(q_tile), static_cast(head_dim)}; + uint32_t qi_offsets[2] = {static_cast(cur_offset), 0}; + Tensor qi = query.view(qi_shapes, qi_offsets); + uint32_t out_view_shapes[2] = {static_cast(q_tile), static_cast(head_dim)}; + uint32_t out_view_offsets[2] = {static_cast(cur_offset), 0}; + Tensor out_view = out.view(out_view_shapes, out_view_offsets, true); + + Arg params_inplace; + params_inplace.add_output(tile2d_ci); + 
params_inplace.add_output(scalar_noinit_ci); + params_inplace.add_output(scalar_noinit_ci); + TaskOutputTensors hub_outs = pto2_rt_submit_aiv_task(FUNC_AIV_HUB, params_inplace); + const Tensor &oi = hub_outs.get_ref(0); + const Tensor &li_update = hub_outs.get_ref(1); + const Tensor &mi_update = hub_outs.get_ref(2); + + Arg params_qk; + Arg params_sf; + Arg params_pv; + Arg params_up; + + for (uint64_t bn = 0; bn < bn_this_batch; bn += N_UNROLL) { + uint64_t n_blocks = std::min(static_cast(N_UNROLL), bn_this_batch - bn); + uint64_t last_block_seq_start = (bn + n_blocks - 1) * block_size; + uint64_t valid_len_last = std::min(block_size, cur_seq - last_block_seq_start); + + PTO2_SCOPE(PTO2ScopeMode::MANUAL) { + uint32_t sij_buf_shapes[2] = { + static_cast(q_tile), static_cast(n_blocks * block_size) + }; + TensorCreateInfo sij_buf_ci(sij_buf_shapes, 2, DataType::FLOAT32); + + params_qk.reset(); + params_qk.add_input(qi); + params_qk.add_input(key_cache); + params_qk.add_input(block_table); + params_qk.add_output(sij_buf_ci); + params_qk.add_scalar(n_blocks); + params_qk.add_scalar(b_idx * block_num + bn); + PTO2ManualSubmitResult qk_outs = pto2_rt_submit_aic_task_manual(FUNC_QK_MATMUL, params_qk); + + uint32_t pij_buf_shapes[2] = { + static_cast(q_tile), static_cast(n_blocks * block_size) + }; + TensorCreateInfo pij_buf_ci(pij_buf_shapes, 2, data_type); + + params_sf.reset(); + params_sf.add_input(qk_outs.outputs.get_ref(0)); + params_sf.add_output(pij_buf_ci); + params_sf.add_output(scalar_ci); + params_sf.add_output(scalar_ci); + params_sf.add_scalar(scale_value); + params_sf.add_scalar(n_blocks); + params_sf.add_scalar(valid_len_last); + PTO2ManualSubmitResult sf_outs = pto2_rt_submit_aiv_task_manual_with_deps( + FUNC_SOFTMAX_PREPARE, params_sf, {qk_outs.task_id} + ); + + params_pv.reset(); + params_pv.add_input(sf_outs.outputs.get_ref(0)); + params_pv.add_input(value_cache); + params_pv.add_input(block_table); + params_pv.add_output(tile2d_ci); + 
params_pv.add_scalar(n_blocks); + params_pv.add_scalar(b_idx * block_num + bn); + PTO2ManualSubmitResult pv_outs = + pto2_rt_submit_aic_task_manual_with_deps(FUNC_PV_MATMUL, params_pv, {sf_outs.task_id}); + + uint64_t is_first = (bn == 0) ? 1 : 0; + uint64_t is_last = (bn + n_blocks >= bn_this_batch) ? 1 : 0; + + params_up.reset(); + params_up.add_input(sf_outs.outputs.get_ref(1)); + params_up.add_input(sf_outs.outputs.get_ref(2)); + params_up.add_input(pv_outs.outputs.get_ref(0)); + params_up.add_inout(mi_update); + params_up.add_inout(li_update); + params_up.add_inout(oi); + params_up.add_inout(out_view); + params_up.add_scalar(is_first); + params_up.add_scalar(is_last); + PTO2ManualSubmitResult up_outs = pto2_rt_submit_aiv_task_manual_with_deps( + FUNC_ONLINE_UPDATE, params_up, {sf_outs.task_id, pv_outs.task_id} + ); + } + } + } + } + } +} + +} // extern "C" diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt index 29aab3f8..846659d3 100644 --- a/tests/ut/cpp/CMakeLists.txt +++ b/tests/ut/cpp/CMakeLists.txt @@ -59,6 +59,23 @@ function(add_dist_test name src) add_test(NAME ${name} COMMAND ${name}) endfunction() +function(add_a2a3_tmr_test name src) + add_executable(${name} ${src}) + target_include_directories(${name} PRIVATE + /usr/local/include + ${CMAKE_SOURCE_DIR}/../../../src/common/task_interface + ${CMAKE_SOURCE_DIR}/../../../src/a2a3/runtime/tensormap_and_ringbuffer/runtime + ${CMAKE_SOURCE_DIR}/../../../src/a2a3/runtime/tensormap_and_ringbuffer/orchestration + ) + target_compile_options(${name} PRIVATE -D_GLIBCXX_USE_CXX11_ABI=0) + target_link_libraries(${name} PRIVATE + ${GTEST_MAIN_LIB} + ${GTEST_LIB} + pthread + ) + add_test(NAME ${name} COMMAND ${name}) +endfunction() + enable_testing() add_dist_test(test_dist_tensormap test_dist_tensormap.cpp) @@ -66,3 +83,4 @@ add_dist_test(test_dist_ring test_dist_ring.cpp) add_dist_test(test_dist_scope test_dist_scope.cpp) add_dist_test(test_dist_orchestrator test_dist_orchestrator.cpp) 
add_dist_test(test_dist_scheduler test_dist_scheduler.cpp) +add_a2a3_tmr_test(test_a2a3_tmr_tensor_offsets test_a2a3_tmr_tensor_offsets.cpp) diff --git a/tests/ut/cpp/test_a2a3_tmr_tensor_offsets.cpp b/tests/ut/cpp/test_a2a3_tmr_tensor_offsets.cpp new file mode 100644 index 00000000..d3388da7 --- /dev/null +++ b/tests/ut/cpp/test_a2a3_tmr_tensor_offsets.cpp @@ -0,0 +1,47 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. 
+ * ----------------------------------------------------------------------------------------------------------- + */ + +#include <gtest/gtest.h> + +#include "pto_orchestration_api.h" + +TEST(A2A3TmrTensorOffsets, ExternalTensorStartsAtZeroOffset) { + uint32_t shapes[2] = {8, 16}; + Tensor tensor = make_tensor_external(reinterpret_cast<void *>(0x1000), shapes, 2, DataType::FLOAT32); + + EXPECT_EQ(tensor.start_offset, 0U); +} + +TEST(A2A3TmrTensorOffsets, ViewCachesFlatStartOffsetAtConstruction) { + uint32_t shapes[2] = {8, 16}; + Tensor tensor = make_tensor_external(reinterpret_cast<void *>(0x1000), shapes, 2, DataType::FLOAT32); + + uint32_t view_shapes[2] = {2, 4}; + uint32_t view_offsets[2] = {3, 5}; + Tensor view = tensor.view(view_shapes, view_offsets); + + EXPECT_EQ(view.start_offset, 3U * 16U + 5U); +} + +TEST(A2A3TmrTensorOffsets, NestedViewKeepsAccumulatedStartOffset) { + uint32_t shapes[2] = {8, 16}; + Tensor tensor = make_tensor_external(reinterpret_cast<void *>(0x1000), shapes, 2, DataType::FLOAT32); + + uint32_t outer_shapes[2] = {4, 8}; + uint32_t outer_offsets[2] = {2, 3}; + Tensor outer = tensor.view(outer_shapes, outer_offsets); + + uint32_t inner_shapes[2] = {2, 4}; + uint32_t inner_offsets[2] = {1, 2}; + Tensor inner = outer.view(inner_shapes, inner_offsets); + + EXPECT_EQ(inner.start_offset, (2U + 1U) * 16U + (3U + 2U)); +} diff --git a/tests/ut/hardware_test_utils.py b/tests/ut/hardware_test_utils.py new file mode 100644 index 00000000..82d49b91 --- /dev/null +++ b/tests/ut/hardware_test_utils.py @@ -0,0 +1,34 @@ +import os +import re +import subprocess + + +def get_test_device_id(default: str = "0") -> str: + """Pick a hardware test device. + + Respect PTO_TEST_DEVICE_ID when explicitly provided. Otherwise prefer the + lowest-ID NPU that reports no running processes in `npu-smi info`, which is + more stable than blindly defaulting to device 0 on shared machines.
+ """ + + configured = os.environ.get("PTO_TEST_DEVICE_ID") + if configured: + return configured + + try: + result = subprocess.run( + ["npu-smi", "info"], + capture_output=True, + text=True, + check=False, + ) + except FileNotFoundError: + return default + + if result.returncode != 0: + return default + + free_devices = sorted({int(match) for match in re.findall(r"No running processes found in NPU (\d+)", result.stdout)}) + if free_devices: + return str(free_devices[0]) + return default diff --git a/tests/ut/test_manual_scope_boundary.py b/tests/ut/test_manual_scope_boundary.py new file mode 100644 index 00000000..7e931f7c --- /dev/null +++ b/tests/ut/test_manual_scope_boundary.py @@ -0,0 +1,38 @@ +import os +import subprocess +import sys +from pathlib import Path + +import pytest + +from hardware_test_utils import get_test_device_id + + +PROJECT_ROOT = Path(__file__).parent.parent.parent +RUN_EXAMPLE = PROJECT_ROOT / "examples" / "scripts" / "run_example.py" +KERNELS_DIR = ( + PROJECT_ROOT / "tests" / "st" / "a2a3" / "tensormap_and_ringbuffer" / "manual_scope_outer_multiwrite" / "kernels" +) +GOLDEN = PROJECT_ROOT / "tests" / "st" / "a2a3" / "tensormap_and_ringbuffer" / "manual_scope_outer_multiwrite" / "golden.py" +PTO_ISA_COMMIT = "d96c8784" + + +@pytest.mark.requires_hardware +@pytest.mark.skipif(not os.getenv("ASCEND_HOME_PATH"), reason="ASCEND_HOME_PATH not set; Ascend toolkit required") +def test_manual_scope_outer_multiwrite_boundary(): + device_id = get_test_device_id() + command = ( + f"source {os.environ['ASCEND_HOME_PATH']}/bin/setenv.bash >/dev/null 2>&1 && " + f"{sys.executable} {RUN_EXAMPLE} --build --silent " + f"-k {KERNELS_DIR} -g {GOLDEN} -p a2a3 -d {device_id} " + f"--clone-protocol https -c {PTO_ISA_COMMIT}" + ) + result = subprocess.run( + ["bash", "-lc", command], + cwd=PROJECT_ROOT, + capture_output=True, + text=True, + check=False, + ) + + assert result.returncode == 0, result.stdout + result.stderr diff --git 
a/tests/ut/test_manual_scope_guards.py b/tests/ut/test_manual_scope_guards.py new file mode 100644 index 00000000..6be164dd --- /dev/null +++ b/tests/ut/test_manual_scope_guards.py @@ -0,0 +1,83 @@ +import os +import subprocess +import sys +import time +from pathlib import Path + +import pytest + +from hardware_test_utils import get_test_device_id + + +PROJECT_ROOT = Path(__file__).parent.parent.parent +RUN_EXAMPLE = PROJECT_ROOT / "examples" / "scripts" / "run_example.py" +KERNELS_DIR = PROJECT_ROOT / "tests" / "st" / "a2a3" / "tensormap_and_ringbuffer" / "manual_scope_guard_negative" / "kernels" +GOLDEN = PROJECT_ROOT / "tests" / "st" / "a2a3" / "tensormap_and_ringbuffer" / "manual_scope_guard_negative" / "golden.py" +PTO_ISA_COMMIT = "d96c8784" + + +@pytest.mark.requires_hardware +@pytest.mark.skipif(not os.getenv("ASCEND_HOME_PATH"), reason="ASCEND_HOME_PATH not set; Ascend toolkit required") +@pytest.mark.parametrize( + ("case_name", "expected_message"), + [ + ( + "NestedManualScope", + "manual scope inside manual scope is not supported", + ), + ( + "ManualGetTensorData", + "blocking tensor data access is not supported inside PTO2_SCOPE(PTO2ScopeMode::MANUAL); exit the manual scope first", + ), + ( + "ManualSetTensorData", + "blocking tensor data access is not supported inside PTO2_SCOPE(PTO2ScopeMode::MANUAL); exit the manual scope first", + ), + ( + "ManualSelfDependency", + "add_dependency does not allow self-dependency", + ), + ], +) +def test_manual_scope_guard_failures(case_name, expected_message): + device_id = get_test_device_id() + log_dir = Path.home() / "ascend" / "log" / "debug" / f"device-{device_id}" + if os.getenv("ASCEND_WORK_PATH"): + work_log_dir = Path(os.environ["ASCEND_WORK_PATH"]).expanduser() / "log" / "debug" / f"device-{device_id}" + if work_log_dir.exists(): + log_dir = work_log_dir + before_logs = set(log_dir.glob("*.log")) if log_dir.exists() else set() + command = ( + f"source {os.environ['ASCEND_HOME_PATH']}/bin/setenv.bash 
>/dev/null 2>&1 && " + f"{sys.executable} {RUN_EXAMPLE} --build --silent " + f"-k {KERNELS_DIR} -g {GOLDEN} -p a2a3 -d {device_id} " + f"--case {case_name} --clone-protocol https -c {PTO_ISA_COMMIT}" + ) + result = subprocess.run( + ["bash", "-lc", command], + cwd=PROJECT_ROOT, + capture_output=True, + text=True, + check=False, + ) + + assert result.returncode != 0 + combined_output = result.stdout + result.stderr + + new_log = None + deadline = time.monotonic() + 20 + while time.monotonic() < deadline: + current_logs = set(log_dir.glob("*.log")) if log_dir.exists() else set() + created = current_logs - before_logs + if created: + new_log = max(created, key=lambda path: path.stat().st_mtime) + break + time.sleep(0.5) + + if new_log is None: + logs = list(log_dir.glob("*.log")) if log_dir.exists() else [] + assert logs, "expected a device log for the failed manual-scope case" + new_log = max(logs, key=lambda path: path.stat().st_mtime) + + log_text = new_log.read_text(encoding="utf-8", errors="ignore") + assert expected_message in combined_output or expected_message in log_text diff --git a/tests/ut/test_manual_scope_perf_invariants.py b/tests/ut/test_manual_scope_perf_invariants.py new file mode 100644 index 00000000..e6c95288 --- /dev/null +++ b/tests/ut/test_manual_scope_perf_invariants.py @@ -0,0 +1,99 @@ +import os +import subprocess +import sys +import time +from pathlib import Path + +import pytest + +from hardware_test_utils import get_test_device_id + + +PROJECT_ROOT = Path(__file__).parent.parent.parent +RUN_EXAMPLE = PROJECT_ROOT / "examples" / "scripts" / "run_example.py" +KERNELS_DIR = ( + PROJECT_ROOT / "tests" / "st" / "a2a3" / "tensormap_and_ringbuffer" / "manual_scope_outer_multiwrite" / "kernels" +) +GOLDEN = PROJECT_ROOT / "tests" / "st" / "a2a3" / "tensormap_and_ringbuffer" / "manual_scope_outer_multiwrite" / "golden.py" +PTO_ISA_COMMIT = "d96c8784" + + +def _device_log_dir(device_id: str) -> Path: + log_dir = Path.home() / "ascend" / "log" / 
"debug" / f"device-{device_id}" + if os.getenv("ASCEND_WORK_PATH"): + work_log_dir = Path(os.environ["ASCEND_WORK_PATH"]).expanduser() / "log" / "debug" / f"device-{device_id}" + if work_log_dir.exists(): + return work_log_dir + return log_dir + + +def _run_manual_scope_outer_multiwrite(extra_env: dict[str, str]) -> tuple[subprocess.CompletedProcess[str], str]: + device_id = get_test_device_id() + log_dir = _device_log_dir(device_id) + before_logs = set(log_dir.glob("*.log")) if log_dir.exists() else set() + + env_prefix = " ".join(f"{key}={value}" for key, value in extra_env.items()) + command = ( + f"source {os.environ['ASCEND_HOME_PATH']}/bin/setenv.bash >/dev/null 2>&1 && " + f"{env_prefix} " + f"{sys.executable} {RUN_EXAMPLE} --build --silent " + f"-k {KERNELS_DIR} -g {GOLDEN} -p a2a3 -d {device_id} " + f"--clone-protocol https -c {PTO_ISA_COMMIT}" + ) + result = subprocess.run( + ["bash", "-lc", command], + cwd=PROJECT_ROOT, + capture_output=True, + text=True, + check=False, + ) + + new_log = None + deadline = time.monotonic() + 20 + while time.monotonic() < deadline: + current_logs = set(log_dir.glob("*.log")) if log_dir.exists() else set() + created = current_logs - before_logs + if created: + new_log = max(created, key=lambda path: path.stat().st_mtime) + break + time.sleep(0.5) + + if new_log is None: + logs = list(log_dir.glob("*.log")) if log_dir.exists() else [] + if logs: + new_log = max(logs, key=lambda path: path.stat().st_mtime) + + log_text = "" + if new_log is not None: + log_text = new_log.read_text(encoding="utf-8", errors="ignore") + + return result, result.stdout + result.stderr + log_text + + +@pytest.mark.requires_hardware +@pytest.mark.skipif(not os.getenv("ASCEND_HOME_PATH"), reason="ASCEND_HOME_PATH not set; Ascend toolkit required") +def test_manual_scope_tail_consumer_path_keeps_fast_publish(): + result, combined_text = _run_manual_scope_outer_multiwrite( + { + "PTO2_DEBUG_DUMP_MANUAL_SCOPE": "1", + "PTO2_EXPECT_MANUAL_SCOPE_REPAIR": 
"0", + } + ) + + assert result.returncode == 0, combined_text + assert "manual_scope_repair_needed=0" in combined_text + + +@pytest.mark.requires_hardware +@pytest.mark.skipif(not os.getenv("ASCEND_HOME_PATH"), reason="ASCEND_HOME_PATH not set; Ascend toolkit required") +def test_manual_scope_retroactive_edge_enables_repair_fallback(): + result, combined_text = _run_manual_scope_outer_multiwrite( + { + "PTO2_DEBUG_DUMP_MANUAL_SCOPE": "1", + "PTO2_DEBUG_FORCE_RETROACTIVE_MANUAL_EDGE": "1", + "PTO2_EXPECT_MANUAL_SCOPE_REPAIR": "1", + } + ) + + assert result.returncode == 0, combined_text + assert "manual_scope_repair_needed=1" in combined_text diff --git a/tools/benchmark_rounds.sh b/tools/benchmark_rounds.sh index 64b283e8..834968b2 100755 --- a/tools/benchmark_rounds.sh +++ b/tools/benchmark_rounds.sh @@ -23,24 +23,38 @@ RUN_EXAMPLE="$PROJECT_ROOT/examples/scripts/run_example.py" declare -A TMR_EXAMPLE_CASES=( [alternating_matmul_add]="" [benchmark_bgemm]="" + [paged_attention]="Case1,Case2" [paged_attention_unroll]="Case1,Case2" [batch_paged_attention]="" ) TMR_EXAMPLE_ORDER=( alternating_matmul_add benchmark_bgemm + paged_attention paged_attention_unroll batch_paged_attention ) # --- aicpu_build_graph --- declare -A ABG_EXAMPLE_CASES=( + [paged_attention]="Case1,Case2" [paged_attention_unroll]="Case1,Case2" ) ABG_EXAMPLE_ORDER=( + paged_attention paged_attention_unroll ) +# --- tensormap_and_ringbuffer_partial_manual --- +declare -A TMR_PARTIAL_MANUAL_EXAMPLE_CASES=( + [paged_attention_partial_manual]="Case1,Case2" + [paged_attention_unroll_partial_manual]="Case1,Case2" +) +TMR_PARTIAL_MANUAL_EXAMPLE_ORDER=( + paged_attention_partial_manual + paged_attention_unroll_partial_manual +) + # --------------------------------------------------------------------------- # Parse arguments # --------------------------------------------------------------------------- @@ -49,6 +63,7 @@ ROUNDS=100 PLATFORM=a2a3 RUNTIME=tensormap_and_ringbuffer VERBOSE=0 +EXAMPLE_FILTER="" 
EXTRA_ARGS=() while [[ $# -gt 0 ]]; do @@ -69,6 +84,10 @@ while [[ $# -gt 0 ]]; do RUNTIME="$2" shift 2 ;; + -e|--examples) + EXAMPLE_FILTER="$2" + shift 2 + ;; -v|--verbose) VERBOSE=1 shift @@ -78,13 +97,16 @@ while [[ $# -gt 0 ]]; do benchmark_rounds.sh — run all examples and report per-round timing from device logs Usage: - ./tools/benchmark_rounds.sh [-p ] [-d ] [-n ] [-r ] [-v] + ./tools/benchmark_rounds.sh [-p ] [-d ] [-n ] [-r ] [-e ] [-v] Options: -p, --platform Platform to run on (default: a2a3) -d, --device Device ID (default: 0) -n, --rounds Override number of rounds for each example (default: 100) - -r, --runtime Runtime to benchmark: tensormap_and_ringbuffer (default), aicpu_build_graph + -r, --runtime Runtime to benchmark: tensormap_and_ringbuffer (default), + tensormap_and_ringbuffer_partial_manual, + aicpu_build_graph + -e, --examples Comma-separated example names to run (default: runtime-specific full list) -v, --verbose Save detailed run_example.py output to a timestamped log file -h, --help Show this help @@ -124,7 +146,7 @@ vlog() { # --------------------------------------------------------------------------- # Derive arch from platform and set examples directory # --------------------------------------------------------------------------- -EXAMPLES_DIR="$PROJECT_ROOT/tests/st/${PLATFORM}/${RUNTIME}" +TESTS_RUNTIME_DIR="$RUNTIME" # Clock frequency (MHz) for converting cycle counts to microseconds case "$PLATFORM" in @@ -139,16 +161,37 @@ case "$RUNTIME" in declare -n EXAMPLE_CASES=TMR_EXAMPLE_CASES EXAMPLE_ORDER=("${TMR_EXAMPLE_ORDER[@]}") ;; + tensormap_and_ringbuffer_partial_manual) + TESTS_RUNTIME_DIR="tensormap_and_ringbuffer" + declare -n EXAMPLE_CASES=TMR_PARTIAL_MANUAL_EXAMPLE_CASES + EXAMPLE_ORDER=("${TMR_PARTIAL_MANUAL_EXAMPLE_ORDER[@]}") + ;; aicpu_build_graph) declare -n EXAMPLE_CASES=ABG_EXAMPLE_CASES EXAMPLE_ORDER=("${ABG_EXAMPLE_ORDER[@]}") ;; *) - echo "ERROR: unknown runtime '$RUNTIME'. 
Use tensormap_and_ringbuffer or aicpu_build_graph." + echo "ERROR: unknown runtime '$RUNTIME'. Use tensormap_and_ringbuffer, tensormap_and_ringbuffer_partial_manual, or aicpu_build_graph." exit 1 ;; esac +EXAMPLES_DIR="$PROJECT_ROOT/tests/st/${PLATFORM}/${TESTS_RUNTIME_DIR}" + +if [[ -n "$EXAMPLE_FILTER" ]]; then + IFS=',' read -ra REQUESTED_EXAMPLES <<< "$EXAMPLE_FILTER" + FILTERED_ORDER=() + for requested in "${REQUESTED_EXAMPLES[@]}"; do + if [[ -n "${EXAMPLE_CASES[$requested]+x}" ]]; then + FILTERED_ORDER+=("$requested") + else + echo "ERROR: example '$requested' is not available for runtime '$RUNTIME'." + exit 1 + fi + done + EXAMPLE_ORDER=("${FILTERED_ORDER[@]}") +fi + # --------------------------------------------------------------------------- # Resolve device log directory (mirrors run_example.py / device_log_resolver.py) # ---------------------------------------------------------------------------