diff --git a/changelog/current.md b/changelog/current.md index 1e0cba579..f67e2315c 100644 --- a/changelog/current.md +++ b/changelog/current.md @@ -14,6 +14,14 @@ Record image-affecting changes to `manager/`, `worker/`, `copaw/`, `openclaw-bas - **modelProvider authorization boundary**: Controller reconcilers now own provider-specific AI route authorization, while provisioning keeps using the default gateway authorization path to avoid duplicate provider coupling. - **Matrix AppService mode**: The controller can register as a Matrix Application Service and provision/log in users with the `as_token` instead of per-user passwords (legacy password auth is preserved when disabled). Enabled by default via `HICLAW_MATRIX_APPSERVICE_ENABLED`; the install script and the Helm `runtime-env` Secret generate and persist `HICLAW_MATRIX_APPSERVICE_AS_TOKEN` / `HICLAW_MATRIX_APPSERVICE_HS_TOKEN`. Set `HICLAW_MATRIX_APPSERVICE_USER_NAMESPACE_REGEX` to narrow the exclusive user namespace when running against a shared / pre-existing homeserver. +**Copaw → QwenPaw Migration** + +- feat(copaw): switch worker runtime entrypoint from `copaw.app._app:app` to `qwenpaw.app._app:app`, adding `QWENPAW_WORKING_DIR` env alongside `COPAW_WORKING_DIR` for proper working directory resolution. +- feat(copaw): add `qwenpaw` CLI symlink to PATH for TeamHarness adapter compatibility. +- fix(copaw): change `agents.profiles` injection from shallow `setdefault` to nested merge, ensuring `active_agent`, `profiles.default.id`, and `workspace_dir` are present even when `config.json` already has an `agents` section. +- fix(copaw): add copaw `BaseChannel` fallback in `channel.py` so Manager (copaw-only) can load the Matrix channel without qwenpaw installed. +- fix(copaw): restore `agentscope_runtime` imports in copaw fallback path to prevent empty Manager replies. + **Bug Fixes** - **CoPaw worker runtime environment**: CoPaw workers now prefer AgentTeams storage/runtime environment variables while preserving legacy HiClaw fallbacks, and Qwen-style model health preflights disable thinking for lightweight readiness checks. diff --git a/copaw/Dockerfile b/copaw/Dockerfile index baeb45fa5..d14646761 100644 --- a/copaw/Dockerfile +++ b/copaw/Dockerfile @@ -88,7 +88,11 @@ RUN python -m venv /opt/venv/standard \ && /opt/venv/standard/bin/pip install --no-cache-dir \ --index-url "${PIP_INDEX_URL}" \ --trusted-host "$(echo ${PIP_INDEX_URL} | sed 's|https\?://||;s|/.*||')" \ - /tmp/copaw-worker/ + /tmp/copaw-worker/ \ + && /opt/venv/standard/bin/pip install --no-cache-dir \ + --index-url "${PIP_INDEX_URL}" \ + --trusted-host "$(echo ${PIP_INDEX_URL} | sed 's|https\?://||;s|/.*||')" \ + qwenpaw # --- Patch CoPaw 1.0.2 Matrix channel indentation bug --- # _sync_loop method def is indented one level too deep inside @@ -144,6 +148,9 @@ RUN STANDARD_SITE=/opt/venv/standard/lib/python3.11/site-packages/copaw_worker \ # copaw-sync wrapper RUN printf '#!/bin/bash\nexec python3 "/root/.copaw-worker/${HICLAW_WORKER_NAME}/skills/file-sync/scripts/copaw-sync.py" "$@"\n' \ > /usr/local/bin/copaw-sync && chmod +x /usr/local/bin/copaw-sync +# CoPaw runtime CLI (make it available in PATH for skills) +RUN ln -sf /opt/venv/standard/bin/copaw /usr/local/bin/copaw \ + && ln -sf /opt/venv/standard/bin/qwenpaw /usr/local/bin/qwenpaw # Entrypoint script COPY --chmod=755 scripts/copaw-worker-entrypoint.sh /opt/hiclaw/scripts/copaw-worker-entrypoint.sh diff --git a/copaw/scripts/copaw-worker-entrypoint.sh b/copaw/scripts/copaw-worker-entrypoint.sh index 5d5b8bac9..acdd69186 100755 --- a/copaw/scripts/copaw-worker-entrypoint.sh +++ b/copaw/scripts/copaw-worker-entrypoint.sh @@ -23,6 +23,7 @@ source /opt/hiclaw/scripts/lib/hiclaw-env.sh WORKER_NAME="${AGENTTEAMS_WORKER_NAME:-${HICLAW_WORKER_NAME:-}}" [ -n "${WORKER_NAME}" ] || { echo "AGENTTEAMS_WORKER_NAME is required" >&2; exit 1; } INSTALL_DIR="/root/.copaw-worker" +export QWENPAW_WORKING_DIR="${INSTALL_DIR}/${WORKER_NAME}" CONSOLE_PORT="${AGENTTEAMS_CONSOLE_PORT:-${HICLAW_CONSOLE_PORT:-}}" log() { diff --git a/copaw/src/copaw_worker/bridge.py b/copaw/src/copaw_worker/bridge.py index fef88b17b..79dd3248e 100644 --- a/copaw/src/copaw_worker/bridge.py +++ b/copaw/src/copaw_worker/bridge.py @@ -1,20 +1,164 @@ """ -Bridge: translate openclaw.json (HiClaw Worker config) into CoPaw's -config.json + providers.json, then set COPAW_WORKING_DIR so CoPaw -picks up the right workspace. +Bridge between HiClaw's standard space and CoPaw's runtime space. + +The standard space is the OpenClaw-style sync root restored from MinIO, for +example ``/root/.hiclaw-worker//``. It is the durable, runtime-agnostic +layout owned by HiClaw: + + - ``openclaw.json`` + - ``SOUL.md`` / ``AGENTS.md`` / ``HEARTBEAT.md`` + - ``skills/``, ``config/``, ``credentials/``, and other synced files + +The runtime space is CoPaw's native working directory under the standard space, +for example ``/root/.hiclaw-worker//.copaw/``. It is the layout that the +CoPaw process actually reads and mutates while running: + + - ``config.json`` + - ``providers.json`` and ``.copaw.secret/providers.json`` + - ``workspaces/default/agent.json`` + - ``workspaces/default/SOUL.md`` / ``AGENTS.md`` / ``HEARTBEAT.md`` + +Standard space -> runtime space: + + - Convert ``openclaw.json`` into CoPaw-native structured config: + ``config.json``, ``providers.json``, and ``workspaces/default/agent.json``. + - Patch CoPaw path constants so the running process reads this runtime space. + - Copy ``providers.json`` into the adjacent secret dir that CoPaw reads. + - Copy prompt files into ``workspaces/default/``. + - Copy ``config/mcporter.json`` into + ``workspaces/default/config/mcporter.json``; the legacy + ``mcporter-servers.json`` source is still accepted. + - Expose Manager-pushed ``skills//`` directories by making + ``workspaces/default/skills`` a symlink to the standard-space ``skills/`` + directory. The standard space remains canonical. + +Runtime space -> standard space: + + - Copy agent-edited prompt files from ``workspaces/default/`` back to the + standard space when the runtime copy is newer. + - Leave MinIO upload to ``sync.push_local``; this bridge only materializes the + standard-space files that the normal push loop will persist. """ -from __future__ import annotations - -import logging -logger = logging.getLogger(__name__) +from __future__ import annotations import json +import logging import os import shutil from importlib import resources from pathlib import Path -from typing import Any +from typing import Any, Callable + +logger = logging.getLogger(__name__) + +# Sentinel returned by derivers to mean "skip this policy this run" (the +# corresponding key is left as-is in agent.json). +_MISSING: Any = object() + + +def bridge_standard_to_runtime( + standard_dir: Path, + runtime_dir: Path, + controller_config: dict[str, Any], + *, + skill_names: list[str] | None = None, + profile: str = "worker", + agent: str = "default", +) -> None: + """Materialize standard-space files into CoPaw's runtime space.""" + sync_outer_prompt_files_to_inner(standard_dir, runtime_dir) + bridge_openclaw_to_copaw( + controller_config, + runtime_dir, + profile=profile, + agent=agent, + ) + _apply_credential_guard(standard_dir, runtime_dir) + sync_mcporter_config_to_runtime(standard_dir, runtime_dir) + if skill_names is not None: + sync_skills_to_runtime(standard_dir, runtime_dir, skill_names) + + +def refresh_standard_to_runtime( + standard_dir: Path, + runtime_dir: Path, + controller_config: dict[str, Any], + *, + get_soul: Callable[[], str | None], + get_agents_md: Callable[[], str | None], + skill_names: list[str] | None = None, + profile: str = "worker", + agent: str = "default", +) -> None: + """Refresh runtime space during re-bridge, including legacy prompt fallback.""" + sync_rebridged_prompt_files_to_inner( + standard_dir, + runtime_dir, + get_soul=get_soul, + get_agents_md=get_agents_md, + ) + bridge_openclaw_to_copaw( + controller_config, + runtime_dir, + profile=profile, + agent=agent, + ) + _apply_credential_guard(standard_dir, runtime_dir) + sync_mcporter_config_to_runtime(standard_dir, runtime_dir) + if skill_names is not None: + sync_skills_to_runtime(standard_dir, runtime_dir, skill_names) + + +def bridge_openclaw_to_copaw( + openclaw_cfg: dict[str, Any], + working_dir: Path, + *, + profile: str = "worker", + agent: str = "default", +) -> None: + """Bridge OpenClaw-style config into CoPaw's runtime files.""" + if profile not in ("worker", "manager"): + raise ValueError( + f"unknown bridge profile: {profile!r} (use 'worker' or 'manager')" + ) + + working_dir.mkdir(parents=True, exist_ok=True) + in_container = _is_in_container() + + _ensure_config_json(working_dir) + _write_providers_json(openclaw_cfg, working_dir, in_container) + _write_agent_json( + openclaw_cfg, + working_dir, + in_container, + profile=profile, + agent=agent, + ) + + os.environ["COPAW_WORKING_DIR"] = str(working_dir) + _patch_copaw_paths(working_dir) + + secret_dir = _secret_dir(working_dir) + providers_src = working_dir / "providers.json" + if providers_src.exists(): + shutil.copy2(providers_src, secret_dir / "providers.json") + + +def bridge_controller_to_copaw( + controller_config: dict[str, Any], + working_dir: Path, + *, + profile: str = "worker", + agent: str = "default", +) -> None: + """Compatibility alias for bridge_openclaw_to_copaw.""" + bridge_openclaw_to_copaw( + controller_config, + working_dir, + profile=profile, + agent=agent, + ) def _port_remap(url: str, is_container: bool) -> str: @@ -35,12 +179,7 @@ def _secret_dir(working_dir: Path) -> Path: def _patch_copaw_paths(working_dir: Path) -> None: - """Patch copaw's module-level path constants to point at working_dir. - - copaw.constant captures WORKING_DIR / SECRET_DIR at import time from - env vars, so setting COPAW_WORKING_DIR after import has no effect. - We must update the live module objects directly. - """ + """Patch copaw's module-level path constants to point at working_dir.""" secret_dir = _secret_dir(working_dir) secret_dir.mkdir(parents=True, exist_ok=True) @@ -48,7 +187,9 @@ def _patch_copaw_paths(working_dir: Path) -> None: import copaw.constant as _const _const.WORKING_DIR = working_dir _const.SECRET_DIR = secret_dir - _const.ACTIVE_SKILLS_DIR = working_dir / "active_skills" + _const.ACTIVE_SKILLS_DIR = ( + working_dir / "workspaces" / "default" / "skills" + ) _const.CUSTOMIZED_SKILLS_DIR = working_dir / "customized_skills" _const.MEMORY_DIR = working_dir / "memory" _const.CUSTOM_CHANNELS_DIR = working_dir / "custom_channels" @@ -75,71 +216,240 @@ def _patch_copaw_paths(working_dir: Path) -> None: except (ImportError, AttributeError): pass - # copaw.app.channels.registry binds CUSTOM_CHANNELS_DIR via - # `from ...constant import CUSTOM_CHANNELS_DIR` at import time, so it keeps - # a STALE copy of the default path even after we patch copaw.constant above. - # _discover_custom_channels() / register_custom_channel_routes() read this - # module global at CALL time, so rebinding it here (before ChannelManager - # starts) makes them see our working_dir/custom_channels regardless of - # import order. Without this the patched matrix_channel.py is never - # discovered and copaw falls back to its builtin (broken) Matrix channel. - try: - import copaw.app.channels.registry as _channels_registry - _channels_registry.CUSTOM_CHANNELS_DIR = working_dir / "custom_channels" - logger.info( - "bridge: patched channels registry CUSTOM_CHANNELS_DIR -> %s", - _channels_registry.CUSTOM_CHANNELS_DIR, - ) - except ImportError: - pass +def _template_text(name: str) -> str: + """Read a template by basename from the in-tree templates/ directory.""" + return (resources.files("copaw_worker") / "templates" / name).read_text( + encoding="utf-8" + ) -def bridge_openclaw_to_copaw( - openclaw_cfg: dict[str, Any], - working_dir: Path, - *, - profile: str = "manager", + +def _install_from_template(dst: Path, template_name: str) -> bool: + """Copy template -> dst only if dst is missing. Returns True when installed.""" + if dst.exists(): + return False + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_text(_template_text(template_name), encoding="utf-8") + logger.info("bridge: installed %s from template %s", dst, template_name) + return True + + +def sync_mcporter_config_to_runtime(standard_dir: Path, runtime_dir: Path) -> Path | None: + """Copy mcporter config from standard space into CoPaw's default workspace.""" + src_candidates = ( + standard_dir / "config" / "mcporter.json", + standard_dir / "mcporter-servers.json", + ) + src = next((candidate for candidate in src_candidates if candidate.exists()), None) + if src is None: + logger.info("No mcporter config found to copy from %s", standard_dir) + return None + + dst = runtime_dir / "workspaces" / "default" / "config" / "mcporter.json" + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dst) + logger.info("mcporter config copied to %s", dst) + return dst + + +def sync_skills_to_runtime( + standard_dir: Path, + runtime_dir: Path, + skill_names: list[str], +) -> list[str]: + """Expose Manager-pushed skills in CoPaw runtime space via symlink.""" + standard_skills_dir = standard_dir / "skills" + standard_skills_dir.mkdir(parents=True, exist_ok=True) + + # MinIO does not preserve Unix permission bits. Restore executable scripts + # in the standard space because runtime skills are a direct symlink to it. + for sh in standard_skills_dir.rglob("*.sh"): + sh.chmod(sh.stat().st_mode | 0o111) + + skill_name_set = set(skill_names) + for child in list(standard_skills_dir.iterdir()): + if child.is_dir() and child.name not in skill_name_set: + shutil.rmtree(child) + logger.info("Removed stale standard skill no longer in MinIO: %s", child.name) + + workspace_skills_dir = runtime_dir / "workspaces" / "default" / "skills" + workspace_skills_dir.parent.mkdir(parents=True, exist_ok=True) + dedup_customized_skills(runtime_dir) + + expected_target = standard_skills_dir.resolve() + if workspace_skills_dir.is_symlink(): + if workspace_skills_dir.resolve() != expected_target: + workspace_skills_dir.unlink() + elif workspace_skills_dir.exists(): + if workspace_skills_dir.is_dir(): + shutil.rmtree(workspace_skills_dir) + else: + workspace_skills_dir.unlink() + + if not workspace_skills_dir.exists(): + target = os.path.relpath(standard_skills_dir, workspace_skills_dir.parent) + workspace_skills_dir.symlink_to(target, target_is_directory=True) + logger.info("Linked runtime skills dir %s -> %s", workspace_skills_dir, target) + + installed = [ + skill_name + for skill_name in skill_names + if (standard_skills_dir / skill_name).exists() + ] + for skill_name in installed: + logger.info("Exposed MinIO skill: %s", skill_name) + enable_workspace_skills_by_default(runtime_dir, installed) + return installed + + +def enable_workspace_skills_by_default( + runtime_dir: Path, + skill_names: list[str], ) -> None: - """ - Read openclaw_cfg (parsed openclaw.json) and write: - - /config.json (global config) - - /workspaces/default/agent.json (per-agent config) - - /providers.json (LLM credentials, for reference) - - .secret/providers.json (where copaw actually reads from) + """Seed CoPaw's workspace manifest so exposed HiClaw skills are active.""" + if not skill_names: + return + + workspace_dir = runtime_dir / "workspaces" / "default" + workspace_skills_dir = workspace_dir / "skills" + manifest_path = workspace_dir / "skill.json" + + manifest: dict[str, Any] = { + "schema_version": "workspace-skill-manifest.v1", + "version": 1, + "skills": {}, + } + if manifest_path.exists(): + try: + loaded = json.loads(manifest_path.read_text(encoding="utf-8")) + if isinstance(loaded, dict): + manifest.update(loaded) + except json.JSONDecodeError: + logger.warning( + "Invalid CoPaw skill manifest, recreating: %s", + manifest_path, + ) + + if not isinstance(manifest.get("skills"), dict): + manifest["skills"] = {} + skills = manifest["skills"] + changed = False + for skill_name in sorted(set(skill_names)): + if not (workspace_skills_dir / skill_name / "SKILL.md").exists(): + continue + existing = skills.get(skill_name) + if isinstance(existing, dict): + if existing.get("enabled") is not True: + existing["enabled"] = True + changed = True + if not existing.get("channels"): + existing["channels"] = ["all"] + changed = True + continue + skills[skill_name] = { + "enabled": True, + "channels": ["all"], + "source": "customized", + } + changed = True - Also sets COPAW_WORKING_DIR env var and patches copaw's module-level - path constants so the running process uses the correct directory. + if changed or not manifest_path.exists(): + workspace_dir.mkdir(parents=True, exist_ok=True) + manifest_path.write_text( + json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) - """ - working_dir.mkdir(parents=True, exist_ok=True) - in_container = _is_in_container() - _write_config_json(openclaw_cfg, working_dir, in_container) - _write_agent_json(openclaw_cfg, working_dir, in_container, profile=profile) - _write_providers_json(openclaw_cfg, working_dir, in_container) +def dedup_customized_skills(runtime_dir: Path) -> None: + """Remove customized skills that shadow CoPaw builtins.""" + customized_dir = runtime_dir / "customized_skills" + if not customized_dir.is_dir(): + return - os.environ["COPAW_WORKING_DIR"] = str(working_dir) + try: + import copaw.agents.skills as _skills_pkg + builtin_skills_root = Path(_skills_pkg.__file__).resolve().parent + except (ImportError, AttributeError): + return - # Patch module-level constants (import-time values won't reflect env change) - _patch_copaw_paths(working_dir) + builtin_names: set[str] = set() + if builtin_skills_root.is_dir(): + for child in builtin_skills_root.iterdir(): + if child.is_dir() and not child.name.startswith("_"): + builtin_names.add(child.name) - # Copy providers.json into secret_dir — that's where copaw actually reads it - secret_dir = _secret_dir(working_dir) - providers_src = working_dir / "providers.json" - if providers_src.exists(): - shutil.copy2(providers_src, secret_dir / "providers.json") + if not builtin_names: + return + for child in list(customized_dir.iterdir()): + if child.is_dir() and child.name in builtin_names: + shutil.rmtree(child) + logger.info( + "Removed stale customized skill '%s' (now a builtin)", + child.name, + ) -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- -def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: - """Return the config dict of the active model from openclaw.json, or None. +def sync_outer_prompt_files_to_inner(local_dir: Path, copaw_working_dir: Path) -> None: + """Copy OpenClaw-style prompt files into CoPaw's default workspace.""" + workspace_dir = copaw_working_dir / "workspaces" / "default" + workspace_dir.mkdir(parents=True, exist_ok=True) + + for name in ("SOUL.md", "AGENTS.md"): + src = local_dir / name + if src.exists(): + (workspace_dir / name).write_text(src.read_text()) + + heartbeat_dst = workspace_dir / "HEARTBEAT.md" + if not heartbeat_dst.exists(): + heartbeat_src = local_dir / "HEARTBEAT.md" + if heartbeat_src.exists(): + heartbeat_dst.write_text(heartbeat_src.read_text()) + + +def sync_rebridged_prompt_files_to_inner( + local_dir: Path, + copaw_working_dir: Path, + *, + get_soul: Callable[[], str | None], + get_agents_md: Callable[[], str | None], +) -> None: + """Refresh CoPaw prompt files during re-bridge while preserving legacy fallback.""" + soul_path = local_dir / "SOUL.md" + agents_path = local_dir / "AGENTS.md" + soul = soul_path.read_text() if soul_path.exists() else get_soul() + agents = agents_path.read_text() if agents_path.exists() else get_agents_md() + + workspace_dir = copaw_working_dir / "workspaces" / "default" + if soul: + workspace_dir.mkdir(parents=True, exist_ok=True) + (workspace_dir / "SOUL.md").write_text(soul) + if agents: + workspace_dir.mkdir(parents=True, exist_ok=True) + (workspace_dir / "AGENTS.md").write_text(agents) + + +def _matrix_raw(cfg: dict[str, Any]) -> dict[str, Any]: + return cfg.get("channels", {}).get("matrix", {}) + + +def _matrix_bool( + cfg: dict[str, Any], + camel_key: str, + snake_key: str, + default: bool, +) -> bool: + matrix = _matrix_raw(cfg) + if camel_key in matrix: + return bool(matrix.get(camel_key)) + if snake_key in matrix: + return bool(matrix.get(snake_key)) + return default + - Prefers agents.defaults.model.primary ("provider_id/model_id"); - falls back to the first model of the first provider. - """ +def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: + """Return the config dict of the active model from openclaw.json, or None.""" providers_raw = cfg.get("models", {}).get("providers", {}) if not providers_raw: return None @@ -158,7 +468,6 @@ def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: if m.get("id") == mid: return m - # Fallback: first provider, first model for provider_cfg in providers_raw.values(): models = provider_cfg.get("models", []) if models: @@ -168,7 +477,6 @@ def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: def _resolve_context_window(cfg: dict[str, Any]) -> int | None: - """Return the contextWindow of the active (or first) model, or None.""" m = _resolve_active_model(cfg) if m and "contextWindow" in m: return int(m["contextWindow"]) @@ -176,197 +484,281 @@ def _resolve_context_window(cfg: dict[str, Any]) -> int | None: def _resolve_vision_enabled(cfg: dict[str, Any]) -> bool: - """Return True if the active model declares image input support. - - The openclaw.json model's ``input`` field is a list of supported modalities - (e.g. ["text", "image"]). If the field is absent we assume text-only to - avoid sending images to a model that cannot handle them. - """ + """True if the active model declares image input support.""" m = _resolve_active_model(cfg) if m is None: return False - input_types = m.get("input", []) - return "image" in input_types - + return "image" in m.get("input", []) -# --------------------------------------------------------------------------- -# config.json -# --------------------------------------------------------------------------- -def _write_config_json( +def _resolve_embedding_config( cfg: dict[str, Any], - working_dir: Path, in_container: bool, -) -> None: - matrix_raw = cfg.get("channels", {}).get("matrix", {}) - homeserver = _port_remap( - matrix_raw.get("homeserver", ""), in_container +) -> dict[str, Any] | None: + """Extract embedding config from openclaw's ``agents.defaults.memorySearch``.""" + memory_search = ( + cfg.get("agents", {}) + .get("defaults", {}) + .get("memorySearch", {}) ) - access_token = matrix_raw.get("accessToken", "") - - # DM allowlist - dm_cfg = matrix_raw.get("dm", {}) - dm_policy = dm_cfg.get("policy", "allowlist") - dm_allow_from: list[str] = dm_cfg.get("allowFrom", []) + if not memory_search: + return None - # Group allowlist - group_policy = matrix_raw.get("groupPolicy", "allowlist") - group_allow_from: list[str] = matrix_raw.get("groupAllowFrom", []) + remote = memory_search.get("remote", {}) + base_url = _port_remap(remote.get("baseUrl", ""), in_container) + api_key = remote.get("apiKey", "") + model = memory_search.get("model", "") - # Per-room/group config (pass through as-is for MatrixChannel to use) - groups = matrix_raw.get("groups", {}) + if not base_url or not model: + return None - # History limit: openclaw uses camelCase "historyLimit", bridge to snake_case. - history_limit = matrix_raw.get("historyLimit") - if history_limit is None: - history_limit = ( - cfg.get("messages", {}).get("groupChat", {}).get("historyLimit") + if not api_key: + logger.warning( + "memorySearch.remote.apiKey is empty; embedding requests will likely fail", ) - matrix_channel_cfg: dict[str, Any] = { - "enabled": matrix_raw.get("enabled", True), - "homeserver": homeserver, - "access_token": access_token, - "encryption": matrix_raw.get("encryption", False), - "dm_policy": dm_policy, - "allow_from": dm_allow_from, - "group_policy": group_policy, - "group_allow_from": group_allow_from, - "groups": groups, - "filter_tool_messages": True, - "filter_thinking": True, - "vision_enabled": _resolve_vision_enabled(cfg), + dimensions = ( + memory_search.get("outputDimensionality") + or int(os.environ.get("HICLAW_EMBEDDING_DIMENSIONS", "0")) + or 1024 + ) + + return { + "backend": "openai", + "api_key": api_key, + "base_url": base_url, + "model_name": model, + "dimensions": dimensions, + "enable_cache": True, + "use_dimensions": False, } - if history_limit is not None: - matrix_channel_cfg["history_limit"] = int(history_limit) - - config_path = working_dir / "config.json" - # Merge with existing config to avoid clobbering other settings - existing: dict[str, Any] = {} - if config_path.exists(): - with open(config_path) as f: - existing = json.load(f) - existing.setdefault("channels", {})["matrix"] = matrix_channel_cfg - # Disable console channel (we use Matrix) - existing["channels"].setdefault("console", {})["enabled"] = False - - # Bridge model context window → agents.running.max_input_length so that - # CoPaw's memory compaction threshold tracks the actual model capability. - # We read contextWindow from the first model of the primary (or first) - # provider to avoid hard-coding a default that mismatches the real model. - context_window = _resolve_context_window(cfg) - if context_window is not None: - existing.setdefault("agents", {}).setdefault("running", {})[ - "max_input_length" - ] = context_window - - with open(config_path, "w") as f: - json.dump(existing, f, indent=2, ensure_ascii=False) +def _resolve_history_limit(cfg: dict[str, Any]) -> int | None: + matrix_raw = _matrix_raw(cfg) + hl = matrix_raw.get("historyLimit") + if hl is None: + hl = cfg.get("messages", {}).get("groupChat", {}).get("historyLimit") + return int(hl) if hl is not None else None + + +def _derive_matrix_user_id(cfg: dict[str, Any], _in_container: bool = False) -> Any: + """Derive CoPaw Matrix user_id from OpenClaw config or env.""" + m = _matrix_raw(cfg) + uid = m.get("userId") or m.get("user_id") + if uid: + return uid + domain = os.environ.get("HICLAW_MATRIX_DOMAIN") or os.environ.get("MATRIX_DOMAIN", "") + if not domain: + return _MISSING + local = os.environ.get("HICLAW_WORKER_NAME") or os.environ.get("WORKER_NAME", "manager") + return f"@{local}:{domain}" + + +def _derive_heartbeat(cfg: dict[str, Any], _in_container: bool = False) -> Any: + """Map openclaw agents.defaults.heartbeat -> copaw heartbeat block.""" + hb = cfg.get("agents", {}).get("defaults", {}).get("heartbeat") + if not isinstance(hb, dict) or not hb: + return _MISSING + out: dict[str, Any] = {"enabled": True} + if "every" in hb: + out["every"] = hb["every"] + if "target" in hb: + out["target"] = hb["target"] + if "activeHours" in hb: + out["active_hours"] = hb["activeHours"] + return out + + +def _get_path(container: dict[str, Any], path: tuple[str, ...]) -> Any: + """Return value at ``path`` inside nested dicts, or ``_MISSING``.""" + node: Any = container + for key in path: + if not isinstance(node, dict) or key not in node: + return _MISSING + node = node[key] + return node + + +def _set_path(container: dict[str, Any], path: tuple[str, ...], value: Any) -> None: + """Assign ``value`` at ``path``, creating intermediate dicts as needed.""" + node = container + for key in path[:-1]: + nxt = node.get(key) + if not isinstance(nxt, dict): + nxt = {} + node[key] = nxt + node = nxt + node[path[-1]] = value + + +def _deep_merge_local_wins(remote: Any, local: Any) -> Any: + """Deep-merge two JSON trees where local leaves win over remote.""" + if isinstance(remote, dict) and isinstance(local, dict): + out: dict[str, Any] = {} + for k in remote.keys() | local.keys(): + if k in remote and k in local: + out[k] = _deep_merge_local_wins(remote[k], local[k]) + elif k in remote: + out[k] = remote[k] + else: + out[k] = local[k] + return out + return local +def _union_list(remote: list[Any] | None, local: list[Any] | None) -> list[Any]: + """Concat local then remote, dedup preserving order. Local entries win order.""" + seen: set[str] = set() + out: list[Any] = [] + for item in (local or []) + (remote or []): + try: + key = ( + json.dumps(item, sort_keys=True) + if isinstance(item, (dict, list)) + else repr(item) + ) + except TypeError: + key = repr(item) + if key not in seen: + seen.add(key) + out.append(item) + return out + + +def _apply_policy( + existing: dict[str, Any], + path: tuple[str, ...], + policy: str, + remote_value: Any, +) -> None: + """Apply one merge policy for one path. ``remote_value == _MISSING`` skips.""" + if remote_value is _MISSING: + return + + if policy == "remote-wins": + _set_path(existing, path, remote_value) + return + + if policy == "union": + local_value = _get_path(existing, path) + local_list = local_value if isinstance(local_value, list) else [] + remote_list = remote_value if isinstance(remote_value, list) else [] + _set_path(existing, path, _union_list(remote_list, local_list)) + return + + if policy == "deep-merge": + local_value = _get_path(existing, path) + if local_value is _MISSING: + _set_path(existing, path, remote_value) + else: + _set_path(existing, path, _deep_merge_local_wins(remote_value, local_value)) + return + + if policy == "seed": + local_value = _get_path(existing, path) + if local_value is _MISSING: + _set_path(existing, path, remote_value) + return + + raise ValueError(f"unknown merge policy: {policy}") + + +_PolicyDeriver = Callable[[dict[str, Any], bool], Any] + + +_CONTROLLER_FIELDS: list[tuple[tuple[str, ...], str, _PolicyDeriver]] = [ + (("channels", "matrix", "enabled"), + "remote-wins", lambda c, _: _matrix_raw(c).get("enabled", True)), + (("channels", "matrix", "homeserver"), + "remote-wins", lambda c, ic: _port_remap(_matrix_raw(c).get("homeserver", ""), ic)), + (("channels", "matrix", "access_token"), + "remote-wins", lambda c, _: _matrix_raw(c).get("accessToken", "")), + (("channels", "matrix", "user_id"), + "remote-wins", _derive_matrix_user_id), + (("channels", "matrix", "encryption"), + "remote-wins", lambda c, _: _matrix_raw(c).get("encryption", False)), + (("channels", "matrix", "dm_policy"), + "remote-wins", lambda c, _: _matrix_raw(c).get("dm", {}).get("policy", "allowlist")), + (("channels", "matrix", "group_policy"), + "remote-wins", lambda c, _: _matrix_raw(c).get("groupPolicy", "allowlist")), + (("channels", "matrix", "filter_tool_messages"), + "remote-wins", lambda c, _: _matrix_bool(c, "filterToolMessages", "filter_tool_messages", False)), + (("channels", "matrix", "filter_thinking"), + "remote-wins", lambda c, _: _matrix_bool(c, "filterThinking", "filter_thinking", True)), + (("channels", "matrix", "vision_enabled"), + "remote-wins", lambda c, _: _resolve_vision_enabled(c)), + (("channels", "matrix", "history_limit"), + "remote-wins", + lambda c, _: _resolve_history_limit(c) if _resolve_history_limit(c) is not None else _MISSING), + (("channels", "matrix", "allow_from"), + "union", lambda c, _: _matrix_raw(c).get("dm", {}).get("allowFrom", []) or []), + (("channels", "matrix", "group_allow_from"), + "union", lambda c, _: _matrix_raw(c).get("groupAllowFrom", []) or []), + (("channels", "matrix", "groups"), + "deep-merge", lambda c, _: _matrix_raw(c).get("groups", {}) or {}), + (("running", "max_input_length"), + "remote-wins", + lambda c, _: _resolve_context_window(c) if _resolve_context_window(c) is not None else _MISSING), + (("running", "embedding_config"), + "remote-wins", + lambda c, ic: _resolve_embedding_config(c, ic) if _resolve_embedding_config(c, ic) is not None else _MISSING), + (("heartbeat",), "seed", _derive_heartbeat), +] + + +def _apply_credential_guard(standard_dir: Path, runtime_dir: Path) -> None: + """Inject credagent.json paths into CoPaw's file guard config.""" + from copaw_worker.hooks.credential_guard import apply_credential_guard + + count = apply_credential_guard(standard_dir, runtime_dir) + if count > 0: + logger.info("bridge: credential guard applied %d protected paths", count) + + +def _ensure_config_json(working_dir: Path) -> None: + """Install config.json from template if missing. Never overwrite.""" + _install_from_template(working_dir / "config.json", "config.json") -# --------------------------------------------------------------------------- -# agent.json — per-agent config (CoPaw 1.0.2+ reads this, not config.json) -# --------------------------------------------------------------------------- def _write_agent_json( - cfg: dict[str, Any], + controller_config: dict[str, Any], working_dir: Path, in_container: bool, *, profile: str = "worker", + agent: str = "default", ) -> None: - """Create agent.json from template, then overlay Matrix channel config. + """Create agent.json from template if absent; then overlay controller fields.""" + agent_path = working_dir / "workspaces" / agent / "agent.json" + _install_from_template(agent_path, f"agent.{profile}.json") - CoPaw 1.0.2+ reads workspace/agent.json for per-agent configuration. - The template provides defaults; we overlay controller-owned fields - (Matrix access_token, homeserver, allowlists, context window). - """ - workspace_dir = working_dir / "workspaces" / "default" - workspace_dir.mkdir(parents=True, exist_ok=True) - agent_path = workspace_dir / "agent.json" - - # Install from template if missing - if not agent_path.exists(): - template_name = f"agent.{profile}.json" - try: - # Try loading from package templates directory - tmpl_dir = Path(__file__).resolve().parent / "templates" - tmpl_path = tmpl_dir / template_name - if tmpl_path.exists(): - shutil.copy2(str(tmpl_path), str(agent_path)) - else: - # Fallback: create minimal agent.json - minimal = { - "id": "default", - "name": "Manager" if profile == "manager" else "Default Agent", - "language": "zh", - "channels": { - "console": {"enabled": True}, - "matrix": { - "enabled": True, - "filter_tool_messages": False, - "filter_thinking": True, - "allow_from": [], - "group_allow_from": [], - "groups": {}, - }, - }, - "running": {"max_iters": 200}, - } - with open(agent_path, "w") as f: - json.dump(minimal, f, indent=2) - except Exception: - pass - - # Load existing agent.json try: with open(agent_path) as f: - agent_cfg = json.load(f) - except Exception: - agent_cfg = {"id": "default", "channels": {}, "running": {}} - - # Overlay Matrix channel config from openclaw.json - matrix_raw = cfg.get("channels", {}).get("matrix", {}) - homeserver = _port_remap(matrix_raw.get("homeserver", ""), in_container) - access_token = matrix_raw.get("accessToken", "") - - dm_cfg = matrix_raw.get("dm", {}) - dm_allow_from: list[str] = dm_cfg.get("allowFrom", []) - group_allow_from: list[str] = matrix_raw.get("groupAllowFrom", []) - groups = matrix_raw.get("groups", {}) - - matrix_ch = agent_cfg.setdefault("channels", {}).setdefault("matrix", {}) - matrix_ch["enabled"] = matrix_raw.get("enabled", True) - if homeserver: - matrix_ch["homeserver"] = homeserver - if access_token: - matrix_ch["access_token"] = access_token - matrix_ch["allow_from"] = dm_allow_from - matrix_ch["group_allow_from"] = group_allow_from - matrix_ch["groups"] = groups - matrix_ch["filter_tool_messages"] = True - matrix_ch["filter_thinking"] = True - - # Disable console channel (we use Matrix) - agent_cfg.setdefault("channels", {}).setdefault("console", {})["enabled"] = False - - # Bridge context window - context_window = _resolve_context_window(cfg) - if context_window is not None: - agent_cfg.setdefault("running", {})["max_input_length"] = context_window - - # Set workspace_dir - agent_cfg.setdefault("workspace_dir", str(workspace_dir)) + existing = json.load(f) + if not isinstance(existing, dict): + raise ValueError("agent.json root is not a dict") + except Exception as exc: + logger.warning( + "agent.json at %s is unreadable (%s); re-seeding from template", + agent_path, + exc, + ) + agent_path.unlink(missing_ok=True) + _install_from_template(agent_path, f"agent.{profile}.json") + with open(agent_path) as f: + existing = json.load(f) + + for path, policy, deriver in _CONTROLLER_FIELDS: + remote_value = deriver(controller_config, in_container) + _apply_policy(existing, path, policy, remote_value) + + # workspace_dir depends on local filesystem layout; seed once, never rewrite. + existing.setdefault("workspace_dir", str(agent_path.parent)) with open(agent_path, "w") as f: - json.dump(agent_cfg, f, indent=2, ensure_ascii=False) + json.dump(existing, f, indent=2, ensure_ascii=False) -# --------------------------------------------------------------------------- -# providers.json -# --------------------------------------------------------------------------- def _write_providers_json( cfg: dict[str, Any], @@ -380,9 +772,7 @@ def _write_providers_json( active_model = "" for provider_id, provider_cfg in providers_raw.items(): - base_url = _port_remap( - provider_cfg.get("baseUrl", ""), in_container - ) + base_url = _port_remap(provider_cfg.get("baseUrl", ""), in_container) api_key = provider_cfg.get("apiKey", "") models_raw = provider_cfg.get("models", []) @@ -403,13 +793,10 @@ def _write_providers_json( "chat_model": "OpenAIChatModel", } - # Use first provider + first model as active LLM if not active_provider_id and models: active_provider_id = provider_id active_model = models[0]["id"] - # Resolve active model from agents.defaults.model.primary - # Format: "provider_id/model_id" primary = ( cfg.get("agents", {}) .get("defaults", {}) @@ -435,30 +822,25 @@ def _write_providers_json( with open(providers_path, "w") as f: json.dump(providers_data, f, indent=2, ensure_ascii=False) - - -# --------------------------------------------------------------------------- -# Runtime-to-standard sync (worker uses this to push edits back to sync root) -# --------------------------------------------------------------------------- - -def bridge_runtime_to_standard(standard_dir): +def bridge_runtime_to_standard(standard_dir: Path) -> None: """Materialize runtime-space edits back into the standard sync root.""" sync_inner_prompt_files_to_outer(standard_dir) -def sync_inner_prompt_files_to_outer(local_dir): +def sync_inner_prompt_files_to_outer(local_dir: Path) -> None: """Copy agent-edited prompt files from CoPaw workspace back to sync root.""" inner_outer_files = ("AGENTS.md", "SOUL.md", "HEARTBEAT.md") - copaw_ws_dir = Path(local_dir) / ".copaw" / "workspaces" / "default" + copaw_ws_dir = local_dir / ".copaw" / "workspaces" / "default" for name in inner_outer_files: inner = copaw_ws_dir / name - outer = Path(local_dir) / name + outer = local_dir / name if not inner.exists(): continue try: inner_mtime = inner.stat().st_mtime except OSError: continue + # Only copy if inner is newer than outer (or outer doesn't exist) outer_mtime = outer.stat().st_mtime if outer.exists() else 0 if inner_mtime > outer_mtime: inner_content = inner.read_text(errors="replace") @@ -466,53 +848,59 @@ def sync_inner_prompt_files_to_outer(local_dir): if inner_content != outer_content: outer.write_text(inner_content) logger.debug( - "Inner->Outer sync: .copaw/workspaces/default/%s -> %s", + "Inner→Outer sync: .copaw/workspaces/default/%s → %s", name, name, ) -# --------------------------------------------------------------------------- -# CLI entry point — used by manager/scripts/init/start-copaw-manager.sh -# --------------------------------------------------------------------------- -def _main_cli(argv=None): + +def _main_cli(argv: list[str] | None = None) -> int: import argparse parser = argparse.ArgumentParser( prog="python -m copaw_worker.bridge", - description="Bridge Controller config into CoPaw runtime files.", + description=( + "Bridge Controller config (openclaw.json today) into CoPaw's " + "config.json / agent.json / providers.json." + ), ) parser.add_argument("--openclaw-json", required=True, - help="Path to openclaw.json") + help="Path to the controller config file (openclaw.json)") parser.add_argument("--working-dir", required=True, help="CoPaw working dir (e.g. ~/.copaw)") - parser.add_argument("--profile", default="manager", - choices=["worker", "manager"], - help="Template profile (default: manager)") + parser.add_argument("--profile", default="worker", choices=["worker", "manager"], + help="Template profile to use on first boot") + parser.add_argument("--agent", default="default", + help="CoPaw workspace key (maps to workspaces//). " + "Default: 'default'. Exposed for multi-agent setups.") args = parser.parse_args(argv) - from pathlib import Path as _Path - import json as _json - - openclaw_path = _Path(args.openclaw_json) + openclaw_path = Path(args.openclaw_json) if not openclaw_path.exists(): print(f"ERROR: {openclaw_path} not found", flush=True) return 1 - working_dir = _Path(args.working_dir) + working_dir = Path(args.working_dir) working_dir.mkdir(parents=True, exist_ok=True) with open(openclaw_path) as f: - controller_config = _json.load(f) + controller_config = json.load(f) bridge_openclaw_to_copaw( controller_config, working_dir, profile=args.profile, + agent=args.agent, + ) + print( + f"Bridged {openclaw_path} -> {working_dir} " + f"(profile={args.profile}, agent={args.agent})", + flush=True, ) return 0 if __name__ == "__main__": - import sys as _sys - _sys.exit(_main_cli()) + import sys + sys.exit(_main_cli()) diff --git a/copaw/src/copaw_worker/health.py b/copaw/src/copaw_worker/health.py index 3b296c59b..3eea58926 100644 --- a/copaw/src/copaw_worker/health.py +++ b/copaw/src/copaw_worker/health.py @@ -12,7 +12,7 @@ - Component health detection strategy: * copaw: Startup health: - - check: start uvicorn.Server for "copaw.app._app:app". + - check: start uvicorn.Server for "qwenpaw.app._app:app". - check: after starting the server, the worker performs one bounded startup probe against the native CoPaw health endpoint. - healthy: the startup probe gets HTTP 200 from @@ -446,5 +446,17 @@ def _max_tokens_param(model_id: str) -> str: return "max_tokens" +def check_qwenpaw_matrix_channel(marker_path: "str | Path") -> bool: + """Check whether the Matrix ready marker file exists. + + Returns True when the marker file written by the joined_rooms polling + task is present, signalling the qwenpaw Matrix channel has completed + initial sync and at least one room is joined. + """ + try: + return Path(marker_path).exists() + except Exception: + return False + def _now() -> str: return datetime.now(timezone.utc).isoformat() diff --git a/copaw/src/copaw_worker/worker.py b/copaw/src/copaw_worker/worker.py index b1600307b..f3fac25ff 100644 --- a/copaw/src/copaw_worker/worker.py +++ b/copaw/src/copaw_worker/worker.py @@ -2,30 +2,45 @@ Worker main entry point. Bootstrap flow: -1. Pull openclaw.json + SOUL.md + AGENTS.md from MinIO -2. Bridge openclaw.json -> CoPaw config.json + providers.json -3. Install MatrixChannel into CoPaw's custom_channels dir -4. Start CoPaw AgentRunner + ChannelManager (Matrix channel) +1. Ensure the MinIO client exists and initialize the worker FileSync workspace. +2. Mirror the worker prefix and shared folders from MinIO into standard space. +3. Load openclaw.json and refresh the Matrix access token/device when possible. +4. Bridge standard space into CoPaw runtime config, prompts, and skill links. +5. Start the local-to-remote preservation loop for runtime edits. +6. Launch CoPaw's FastAPI app; its lifecycle starts the runner, channels, and web console. """ from __future__ import annotations import asyncio +from contextlib import suppress +import json import logging import os import platform import shutil import stat from pathlib import Path -from typing import Optional +from typing import Any, Optional from rich.console import Console from rich.panel import Panel from copaw_worker.config import WorkerConfig -from copaw_worker.sync import FileSync, sync_loop, push_loop -from copaw_worker.bridge import bridge_openclaw_to_copaw +from copaw_worker.health import ( + ComponentHealth, + check_qwenpaw_matrix_channel, + HealthState, + check_copaw_service, + check_matrix_service, + check_model_service, +) +from copaw_worker.sync import FileSync, push_loop, sync_loop from copaw_worker.worker_api import WorkerAPIServer -from copaw_worker.health import HealthState, check_matrix_service +from copaw_worker.bridge import ( + bridge_standard_to_runtime, + sync_mcporter_config_to_runtime, + sync_skills_to_runtime, +) console = Console() logger = logging.getLogger(__name__) @@ -39,6 +54,13 @@ def __init__(self, config: WorkerConfig) -> None: self._copaw_working_dir: Optional[Path] = None self._runner = None self._channel_manager = None + self._push_task: Optional[asyncio.Task[None]] = None + self._pull_task: Optional[asyncio.Task[None]] = None + self._worker_api: WorkerAPIServer | None = None + self._server: Any | None = None + self._health: HealthState | None = None + self._openclaw_cfg: dict[str, Any] | None = None + self._matrix_ready_marker: Path | None = None # ------------------------------------------------------------------ # Public API @@ -56,23 +78,51 @@ async def run(self) -> None: async def stop(self) -> None: console.print("[yellow]Stopping worker...[/yellow]") - if self._channel_manager is not None: - try: - await self._channel_manager.stop_all() - except Exception: - pass - if self._runner is not None: - try: - await self._runner.stop() - except Exception: - pass + logger.info( + "worker stop requested worker=%s has_server=%s has_push_task=%s", + self.worker_name, + self._server is not None, + self._push_task is not None, + ) + + if self._server is not None: + self._server.should_exit = True + logger.info("uvicorn shutdown requested worker=%s", self.worker_name) + + if self._push_task is not None: + self._push_task.cancel() + with suppress(asyncio.CancelledError): + await self._push_task + self._push_task = None + logger.info("FileSync push loop stopped worker=%s", self.worker_name) + + if self._pull_task is not None: + self._pull_task.cancel() + with suppress(asyncio.CancelledError): + await self._pull_task + self._pull_task = None + logger.info("FileSync pull loop stopped worker=%s", self.worker_name) + + if self._worker_api is not None: + await self._worker_api.stop() + self._worker_api = None + console.print("[green]Worker stopped.[/green]") + logger.info("worker stopped worker=%s", self.worker_name) # ------------------------------------------------------------------ # Startup # ------------------------------------------------------------------ async def start(self) -> bool: + logger.info( + "worker startup begin worker=%s install_dir=%s minio_endpoint=%s bucket=%s console_port=%s", + self.worker_name, + self.config.install_dir, + self.config.minio_endpoint, + self.config.minio_bucket, + self.config.console_port, + ) console.print( Panel.fit( f"[bold green]CoPaw Worker[/bold green]\n" @@ -82,33 +132,61 @@ async def start(self) -> bool: ) # 1. Ensure mc (MinIO Client) is available + logger.info("startup stage=ensure_mc worker=%s", self.worker_name) self._ensure_mc() # 2. Init file sync + self._copaw_working_dir = self.config.install_dir / self.worker_name / ".copaw" + self._health = HealthState(self._copaw_working_dir / "health.json") + self._health.persist() + workspace_dir = self._copaw_working_dir / "workspaces" / "default" self.sync = FileSync( endpoint=self.config.minio_endpoint, access_key=self.config.minio_access_key, secret_key=self.config.minio_secret_key, bucket=self.config.minio_bucket, worker_name=self.worker_name, + worker_cr_name=self.config.worker_cr_name, secure=self.config.minio_secure, local_dir=self.config.install_dir / self.worker_name, + shared_dir=workspace_dir / "shared", + global_shared_dir=workspace_dir / "global-shared", + ) + logger.info( + "startup stage=init_sync worker=%s local_dir=%s copaw_working_dir=%s", + self.worker_name, + self.sync.local_dir, + self._copaw_working_dir, ) # 2. Full mirror from MinIO (restore all state: config, sessions, sync token, etc.) # Mirrors the OpenClaw worker's startup approach: pull everything first, - # then use selective sync during runtime. + # then preserve local changes via push_loop during runtime. console.print("[yellow]Pulling all files from MinIO...[/yellow]") + logger.info("startup stage=mirror_all worker=%s", self.worker_name) try: self.sync.mirror_all() except Exception as exc: + logger.exception("startup stage=mirror_all failed worker=%s", self.worker_name) console.print(f"[red]Failed to mirror from MinIO: {exc}[/red]") + self._health.update( + "sync", + "unhealthy", + f"startup mirror failed: {exc}", + { + "operation": "mirror_all", + "error_type": type(exc).__name__, + }, + ) return False + self._health.update("sync", "healthy", "startup mirror restored") # 3. Parse openclaw.json (already on disk after mirror_all) + logger.info("startup stage=load_config worker=%s", self.worker_name) try: openclaw_cfg = self.sync.get_config() except Exception as exc: + logger.exception("startup stage=load_config failed worker=%s", self.worker_name) console.print(f"[red]Failed to read config: {exc}[/red]") return False @@ -117,19 +195,55 @@ async def start(self) -> bool: # regenerated identity key causes other clients to reject key # distribution. Re-login creates a new device_id, matching the # Manager's behavior. + logger.info("startup stage=matrix_relogin worker=%s", self.worker_name) openclaw_cfg = self._matrix_relogin(openclaw_cfg) + self._openclaw_cfg = openclaw_cfg + + logger.info("startup stage=model_preflight worker=%s", self.worker_name) + model_status = check_model_service(openclaw_cfg) + self._health.update( + "model", + model_status.healthiness, + model_status.message, + model_status.details, + ) + if model_status.healthiness == "healthy": + logger.info("model preflight OK worker=%s details=%s", self.worker_name, model_status.details) + else: + logger.warning( + "model preflight failed worker=%s message=%s details=%s", + self.worker_name, + model_status.message, + model_status.details, + ) + console.print(f"[yellow]Model service preflight failed: {model_status.message}[/yellow]") + details = model_status.details or {} + notify_msg = ( + f"⚠️ Model service check failed: {model_status.message}\n" + f"Provider: {details.get('provider', 'unknown')}, " + f"Model: {details.get('model', 'unknown')}\n" + f"Please check model configuration." + ) + self._notify_matrix(notify_msg, openclaw_cfg) # 4. Set up CoPaw working directory - self._copaw_working_dir = self.config.install_dir / self.worker_name / ".copaw" self._copaw_working_dir.mkdir(parents=True, exist_ok=True) + self._matrix_ready_marker = ( + Path("/tmp") / f"hiclaw-copaw-{self.worker_name}-matrix-ready" + ) + self._matrix_ready_marker.unlink(missing_ok=True) + os.environ["HICLAW_MATRIX_CHANNEL_READY_FILE"] = str( + self._matrix_ready_marker, + ) + logger.info( + "startup stage=prepare_runtime_dir worker=%s copaw_working_dir=%s", + self.worker_name, + self._copaw_working_dir, + ) - # Write SOUL.md / AGENTS.md into CoPaw working dir (read from local copies pulled by mirror_all) - for name in ("SOUL.md", "AGENTS.md"): - src = self.sync.local_dir / name - if src.exists(): - (self._copaw_working_dir / name).write_text(src.read_text()) - - # 5. Bridge openclaw.json -> CoPaw config.json + providers.json + # 5. Bridge standard space -> CoPaw runtime space. + # This writes prompt files into workspaces/default/ and converts + # openclaw.json -> config.json / agent.json / providers.json. # Infer gateway port from FS endpoint so bridge's _port_remap uses # the correct host port instead of the hardcoded default. if not os.environ.get("HICLAW_PORT_GATEWAY"): @@ -137,179 +251,427 @@ async def start(self) -> bool: _parsed = urlparse(self.config.minio_endpoint) if _parsed.port: os.environ["HICLAW_PORT_GATEWAY"] = str(_parsed.port) + logger.info( + "inferred HICLAW_PORT_GATEWAY=%s from MinIO endpoint worker=%s", + _parsed.port, + self.worker_name, + ) console.print("[yellow]Bridging configuration to CoPaw...[/yellow]") + logger.info("startup stage=bridge worker=%s", self.worker_name) try: - bridge_openclaw_to_copaw(openclaw_cfg, self._copaw_working_dir) + skill_names = self.sync.list_skills() + bridge_standard_to_runtime( + self.sync.local_dir, + self._copaw_working_dir, + openclaw_cfg, + skill_names=skill_names, + ) except Exception as exc: + logger.exception("startup stage=bridge failed worker=%s", self.worker_name) console.print(f"[red]Config bridge failed: {exc}[/red]") + self._health.update( + "bridge", + "unhealthy", + f"standard-to-copaw bridge failed: {exc}", + { + "operation": "bridge_standard_to_runtime", + "error_type": type(exc).__name__, + }, + ) return False + self._health.update( + "bridge", + "healthy", + "standard-to-copaw bridge completed", + {"operation": "bridge_standard_to_runtime"}, + ) - # 6. Copy mcporter config into CoPaw working dir so mcporter finds - # ./config/mcporter.json when running from COPAW_WORKING_DIR - self._copy_mcporter_config() - - # 7. Install MatrixChannel into CoPaw's custom_channels dir - self._install_matrix_channel() - - # 8. Sync skills from MinIO into CoPaw's active_skills dir - self._sync_skills() + if skill_names: + console.print(f"[green]Skills installed: {len(skill_names)}[/green]") + logger.info( + "skills installed worker=%s count=%d", + self.worker_name, + len(skill_names), + ) + logger.debug("skills installed worker=%s names=%s", self.worker_name, skill_names) + else: + logger.info("No extra skills in MinIO for worker %s", self.worker_name) - # 9. Start background MinIO sync - asyncio.create_task( + # 6. Start runtime sync loops. Remote -> Local refreshes controller- + # managed config and skills; shared data remains explicit via filesync. + logger.info( + "startup stage=start_sync_loop worker=%s interval_seconds=%s", + self.worker_name, + self.config.sync_interval, + ) + self._pull_task = asyncio.create_task( sync_loop( self.sync, interval=self.config.sync_interval, on_pull=self._on_files_pulled, - ) + health=self._health, + ), + name=f"copaw-worker-{self.worker_name}-sync-loop", + ) + logger.info("startup stage=start_push_loop worker=%s interval_seconds=5", self.worker_name) + self._push_task = asyncio.create_task( + push_loop(self.sync, check_interval=5, health=self._health), + name=f"copaw-worker-{self.worker_name}-push-loop", ) - # Local -> Remote: change-triggered push (every 5s, mirrors openclaw worker behavior) - asyncio.create_task(push_loop(self.sync, check_interval=5)) + await self._start_worker_api() console.print("[bold green]Worker initialized.[/bold green]") - if self.config.console_port: - console.print( - f"[dim]Note: web console enabled on port {self.config.console_port} " - f"(~500MB extra RAM). Remove --console-port to save memory.[/dim]" - ) - else: - console.print( - "[dim]Tip: add --console-port 8088 to enable the web console " - "(costs ~500MB extra RAM).[/dim]" - ) + console.print( + f"[dim]Web console will start on port {self.config.console_port}[/dim]" + ) + logger.info("worker startup complete worker=%s", self.worker_name) return True # ------------------------------------------------------------------ # CoPaw runner # ------------------------------------------------------------------ - async def _run_copaw(self) -> None: - """Start CoPaw. If console_port is set, run the full FastAPI app via - uvicorn (gives access to the web console). Otherwise start the runner - and channel manager directly (lightweight, no HTTP server).""" - if self.config.console_port: - await self._run_copaw_with_console(self.config.console_port) + async def _start_worker_api(self) -> None: + self._worker_api = WorkerAPIServer( + host="0.0.0.0", + port=self.config.worker_port, + liveness_handler=self.build_worker_liveness, + readiness_handler=self.build_worker_readiness, + ) + await self._worker_api.start() + + async def build_worker_liveness(self) -> dict[str, Any]: + return { + "liveness": "alive", + "message": "worker api alive", + "details": {"worker_port": self.config.worker_port}, + } + + async def build_worker_readiness(self) -> dict[str, Any]: + if self._health is None: + raise RuntimeError("health state is not initialized") + openclaw_cfg = self._openclaw_cfg or {} + + copaw = await asyncio.to_thread(check_copaw_service, self.config.console_port) + self._health.update("copaw", copaw.healthiness, copaw.message, copaw.details) + + model = await asyncio.to_thread(check_model_service, openclaw_cfg) + self._health.update("model", model.healthiness, model.message, model.details) + if model.healthiness == "healthy": + logger.info("readiness model check OK worker=%s", self.worker_name) else: - await self._run_copaw_headless() + logger.warning( + "readiness model check failed worker=%s message=%s details=%s", + self.worker_name, model.message, model.details, + ) - async def _run_copaw_with_console(self, port: int) -> None: - """Run CoPaw's full FastAPI app (runner + channels + web console).""" - import uvicorn - from copaw.app.channels.registry import clear_builtin_channel_cache + matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) + from .bridge import _port_remap, _is_in_container + homeserver = _port_remap(matrix_cfg.get("homeserver", ""), _is_in_container()) + matrix = await asyncio.to_thread(check_matrix_service, homeserver) + if matrix.healthiness == "healthy": + marker_ready = ( + self._matrix_ready_marker is not None + and self._matrix_ready_marker.exists() + ) + if not marker_ready: + matrix = ComponentHealth( + "unhealthy", + "Matrix channel is not ready", + { + **(matrix.details or {}), + "channelReady": False, + }, + ) + self._health.update("matrix", matrix.healthiness, matrix.message, matrix.details) + + snapshot = self._health.to_dict() + ready = snapshot["healthiness"] == "healthy" + return { + "readiness": "ready" if ready else "not_ready", + "healthiness": snapshot["healthiness"], + "message": "worker ready" if ready else snapshot["message"], + "components": snapshot["components"], + "updated_at": snapshot["updated_at"], + } + + async def _mark_copaw_startup_health( + self, + *, + timeout: float = 60, + interval: float = 0.5, + ) -> None: + """Mark CoPaw healthy once its own app health endpoint is reachable.""" + if self._health is None: + return - clear_builtin_channel_cache() + deadline = asyncio.get_running_loop().time() + timeout + while True: + status = await asyncio.to_thread( + check_copaw_service, + self.config.console_port, + ) + if status.healthiness == "healthy": + self._health.update("copaw", status.healthiness, status.message, status.details) + logger.info( + "copaw startup health OK worker=%s details=%s", + self.worker_name, + status.details, + ) + return + + if asyncio.get_running_loop().time() >= deadline: + self._health.update("copaw", status.healthiness, status.message, status.details) + logger.warning( + "copaw startup health failed worker=%s message=%s details=%s", + self.worker_name, + status.message, + status.details, + ) + return + await asyncio.sleep(interval) - # --- Worker API server (liveness/readiness probes) --- - worker_port = self.config.worker_port or (port + 1) - health_state = HealthState( - self._copaw_working_dir / "health.json" - ) - async def _liveness(): - snap = health_state.snapshot() - return {"liveness": "alive", "healthiness": snap.healthiness} + async def _poll_matrix_joined_rooms(self) -> None: + """Poll Matrix /_matrix/client/v3/joined_rooms until DM room appears. - async def _readiness(): - # Mark startup-only components as healthy — they were validated - # during _initialize() and don't need runtime re-checking. - for comp in ("sync", "bridge", "model"): - health_state.update(comp, "healthy", "validated at startup") + Writes _matrix_ready_marker when at least one room is joined. + """ + import json + import urllib.request - # Probe CoPaw console (TCP reachability — CoPaw has no /health endpoint) - import socket as _socket - try: - with _socket.create_connection(("127.0.0.1", port), timeout=3): - health_state.update("copaw", "healthy", f"console reachable on port {port}") - except Exception as e: - health_state.update("copaw", "unhealthy", f"console unreachable: {e}") + openclaw_cfg = self._openclaw_cfg or {} + matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) + homeserver = matrix_cfg.get("homeserver", "") + access_token = matrix_cfg.get("accessToken", "") - # Probe Matrix homeserver - matrix_cfg = {} + if not homeserver or not access_token: + logger.warning( + "cannot poll joined_rooms: missing homeserver or token worker=%s", + self.worker_name, + ) + return + + from .bridge import _port_remap, _is_in_container + homeserver = _port_remap(homeserver, _is_in_container()) + url = f"{homeserver}/_matrix/client/v3/joined_rooms" + + deadline = asyncio.get_running_loop().time() + 60 + while True: try: - cfg_path = self.sync.local_dir / "openclaw.json" - if cfg_path.exists(): - import json as _json - matrix_cfg = _json.loads(cfg_path.read_text()).get("channels", {}).get("matrix", {}) - except Exception: - pass - homeserver = matrix_cfg.get("homeserver", "") - if homeserver: - mx_health = check_matrix_service(homeserver, timeout=5) - health_state.update("matrix", mx_health.healthiness, mx_health.message) - - snap = health_state.snapshot() - return { - "readiness": "ready" if snap.healthiness == "healthy" else "not_ready", - "healthiness": snap.healthiness, - "message": snap.message, - "components": { - k: {"healthiness": v.healthiness, "message": v.message} - for k, v in snap.components.items() - }, - } + req = urllib.request.Request( + url, + headers={"Authorization": f"Bearer {access_token}"}, + method="GET", + ) + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode()) + rooms = data.get("joined_rooms", []) + if rooms: + self._matrix_ready_marker.write_text("ready\\n") + logger.info( + "matrix channel ready: %d joined rooms worker=%s", + len(rooms), + self.worker_name, + ) + return + except Exception as exc: + logger.debug("joined_rooms poll failed: %s", exc) + + if asyncio.get_running_loop().time() >= deadline: + logger.warning( + "matrix channel not ready within 60s worker=%s", + self.worker_name, + ) + return + await asyncio.sleep(2) - api_server = WorkerAPIServer( - host="0.0.0.0", - port=worker_port, - liveness_handler=_liveness, - readiness_handler=_readiness, + async def _run_copaw(self) -> None: + """Start CoPaw via FastAPI app (includes runner + channels + web console).""" + import uvicorn + from qwenpaw.app.channels.registry import clear_builtin_channel_cache + from copaw_worker.hooks import install_tool_hooks + + install_tool_hooks() + clear_builtin_channel_cache() + logger.info( + "starting CoPaw FastAPI app worker=%s host=%s port=%s", + self.worker_name, + "0.0.0.0", + self.config.console_port, ) - await api_server.start() uv_config = uvicorn.Config( - "copaw.app._app:app", + "qwenpaw.app._app:app", host="0.0.0.0", - port=port, + port=self.config.console_port, log_level="info", ) server = uvicorn.Server(uv_config) + self._server = server console.print( f"[bold green]CoPaw console available at " - f"http://127.0.0.1:{port}/[/bold green]" + f"http://127.0.0.1:{self.config.console_port}/[/bold green]" ) try: + startup_health_task = asyncio.create_task( + self._mark_copaw_startup_health(), + name=f"copaw-worker-{self.worker_name}-startup-health", + ) + matrix_ready_task = asyncio.create_task( + self._poll_matrix_joined_rooms(), + name=f"copaw-worker-{self.worker_name}-matrix-ready", + ) await server.serve() + if not server.should_exit and self._health is not None: + self._health.update( + "copaw", + "unhealthy", + "CoPaw app exited unexpectedly", + {"operation": "run_copaw"}, + ) except asyncio.CancelledError: server.should_exit = True + logger.info("CoPaw FastAPI app cancelled worker=%s", self.worker_name) + except Exception as exc: + logger.exception("CoPaw FastAPI app failed worker=%s", self.worker_name) + if self._health is not None: + self._health.update( + "copaw", + "unhealthy", + f"CoPaw app failed: {exc}", + { + "operation": "run_copaw", + "error_type": type(exc).__name__, + }, + ) + raise finally: - await api_server.stop() + if "startup_health_task" in locals() and not startup_health_task.done(): + startup_health_task.cancel() + with suppress(asyncio.CancelledError): + await startup_health_task + if self._server is server: + self._server = None + logger.info("CoPaw FastAPI app stopped worker=%s", self.worker_name) - async def _run_copaw_headless(self) -> None: - """Start CoPaw's AgentRunner + ChannelManager (no HTTP server).""" - from copaw.app.runner.runner import AgentRunner - from copaw.config.utils import load_config - from copaw.app.channels.manager import ChannelManager - from copaw.app.channels.utils import make_process_from_runner - from copaw.app.channels.registry import clear_builtin_channel_cache + async def _on_files_pulled(self, pulled_files: list[str]) -> None: + """Refresh runtime projections after controller-managed files change.""" + assert self.sync is not None + assert self._copaw_working_dir is not None - # Force registry reload so newly installed matrix_channel.py is picked up - clear_builtin_channel_cache() + needs_rebridge = "openclaw.json" in pulled_files + skills_changed = any(f.startswith("skills/") for f in pulled_files) + mcporter_changed = "config/mcporter.json" in pulled_files + credagent_changed = "config/credagent.json" in pulled_files + + if skills_changed: + skill_names = self.sync.list_skills() + sync_skills_to_runtime( + self.sync.local_dir, + self._copaw_working_dir, + skill_names, + ) - self._runner = AgentRunner() - await self._runner.start() + if mcporter_changed and not needs_rebridge: + sync_mcporter_config_to_runtime(self.sync.local_dir, self._copaw_working_dir) - # load_config reads COPAW_WORKING_DIR/config.json (set by bridge.py) - config = load_config() - self._channel_manager = ChannelManager.from_config( - process=make_process_from_runner(self._runner), - config=config, - on_last_dispatch=None, - ) - await self._channel_manager.start_all() + if credagent_changed and not needs_rebridge: + self._hot_update_credential_guard() + + if not needs_rebridge: + return - console.print("[bold green]CoPaw channels started. Worker is running.[/bold green]") + logger.info("openclaw config changed; re-bridging worker=%s", self.worker_name) + try: + openclaw_cfg = json.loads( + (self.sync.local_dir / "openclaw.json").read_text(encoding="utf-8") + ) + bridge_standard_to_runtime( + self.sync.local_dir, + self._copaw_working_dir, + openclaw_cfg, + skill_names=self.sync.list_skills() if skills_changed else None, + ) + self._openclaw_cfg = openclaw_cfg + self._hot_update_matrix_channel_config() + if self._health is not None: + self._health.update( + "bridge", + "healthy", + "runtime config re-bridged", + {"operation": "sync_loop"}, + ) + except Exception as exc: + logger.exception("runtime config re-bridge failed worker=%s", self.worker_name) + if self._health is not None: + self._health.update( + "bridge", + "unhealthy", + f"runtime config re-bridge failed: {exc}", + { + "operation": "sync_loop", + "error_type": type(exc).__name__, + }, + ) + def _hot_update_matrix_channel_config(self) -> None: + """Refresh MatrixChannel allowlists if the channel object is reachable.""" + if self._channel_manager is None or self._copaw_working_dir is None: + return + + agent_path = self._copaw_working_dir / "workspaces" / "default" / "agent.json" try: - while True: - await asyncio.sleep(60) - except asyncio.CancelledError: - pass - finally: - await self._channel_manager.stop_all() - await self._runner.stop() - # Clear refs so stop() doesn't double-call - self._channel_manager = None - self._runner = None + import json + agent_cfg = json.loads(agent_path.read_text(encoding="utf-8")) + matrix_cfg = (agent_cfg.get("channels") or {}).get("matrix") or {} + except Exception as exc: + logger.warning("failed to load re-bridged Matrix config: %s", exc) + return + + for channel in getattr(self._channel_manager, "_channels", []): + cfg = getattr(channel, "_cfg", None) + if cfg is None or not hasattr(cfg, "group_allow_from"): + continue + try: + parsed = type(cfg)(matrix_cfg) + except Exception as exc: + logger.warning("failed to parse re-bridged Matrix config: %s", exc) + return + for attr in ( + "allow_from", + "group_allow_from", + "group_combined_allow", + "groups", + "dm_policy", + "group_policy", + "vision_enabled", + "history_limit", + ): + if hasattr(parsed, attr): + setattr(cfg, attr, getattr(parsed, attr)) + logger.info("MatrixChannel policy hot-updated worker=%s", self.worker_name) + return + + def _hot_update_credential_guard(self) -> None: + """Re-apply credagent.json paths and reload CoPaw's file guard.""" + if self.sync is None or self._copaw_working_dir is None: + return + from copaw_worker.hooks.credential_guard import apply_credential_guard + + count = apply_credential_guard(self.sync.local_dir, self._copaw_working_dir) + try: + from copaw.security.tool_guard.engine import get_guard_engine + + get_guard_engine().reload_rules() + logger.info( + "credential guard hot-reloaded paths=%d worker=%s", + count, + self.worker_name, + ) + except Exception as exc: + logger.warning("credential guard reload failed: %s", exc) # ------------------------------------------------------------------ # Matrix re-login (E2EE device_id refresh) @@ -335,10 +697,28 @@ def _matrix_relogin(self, openclaw_cfg: dict) -> dict: matrix_password = self.sync._cat(password_key) if not matrix_password: + logger.warning( + "Matrix password not found in MinIO; skipping re-login worker=%s", + self.worker_name, + ) console.print( "[dim]No Matrix password found in MinIO, skipping re-login " "(E2EE may not work after restart)[/dim]" ) + self._health.update( + "matrix", + "unhealthy", + "matrix re-login skipped: missing homeserver or password", + { + "operation": "matrix_relogin", + "has_homeserver": bool( + openclaw_cfg.get("channels", {}) + .get("matrix", {}) + .get("homeserver", "") + ), + "has_password": False, + }, + ) return openclaw_cfg matrix_password = matrix_password.strip() @@ -349,6 +729,21 @@ def _matrix_relogin(self, openclaw_cfg: dict) -> dict: ) if not homeserver or not matrix_password: + logger.warning( + "Matrix re-login skipped due to missing homeserver/password worker=%s has_homeserver=%s", + self.worker_name, + bool(homeserver), + ) + self._health.update( + "matrix", + "unhealthy", + "matrix re-login skipped: missing homeserver or password", + { + "operation": "matrix_relogin", + "has_homeserver": bool(homeserver), + "has_password": bool(matrix_password), + }, + ) return openclaw_cfg login_url = f"{homeserver}/_matrix/client/v3/login" @@ -377,23 +772,221 @@ def _matrix_relogin(self, openclaw_cfg: dict) -> dict: config_path = self.sync.local_dir / "openclaw.json" with open(config_path, "w") as f: json.dump(openclaw_cfg, f, indent=2, ensure_ascii=False) + logger.info( + "Matrix re-login OK worker=%s device=%s", + self.worker_name, + new_device, + ) console.print( f"[green]Matrix re-login OK[/green] " - f"(device: {new_device}, token: {new_token[:10]}...)" + f"(device: {new_device})" + ) + self._health.update( + "matrix", + "healthy", + "matrix re-login succeeded", + { + "operation": "matrix_relogin", + "device_id": new_device, + }, ) else: + logger.warning( + "Matrix re-login returned no token worker=%s device=%s", + self.worker_name, + new_device, + ) console.print( "[yellow]Matrix re-login returned no token, " "using existing access token[/yellow]" ) + self._health.update( + "matrix", + "unhealthy", + "matrix re-login returned no access token", + { + "operation": "matrix_relogin", + "device_id": new_device, + }, + ) except Exception as exc: + logger.exception("Matrix re-login failed worker=%s", self.worker_name) console.print( f"[yellow]Matrix re-login failed: {exc} — " f"using existing access token (E2EE may not work)[/yellow]" ) + self._health.update( + "matrix", + "unhealthy", + f"matrix re-login failed: {exc}", + { + "operation": "matrix_relogin", + "error_type": type(exc).__name__, + }, + ) return openclaw_cfg + def _notify_matrix(self, message: str, openclaw_cfg: dict) -> None: + """Best-effort send a m.notice to all joined Matrix rooms. + + Uses the raw Matrix CS API (urllib) since the nio client is not yet + running at startup time. Accepts pending room invitations first so + that brand-new workers that have not yet joined any room still + receive the notification. + """ + import json + import urllib.request + import uuid + + matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) + from .bridge import _port_remap, _is_in_container + homeserver = _port_remap( + matrix_cfg.get("homeserver", ""), _is_in_container() + ) + access_token = matrix_cfg.get("accessToken", "") + + if not homeserver or not access_token: + logger.debug("notify_matrix skipped: missing homeserver or token") + return + + headers = {"Authorization": f"Bearer {access_token}"} + + rooms = self._wait_for_matrix_rooms(homeserver, headers) + if not rooms: + logger.warning( + "notify_matrix: no rooms available after waiting, " + "notification skipped worker=%s", + self.worker_name, + ) + return + + body = json.dumps({ + "msgtype": "m.notice", + "body": message, + }).encode("utf-8") + + for room_id in rooms: + txn_id = uuid.uuid4().hex + url = ( + f"{homeserver}/_matrix/client/v3/rooms/" + f"{urllib.request.quote(room_id, safe='')}" + f"/send/m.room.message/{txn_id}" + ) + try: + req = urllib.request.Request( + url, + data=body, + headers={**headers, "Content-Type": "application/json"}, + method="PUT", + ) + urllib.request.urlopen(req, timeout=10) + except Exception as exc: + logger.debug( + "notify_matrix: failed to send to %s: %s", room_id, exc + ) + + def _wait_for_matrix_rooms( + self, + homeserver: str, + headers: dict[str, str], + *, + timeout: float = 120, + poll_interval: float = 3, + ) -> list[str]: + """Wait until the worker has at least one joined Matrix room. + + On each poll cycle: accept pending invites, then check joined_rooms. + Returns the room list, or [] after *timeout* seconds. + """ + import json + import time + import urllib.request + + deadline = time.monotonic() + timeout + + while True: + self._accept_matrix_invites(homeserver, headers) + + try: + req = urllib.request.Request( + f"{homeserver}/_matrix/client/v3/joined_rooms", + headers=headers, + method="GET", + ) + with urllib.request.urlopen(req, timeout=10) as resp: + rooms = json.loads(resp.read()).get("joined_rooms", []) + except Exception as exc: + logger.debug("notify_matrix: failed to list joined rooms: %s", exc) + rooms = [] + + if rooms: + return rooms + + remaining = deadline - time.monotonic() + if remaining <= 0: + return [] + + logger.info( + "notify_matrix: no rooms yet, retrying in %.0fs " + "(%.0fs remaining) worker=%s", + poll_interval, + remaining, + self.worker_name, + ) + time.sleep(min(poll_interval, remaining)) + + def _accept_matrix_invites( + self, + homeserver: str, + headers: dict[str, str], + ) -> None: + """Accept all pending Matrix room invitations via initial sync.""" + import json + import urllib.request + + sync_filter = json.dumps( + {"room": {"timeline": {"limit": 0}, "state": {"limit": 0}}} + ) + sync_url = ( + f"{homeserver}/_matrix/client/v3/sync" + f"?filter={urllib.request.quote(sync_filter)}&timeout=0" + ) + try: + req = urllib.request.Request(sync_url, headers=headers, method="GET") + with urllib.request.urlopen(req, timeout=15) as resp: + sync_data = json.loads(resp.read()) + except Exception as exc: + logger.debug("notify_matrix: sync for invites failed: %s", exc) + return + + invited = sync_data.get("rooms", {}).get("invite", {}) + if not invited: + return + + logger.info( + "notify_matrix: accepting %d pending room invite(s) worker=%s", + len(invited), + self.worker_name, + ) + for room_id in invited: + join_url = ( + f"{homeserver}/_matrix/client/v3/join/" + f"{urllib.request.quote(room_id, safe='')}" + ) + try: + req = urllib.request.Request( + join_url, + data=b"{}", + headers={**headers, "Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(req, timeout=10) + except Exception as exc: + logger.debug( + "notify_matrix: failed to join %s: %s", room_id, exc + ) + # ------------------------------------------------------------------ # mc (MinIO Client) auto-install # ------------------------------------------------------------------ @@ -425,10 +1018,12 @@ def _ensure_mc(self) -> None: install_dir.mkdir(parents=True, exist_ok=True) dest = install_dir / "mc" else: + logger.warning("mc auto-install not supported system=%s", system) console.print(f"[yellow]mc auto-install not supported on {system}, please install mc manually[/yellow]") return console.print(f"[yellow]mc not found, downloading from {url}...[/yellow]") + logger.warning("mc not found; downloading worker=%s url=%s dest=%s", self.worker_name, url, dest) try: import httpx with httpx.stream("GET", url, follow_redirects=True, timeout=60) as resp: @@ -440,199 +1035,7 @@ def _ensure_mc(self) -> None: dest.chmod(dest.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) os.environ["PATH"] = str(install_dir) + os.pathsep + os.environ.get("PATH", "") console.print(f"[green]mc installed to {dest}[/green]") + logger.info("mc installed worker=%s dest=%s", self.worker_name, dest) except Exception as exc: + logger.exception("mc auto-install failed worker=%s url=%s dest=%s", self.worker_name, url, dest) console.print(f"[yellow]mc auto-install failed: {exc}. Please install mc manually.[/yellow]") - - # ------------------------------------------------------------------ - # Skills sync - # ------------------------------------------------------------------ - - def _sync_skills(self) -> None: - """Pull skills from MinIO and install into CoPaw's active_skills dir. - - First seeds all CoPaw built-in skills (pdf, xlsx, docx, etc.) as a base - layer, then overlays skills pushed from MinIO by the Manager (which take - precedence and can override built-ins). - """ - active_skills_dir = self._copaw_working_dir / "active_skills" - active_skills_dir.mkdir(parents=True, exist_ok=True) - - # 0. Remove stale customized_skills that duplicate builtins. - # After an upgrade the new CoPaw image may ship builtins (pdf, pptx …) - # that were previously only available as customized copies. If the old - # customized_skills/ directory persists on disk, CoPaw loads both the - # builtin AND the customized copy, causing duplicates in the UI. - self._dedup_customized_skills() - - # 1. Seed CoPaw built-in skills as base layer. - # bridge.py has already patched copaw.constant.ACTIVE_SKILLS_DIR to point - # here, so sync_skills_to_working_dir() writes to the correct directory. - try: - from copaw.agents.skills_manager import sync_skills_to_working_dir - synced, skipped = sync_skills_to_working_dir(skill_names=None, force=False) - logger.info( - "Seeded CoPaw built-in skills: %d installed, %d already existed", - synced, skipped, - ) - except Exception as exc: - logger.warning("Failed to seed CoPaw built-in skills: %s", exc) - - # 2. Overlay with Manager-pushed skills from MinIO (higher priority). - skill_names = self.sync.list_skills() - if not skill_names: - logger.info("No extra skills in MinIO for worker %s", self.worker_name) - - for skill_name in skill_names: - src_skill_dir = self.sync.local_dir / "skills" / skill_name - dst_skill_dir = active_skills_dir / skill_name - if not src_skill_dir.exists(): - continue - dst_skill_dir.mkdir(parents=True, exist_ok=True) - # Mirror the full skill directory (SKILL.md + scripts/ + references/) - for src_file in src_skill_dir.rglob("*"): - if not src_file.is_file(): - continue - rel = src_file.relative_to(src_skill_dir) - dst_file = dst_skill_dir / rel - dst_file.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src_file, dst_file) - # Restore +x on shell scripts - if dst_file.suffix == ".sh": - dst_file.chmod(dst_file.stat().st_mode | 0o111) - logger.info("Installed MinIO skill: %s", skill_name) - - if skill_names: - console.print(f"[green]Skills installed: {', '.join(skill_names)}[/green]") - - # 3. Remove stale skills from active_skills/ that are no longer in MinIO - # and are not CoPaw builtins. - try: - import copaw.agents.skills as _skills_pkg - builtin_skills_root = Path(_skills_pkg.__file__).resolve().parent - builtin_names = { - c.name for c in builtin_skills_root.iterdir() - if c.is_dir() and not c.name.startswith("_") - } - except (ImportError, AttributeError): - builtin_names = set() - - keep_names = builtin_names | set(skill_names) | {"file-sync"} - for child in list(active_skills_dir.iterdir()): - if child.is_dir() and child.name not in keep_names: - shutil.rmtree(child) - logger.info("Removed stale active skill: %s", child.name) - - def _dedup_customized_skills(self) -> None: - """Remove customized skills that shadow CoPaw builtins. - - CoPaw discovers skills from two independent directories: - - builtin: /copaw/agents/skills// - - customized: /customized_skills// - - After an upgrade, new builtins may overlap with stale customized copies - left over from a previous version. This method detects the overlap and - removes the customized copy so only the (newer) builtin is loaded. - """ - customized_dir = self._copaw_working_dir / "customized_skills" - if not customized_dir.is_dir(): - return - - # Collect builtin skill names from the installed copaw package - try: - import copaw.agents.skills as _skills_pkg - builtin_skills_root = Path(_skills_pkg.__file__).resolve().parent - except (ImportError, AttributeError): - return - - builtin_names: set[str] = set() - if builtin_skills_root.is_dir(): - for child in builtin_skills_root.iterdir(): - if child.is_dir() and not child.name.startswith("_"): - builtin_names.add(child.name) - - if not builtin_names: - return - - # Remove customized copies that duplicate a builtin - import shutil - for child in list(customized_dir.iterdir()): - if child.is_dir() and child.name in builtin_names: - shutil.rmtree(child) - logger.info( - "Removed stale customized skill '%s' (now a builtin)", - child.name, - ) - - # ------------------------------------------------------------------ - # MatrixChannel installation - # ------------------------------------------------------------------ - - def _install_matrix_channel(self) -> None: - """Copy matrix_channel.py into COPAW_WORKING_DIR/custom_channels/. - - CoPaw's CUSTOM_CHANNELS_DIR = WORKING_DIR / "custom_channels", and - WORKING_DIR is read from COPAW_WORKING_DIR env var at import time. - We set COPAW_WORKING_DIR in bridge.py before this runs, so the - directory is already correct. - """ - custom_channels_dir = self._copaw_working_dir / "custom_channels" - custom_channels_dir.mkdir(parents=True, exist_ok=True) - src = Path(__file__).parent / "matrix_channel.py" - dst = custom_channels_dir / "matrix_channel.py" - shutil.copy2(src, dst) - logger.debug("MatrixChannel installed to %s", dst) - - # ------------------------------------------------------------------ - # mcporter config - # ------------------------------------------------------------------ - - def _copy_mcporter_config(self) -> None: - """Copy mcporter config from workspace root into CoPaw working dir. - - pull_all writes to /config/mcporter.json (workspace root), - but mcporter looks for ./config/mcporter.json relative to cwd, which - is COPAW_WORKING_DIR (.copaw/). Copy it there so mcporter finds it. - """ - src = self.sync.local_dir / "config" / "mcporter.json" - if not src.exists(): - return - dst = self._copaw_working_dir / "config" / "mcporter.json" - dst.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src, dst) - logger.info("mcporter config copied to %s", dst) - - # ------------------------------------------------------------------ - # File sync callback - # ------------------------------------------------------------------ - - async def _on_files_pulled(self, pulled_files: list[str]) -> None: - """Re-bridge config when Manager-managed files change (openclaw.json). - SOUL.md, AGENTS.md are Worker-managed and not pulled; use local copies.""" - # Re-sync skills if any skill file changed - if any(f.startswith("skills/") for f in pulled_files): - self._sync_skills() - - # Copy mcporter config into CoPaw working dir when it changes - if "config/mcporter.json" in pulled_files: - self._copy_mcporter_config() - - needs_rebridge = "openclaw.json" in pulled_files - if not needs_rebridge: - return - - console.print("[yellow]Config changed, re-bridging...[/yellow]") - try: - openclaw_cfg = self.sync.get_config() - # Use local Worker-managed files; fallback to MinIO for initial bootstrap - soul = (self.sync.local_dir / "SOUL.md").read_text() if (self.sync.local_dir / "SOUL.md").exists() else self.sync.get_soul() - agents = (self.sync.local_dir / "AGENTS.md").read_text() if (self.sync.local_dir / "AGENTS.md").exists() else self.sync.get_agents_md() - - if soul: - (self._copaw_working_dir / "SOUL.md").write_text(soul) - if agents: - (self._copaw_working_dir / "AGENTS.md").write_text(agents) - - bridge_openclaw_to_copaw(openclaw_cfg, self._copaw_working_dir) - console.print("[green]Config re-bridged.[/green]") - except Exception as exc: - console.print(f"[red]Re-bridge failed: {exc}[/red]") diff --git a/copaw/src/matrix/channel.py b/copaw/src/matrix/channel.py index b86e03f71..e7b8e5e10 100644 --- a/copaw/src/matrix/channel.py +++ b/copaw/src/matrix/channel.py @@ -43,11 +43,13 @@ logger = logging.getLogger("copaw.channels.matrix") # --------------------------------------------------------------------------- -# Lazy import of QwenPaw base types so this file can be syntax-checked without -# qwenpaw installed (it's only executed inside a qwenpaw environment). +# Lazy import of QwenPaw base types with copaw fallback. This file is used by +# both the Worker (copaw/Dockerfile, has qwenpaw) and Manager +# (manager/Dockerfile.copaw, may only have copaw). The fallback ensures the +# channel is recognized in either environment. # --------------------------------------------------------------------------- try: - from copaw.app.channels.base import BaseChannel + from qwenpaw.app.channels.base import BaseChannel from agentscope_runtime.engine.schemas.agent_schemas import ( AudioContent, ContentType, @@ -59,10 +61,20 @@ VideoContent, ) except ImportError: # pragma: no cover - BaseChannel = object # type: ignore[assignment,misc] - ContentType = None # type: ignore[assignment] - MessageType = None # type: ignore[assignment] - RunStatus = None # type: ignore[assignment] + # Fallback to copaw package for environments without qwenpaw (e.g. Manager). + # copaw.app.channels.base.BaseChannel is a drop-in of qwenpaw's BaseChannel + # and ensures the copaw framework's channel loader (issubclass check) passes. + try: + from copaw.app.channels.base import BaseChannel + from agentscope_runtime.engine.schemas.agent_schemas import ( + AudioContent, ContentType, FileContent, ImageContent, + MessageType, RunStatus, TextContent, VideoContent, + ) + except ImportError: + BaseChannel = object # type: ignore[assignment,misc] + ContentType = None # type: ignore[assignment] + MessageType = None # type: ignore[assignment] + RunStatus = None # type: ignore[assignment] CHANNEL_KEY = "matrix" diff --git a/copaw/tests/test_worker_health.py b/copaw/tests/test_worker_health.py index 3e82a4dc6..7c7ec2162 100644 --- a/copaw/tests/test_worker_health.py +++ b/copaw/tests/test_worker_health.py @@ -623,12 +623,12 @@ async def serve(self): fake_uvicorn.Server = FakeServer monkeypatch.setitem(sys.modules, "uvicorn", fake_uvicorn) - fake_registry = types.ModuleType("copaw.app.channels.registry") + fake_registry = types.ModuleType("qwenpaw.app.channels.registry") fake_registry.clear_builtin_channel_cache = lambda: None - monkeypatch.setitem(sys.modules, "copaw", types.ModuleType("copaw")) - monkeypatch.setitem(sys.modules, "copaw.app", types.ModuleType("copaw.app")) - monkeypatch.setitem(sys.modules, "copaw.app.channels", types.ModuleType("copaw.app.channels")) - monkeypatch.setitem(sys.modules, "copaw.app.channels.registry", fake_registry) + monkeypatch.setitem(sys.modules, "qwenpaw", types.ModuleType("qwenpaw")) + monkeypatch.setitem(sys.modules, "qwenpaw.app", types.ModuleType("qwenpaw.app")) + monkeypatch.setitem(sys.modules, "qwenpaw.app.channels", types.ModuleType("qwenpaw.app.channels")) + monkeypatch.setitem(sys.modules, "qwenpaw.app.channels.registry", fake_registry) fake_hooks = types.ModuleType("copaw_worker.hooks") fake_hooks.install_tool_hooks = lambda: None diff --git a/tests/lib/test-helpers.sh b/tests/lib/test-helpers.sh index 40f8c2635..94c92d883 100755 --- a/tests/lib/test-helpers.sh +++ b/tests/lib/test-helpers.sh @@ -527,6 +527,11 @@ while True: last_error = exc if time.monotonic() >= deadline: print(str(last_error), file=sys.stderr) + print("=== DEBUG: last readyz before timeout ===", file=sys.stderr) + try: + print(json.dumps(ready, ensure_ascii=False, indent=2), file=sys.stderr) + except Exception: + print("(unable to serialize last ready state)", file=sys.stderr) sys.exit(10) time.sleep(2) @@ -737,7 +742,7 @@ dump_diagnostics() { worker) local container="hiclaw-worker-${name}" printf "\n--- docker logs %s (last 100 lines) ---\n" "${container}" - docker logs --tail 100 "${container}" 2>&1 || true + docker logs --tail 200 "${container}" 2>&1 || true printf "\n--- container state: %s ---\n" "${container}" docker inspect --format='status={{.State.Status}} exit={{.State.ExitCode}} oom={{.State.OOMKilled}} restarts={{.RestartCount}} startedAt={{.State.StartedAt}} finishedAt={{.State.FinishedAt}} error={{.State.Error}}' "${container}" 2>&1 || true printf "\n--- controller logs (recent, filtered for %s) ---\n" "${name}"