-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Phase 7 — Worker Boundary and Cloud-Safe Execution Profiles #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| # Phase 07 Plan: Worker Boundary and Cloud-Safe Execution Profiles | ||
|
|
||
| **Phase**: 07 — Worker Boundary and Cloud-Safe Execution Profiles | ||
| **Goal**: Make long-running execution explicit and safe when the control plane is hosted away from the local workstation. Separate cloud ingress from local-only providers. | ||
| **Requirements**: WRKR-01, WRKR-02, WRKR-03 | ||
|
|
||
| ## Context | ||
|
|
||
| Phase 6 delivered a stable REST API and shared orchestration contract. Phase 7 introduces a worker boundary so HTTP ingress never waits on long-running repo execution, and adds execution profiles so cloud-hosted runs can explicitly reject local-only subprocess providers. | ||
|
|
||
| The local execution path is preserved intact. Cloud-safe is strictly opt-in via `--profile cloud-safe`. | ||
|
|
||
| ## Plans | ||
|
|
||
| ### 07-01: Worker boundary and background run dispatch contract | ||
|
|
||
| **Files created:** | ||
| - `maf_starter/worker_boundary.py` — `WorkerProfile` enum (LOCAL, CLOUD_SAFE), `WorkerBoundary` class with async dispatch and run status | ||
| - `tests/test_worker_boundary.py` — unit tests for submit_async, get_status, done/pending states | ||
|
|
||
| **Design decisions:** | ||
| - `WorkerBoundary.submit_async(run_id, workflow)` dispatches via `asyncio.create_task` — no new dependencies | ||
| - Status is tracked in a plain dict keyed by run_id; values are `"pending"`, `"running"`, `"done"`, or `"error:<message>"` | ||
| - `submit_async` returns immediately with the run_id so the HTTP layer is never blocked | ||
| - `WorkerProfile` is a string enum to allow clean serialization and CLI parsing | ||
|
|
||
| ### 07-02: Cloud-safe provider and execution profiles | ||
|
|
||
| **Files created / modified:** | ||
| - `maf_starter/execution_profile.py` — `ExecutionProfile` dataclass, `CLOUD_SAFE_PROFILE` and `LOCAL_PROFILE` constants, `IncompatibleProviderError` | ||
| - `maf_starter/provider_fallback.py` — guard added at the top of `_execute_chain_step` to reject subprocess providers when profile is CLOUD_SAFE | ||
| - `maf_starter/devui_overrides.py` — `--profile` CLI flag added (choices: local, cloud-safe; default: local) wired through to settings context | ||
| - `tests/test_execution_profile.py` — unit tests for profile enforcement and IncompatibleProviderError | ||
|
|
||
| **Design decisions:** | ||
| - `IncompatibleProviderError(RuntimeError)` carries provider name and profile name in a clear message | ||
| - Subprocess providers are `gemini-cli`, `claude-cli`, `codex-cli` — same set already used throughout `provider_fallback.py` | ||
| - LOCAL profile imposes no restrictions; CLOUD_SAFE rejects all subprocess providers on first check before any subprocess is spawned | ||
| - `ExecutionProfile` is a frozen dataclass with `profile: WorkerProfile` and `capabilities: tuple[str, ...]` | ||
| - `CLOUD_SAFE_PROFILE` capabilities list: `["api-only", "no-subprocess"]` | ||
| - `LOCAL_PROFILE` capabilities list: `["api", "subprocess", "repo-execution"]` | ||
|
|
||
| ### 07-03: End-to-end validation | ||
|
|
||
| **Files created / modified:** | ||
| - `tests/test_phase7_e2e.py` — three integration tests: | ||
| 1. Cloud-safe profile rejects gemini-cli subprocess provider with `IncompatibleProviderError` | ||
| 2. Local profile accepts all providers (gemini, anthropic, gemini-cli, claude-cli, codex-cli) | ||
| 3. Async dispatch via `WorkerBoundary.submit_async` returns run_id immediately without blocking | ||
| - `STATE.md` updated: Phase 7 marked complete | ||
|
|
||
| ## Verification | ||
|
|
||
| All tests pass with: | ||
| ``` | ||
| cd C:\PersonalRepo\portfolio\autogen && python -m pytest tests/ -v | ||
| ``` | ||
|
|
||
| ## Constraints | ||
|
|
||
| - No new pip dependencies — asyncio and stdlib only | ||
| - Local path unchanged — subprocess providers still work under LOCAL profile | ||
| - `IncompatibleProviderError` is informative: `"Provider {name} requires subprocess access which is not available in cloud-safe profile"` | ||
| - snake_case modules, PascalCase dataclasses, UPPER_SNAKE_CASE module constants — consistent with existing maf_starter patterns |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
|
|
||
| from maf_starter.worker_boundary import WorkerProfile | ||
|
|
||
|
|
||
| # Providers that require subprocess access and cannot run in a cloud-hosted context. | ||
| SUBPROCESS_PROVIDERS: frozenset[str] = frozenset({"gemini-cli", "claude-cli", "codex-cli"}) | ||
|
|
||
|
|
||
| class IncompatibleProviderError(RuntimeError): | ||
| """Raised when a provider requires capabilities that the active execution | ||
| profile does not allow. | ||
|
|
||
| Example:: | ||
|
|
||
| raise IncompatibleProviderError("gemini-cli", WorkerProfile.CLOUD_SAFE) | ||
| """ | ||
|
|
||
| def __init__(self, provider: str, profile: WorkerProfile) -> None: | ||
| self.provider = provider | ||
| self.profile = profile | ||
| super().__init__( | ||
| f"Provider {provider!r} requires subprocess access which is not " | ||
| f"available in {profile.value!r} profile." | ||
| ) | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class ExecutionProfile: | ||
| """Describes the execution capabilities available in the current hosting context. | ||
|
|
||
| Attributes: | ||
| profile: The broad classification of the execution environment. | ||
| capabilities: Explicit capability tokens that other modules can inspect | ||
| to decide whether a feature or provider is allowed. | ||
| """ | ||
|
|
||
| profile: WorkerProfile | ||
| capabilities: tuple[str, ...] | ||
|
|
||
| def allows_subprocess(self) -> bool: | ||
| """Return True when subprocess-backed providers are permitted.""" | ||
| return "subprocess" in self.capabilities | ||
|
|
||
| def assert_provider_allowed(self, provider: str) -> None: | ||
| """Raise IncompatibleProviderError if *provider* is a subprocess provider | ||
| and the profile does not allow subprocess access. | ||
|
|
||
| Args: | ||
| provider: Provider key such as ``"gemini-cli"``, ``"claude-cli"``, | ||
| or ``"gemini"``. Non-subprocess providers are always | ||
| allowed and this method returns immediately. | ||
| """ | ||
| if provider in SUBPROCESS_PROVIDERS and not self.allows_subprocess(): | ||
| raise IncompatibleProviderError(provider, self.profile) | ||
|
|
||
|
|
||
| # Pre-built profile constants ------------------------------------------------- | ||
|
|
||
| LOCAL_PROFILE = ExecutionProfile( | ||
| profile=WorkerProfile.LOCAL, | ||
| capabilities=("api", "subprocess", "repo-execution"), | ||
| ) | ||
| """Local workstation profile — all providers and repo execution are permitted.""" | ||
|
|
||
| CLOUD_SAFE_PROFILE = ExecutionProfile( | ||
| profile=WorkerProfile.CLOUD_SAFE, | ||
| capabilities=("api-only", "no-subprocess"), | ||
| ) | ||
| """Cloud-safe profile — API-only providers only; subprocess providers are rejected.""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,7 @@ async def get_final_response(self): | |
| AnthropicClient = None | ||
|
|
||
| from maf_starter.config import Settings, activate_run_scope, reset_run_scope | ||
| from maf_starter.execution_profile import CLOUD_SAFE_PROFILE, LOCAL_PROFILE, ExecutionProfile | ||
| from maf_starter.routing_policy import RoutingPlan, build_routing_plan | ||
| from maf_starter.routing_types import CapabilityChange, ChainStep, RouteAttempt | ||
|
|
||
|
|
@@ -317,7 +318,11 @@ async def _execute_chain_step( | |
| prior_error: Exception, | ||
| attempt_log: list[RouteAttempt] | None = None, | ||
| fallback_index: int = 0, | ||
| profile: ExecutionProfile = LOCAL_PROFILE, | ||
| ): | ||
| # Guard: reject subprocess-backed providers when the profile disallows them. | ||
| profile.assert_provider_allowed(step.provider) | ||
|
Comment on lines
+321
to
+324
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The new guard always receives Useful? React with 👍 / 👎. |
||
|
|
||
| if step.provider == "gemini": | ||
| client = OpenAIChatClient( | ||
| model_id=step.model or settings.model, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| from enum import Enum | ||
| from typing import Any, Awaitable, Callable | ||
|
|
||
|
|
||
| class WorkerProfile(str, Enum): | ||
| LOCAL = "local" | ||
| CLOUD_SAFE = "cloud-safe" | ||
|
|
||
|
|
||
| _RunStatus = str # "pending" | "running" | "done" | "error:<message>" | ||
|
|
||
|
|
||
| class WorkerBoundary: | ||
| """Dispatch long-running workflow executions asynchronously so HTTP ingress | ||
| returns a run_id immediately instead of waiting for the full execution path. | ||
|
|
||
| Status values returned by get_status: | ||
| "pending" — task queued, not yet started | ||
| "running" — task is actively executing | ||
| "done" — task completed successfully | ||
| "error:<msg>" — task raised an exception; <msg> is str(exc) | ||
| """ | ||
|
|
||
| def __init__(self, profile: WorkerProfile = WorkerProfile.LOCAL) -> None: | ||
| self.profile = profile | ||
| self._status: dict[str, _RunStatus] = {} | ||
| self._tasks: dict[str, asyncio.Task[Any]] = {} | ||
|
|
||
| def submit_async( | ||
| self, | ||
| run_id: str, | ||
| workflow: Callable[[], Awaitable[Any]], | ||
| ) -> str: | ||
| """Dispatch *workflow* as a background asyncio task and return *run_id* | ||
| immediately. The caller never waits for the workflow to finish. | ||
|
|
||
| Args: | ||
| run_id: Stable identifier for this run (caller's responsibility to | ||
| generate a unique value, e.g. via uuid.uuid4().hex). | ||
| workflow: Zero-argument async callable that encapsulates the full | ||
| long-running execution path for this run. | ||
|
|
||
| Returns: | ||
| run_id — the same value passed in, ready for the HTTP response. | ||
| """ | ||
| self._status[run_id] = "pending" | ||
| task = asyncio.create_task(self._run(run_id, workflow)) | ||
| self._tasks[run_id] = task | ||
|
Comment on lines
+50
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Every submission stores its task in Useful? React with 👍 / 👎. |
||
| return run_id | ||
|
|
||
| async def _run(self, run_id: str, workflow: Callable[[], Awaitable[Any]]) -> None: | ||
| self._status[run_id] = "running" | ||
| try: | ||
| await workflow() | ||
| self._status[run_id] = "done" | ||
| except Exception as exc: # noqa: BLE001 | ||
| self._status[run_id] = f"error:{exc}" | ||
|
|
||
| def get_status(self, run_id: str) -> _RunStatus | None: | ||
| """Return the current status string for *run_id*, or None if unknown.""" | ||
| return self._status.get(run_id) | ||
|
|
||
| def is_done(self, run_id: str) -> bool: | ||
| """Return True when the run has reached a terminal state (done or error).""" | ||
| status = self._status.get(run_id) | ||
| if status is None: | ||
| return False | ||
| return status == "done" or status.startswith("error:") | ||
|
|
||
| def all_run_ids(self) -> tuple[str, ...]: | ||
| """Return all known run IDs in submission order.""" | ||
| return tuple(self._status) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
--profile cloud-safeis used withAUTOGEN_PROVIDER=gemini-cli,claude-cli, orcodex-cli, the selected profile is only assigned and printed;run_blocking_chatandrun_resumable_stepstill callcreate_model_client(settings)without checking or passing it, anddashboardstarts the app without preserving it. Consequently, a command advertised as disabling subprocess providers still constructs and runs them.Useful? React with 👍 / 👎.