Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin/

# Test binary
*.test

# Output of the Go coverage tool
*.out

# Python
__pycache__/
*.py[cod]

# IDE
.idea/
.vscode/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db

# Local Docs (Do not commit)
/docs/
31 changes: 31 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[project]
name = "linkwork-executor"
version = "0.1.0"
description = "LinkWork Executor"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"linkwork-agent-sdk>=0.1.0",
]

[project.scripts]
linkwork-executor-worker = "linkwork_executor.work.worker:main"

[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
]

[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]

5 changes: 5 additions & 0 deletions src/linkwork_executor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""LinkWork Executor package."""

from .work.worker import Worker

# Public API of the top-level package: only Worker is re-exported.
__all__ = ["Worker"]
16 changes: 16 additions & 0 deletions src/linkwork_executor/work/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Work layer package."""

from .consumer import GitRepoConfig, Task, TaskConsumer
from .lifecycle import LifecycleManager
from .workspace import TaskStatus, WorkspaceManager
from .worker import Worker

# Names re-exported by the work layer package (kept in alphabetical order).
__all__ = [
    "GitRepoConfig",
    "LifecycleManager",
    "Task",
    "TaskConsumer",
    "TaskStatus",
    "Worker",
    "WorkspaceManager",
]
55 changes: 55 additions & 0 deletions src/linkwork_executor/work/agents_guide.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Workspace AGENTS.md helper utilities."""

from __future__ import annotations

from pathlib import Path

# Default location of the AGENTS.md file maintained by
# ensure_workspace_agents_skill_guidance().
AGENTS_FILE_PATH = Path("/workspace/AGENTS.md")
# Top-level heading written when the existing AGENTS.md content is empty.
AGENTS_DEFAULT_HEADER = "# AGENTS Workspace Guide"
# Marker used to detect whether the skill guidance section is already present.
# It is matched as a plain substring of the file content.
SKILL_GUIDANCE_HEADER = "## Skill 使用规范(Claude Native)"
# Full section appended to AGENTS.md when the marker above is missing.
# NOTE: the first line repeats SKILL_GUIDANCE_HEADER verbatim; keep the two in
# sync, otherwise the section would be appended again on every run.
# The text below is runtime content written to the file, not a comment.
SKILL_GUIDANCE_BLOCK = """## Skill 使用规范(Claude Native)

1. Skill 是“流程指引”,MCP 是“外部能力”。
- 仅加载 Skill 不代表具备外部检索能力。
- 若 Skill 依赖 Tavily,必须确保已绑定并加载 Tavily MCP。

2. loaded != referenced。
- `SKILLS_LOADED` 仅表示已加载。
- 只有实际调用 `Skill` 工具或读取 Skill 文件,才算真正引用。

3. 禁止 slash 命令话术。
- 不输出 `/commit`、`/review-pr` 等样式。
- 统一使用自然语言和结构化结论表达。

4. 大结果检索必须走分批流程。
- Tavily 多查询结果先落地 `/tmp/*.json`。
- 再做“去重压缩 → 分批小结 → 最终汇总”,避免上下文或超时问题。

5. 产物规范。
- `/tmp` 仅临时文件,不作为最终交付。
- 最终文件必须写入 `/workspace/workstation`。

6. 能力降级时必须显式说明。
- 若 MCP 不可用或证据不足,明确“待确认项”和下一步动作,禁止编造结论。"""


def ensure_workspace_agents_skill_guidance(path: Path = AGENTS_FILE_PATH) -> None:
    """Guarantee that the AGENTS.md file at *path* carries the skill guidance.

    The parent directory is created when needed. A missing file is treated as
    empty content. The file is only written when the upserted content differs
    from what was read, so an up-to-date file is left untouched.

    Args:
        path: Location of the AGENTS.md file; defaults to ``AGENTS_FILE_PATH``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    if path.exists():
        current = path.read_text(encoding="utf-8")
    else:
        current = ""

    desired = upsert_workspace_agents_skill_guidance(current)
    if desired == current:
        return
    path.write_text(desired, encoding="utf-8")


def upsert_workspace_agents_skill_guidance(content: str) -> str:
    """Return *content* with the skill guidance section appended if absent.

    Windows line endings are converted to ``\\n`` first. When the guidance
    header is already present the normalized text is returned unchanged.
    Otherwise the section is appended after the stripped content, or after the
    default document header when the content is blank.

    Args:
        content: Current text of AGENTS.md (may be empty).

    Returns:
        The normalized text, guaranteed to contain the guidance header.
    """
    text = content.replace("\r\n", "\n")
    if SKILL_GUIDANCE_HEADER in text:
        return text

    # Blank documents get the default top-level heading as their body.
    body = text.strip() or AGENTS_DEFAULT_HEADER
    return f"{body}\n\n{SKILL_GUIDANCE_BLOCK}\n"

247 changes: 247 additions & 0 deletions src/linkwork_executor/work/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
"""Task consumer for Redis queue."""

from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import PurePosixPath
from typing import Awaitable, Callable

from linkwork_agent_sdk.constants import BLPOP_TIMEOUT_SECONDS, build_task_queue_key
from linkwork_agent_sdk.redis import RedisClient

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Shapes of the optional hooks accepted by TaskConsumer. A hook may return
# None or an awaitable; awaitable results are awaited by _maybe_await().
TaskCallback = Callable[["Task"], Awaitable[None] | None]
# Failure hook: receives the task and the exception given to mark_failed().
TaskErrorCallback = Callable[["Task", Exception], Awaitable[None] | None]
# Abort hook: receives the task and the reason string given to mark_aborted().
TaskAbortCallback = Callable[["Task", str], Awaitable[None] | None]

# The only delivery modes accepted by _parse_delivery_mode().
DELIVERY_MODE_GIT = "git"
DELIVERY_MODE_OSS = "oss"


@dataclass(slots=True)
class GitRepoConfig:
repo: str
origin_branch: str
task_branch: str = ""


@dataclass(slots=True)
class Task:
task_id: str
user_id: str
content: str
system_prompt_append: str
delivery_mode: str
selected_model: str = ""
role_id: str = ""
git_config: list[GitRepoConfig] = field(default_factory=list)
file_path_mappings: list["FilePathMapping"] = field(default_factory=list)


@dataclass(slots=True)
class FilePathMapping:
runtime_path: str
real_path: str


class TaskConsumer:
    """Consume task JSON payloads from a Redis list using BLPOP.

    Each call to :meth:`consume_once` pops at most one payload from the
    workstation's queue, validates it and turns it into a :class:`Task`.
    Invalid payloads are logged and dropped so that one bad message cannot
    stop the consumer. Optional hooks (plain or async callables) are invoked
    when a task is assigned, completed, failed or aborted.
    """

    def __init__(
        self,
        redis_client: RedisClient,
        workstation_id: str,
        blpop_timeout: int = BLPOP_TIMEOUT_SECONDS,
        on_task_assigned: TaskCallback | None = None,
        on_task_completed: TaskCallback | None = None,
        on_task_failed: TaskErrorCallback | None = None,
        on_task_aborted: TaskAbortCallback | None = None,
    ) -> None:
        """Store the collaborators and hooks; no I/O is performed here.

        Args:
            redis_client: Client whose ``blpop`` coroutine is used to pop payloads.
            workstation_id: Identifier used to build the queue key.
            blpop_timeout: Timeout value forwarded to ``blpop``.
            on_task_assigned: Hook called with the task after successful parsing.
            on_task_completed: Hook called by :meth:`mark_completed`.
            on_task_failed: Hook called by :meth:`mark_failed`.
            on_task_aborted: Hook called by :meth:`mark_aborted`.
        """
        self._redis = redis_client
        self._workstation_id = workstation_id
        self._blpop_timeout = blpop_timeout
        self._on_task_assigned = on_task_assigned
        self._on_task_completed = on_task_completed
        self._on_task_failed = on_task_failed
        self._on_task_aborted = on_task_aborted
        # Starts at construction time and is refreshed on every accepted task.
        self._last_task_time = time.time()

    @property
    def queue_key(self) -> str:
        """Redis key of the task queue for this workstation."""
        return build_task_queue_key(self._workstation_id)

    @property
    def last_task_time(self) -> float:
        """Wall-clock time (``time.time()``) of the last accepted task."""
        return self._last_task_time

    async def consume_once(self) -> Task | None:
        """Pop and parse a single task payload.

        Returns:
            The parsed task, or ``None`` when ``blpop`` returned nothing or the
            payload was rejected (the rejection is logged at error level).
        """
        result = await self._redis.blpop(self.queue_key, timeout=self._blpop_timeout)

        if result is None:
            return None

        # The result is unpacked as a pair; only the second element is used.
        _, payload = result
        try:
            task = self._parse_payload(payload)
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as error:
            _logger.error(
                "TaskConsumer: task payload parse failed, skipping: %s (payload=%s)",
                error,
                payload[:500] if isinstance(payload, str) else payload,
            )
            return None

        self._last_task_time = time.time()
        await _maybe_await(self._on_task_assigned, task)
        return task

    async def mark_completed(self, task: Task) -> None:
        """Invoke the completion hook, if any, with *task*."""
        await _maybe_await(self._on_task_completed, task)

    async def mark_failed(self, task: Task, error: Exception) -> None:
        """Invoke the failure hook, if any, with *task* and *error*."""
        await _maybe_await(self._on_task_failed, task, error)

    async def mark_aborted(self, task: Task, reason: str) -> None:
        """Invoke the abort hook, if any, with *task* and *reason*."""
        await _maybe_await(self._on_task_aborted, task, reason)

    @staticmethod
    def _clean_text(value: object) -> str:
        """Return *value* as stripped text, mapping ``None`` (JSON null) to ""."""
        # str(None) would yield the literal "None", which must not pass as data.
        return "" if value is None else str(value).strip()

    @staticmethod
    def _parse_payload(payload: object) -> Task:
        """Decode and validate one raw queue payload.

        Raises:
            json.JSONDecodeError: The payload is not valid JSON.
            KeyError: A required key is missing.
            TypeError: The payload has a type ``json.loads`` cannot decode.
            ValueError: The payload is not a JSON object or a field is invalid.
        """
        raw_task = json.loads(payload)
        # Valid JSON that is not an object (list, number, string, null) has no
        # ``.get``; reject it explicitly so the caller's handler can skip it.
        if not isinstance(raw_task, dict):
            raise ValueError("task payload must be a JSON object")

        git_config = _parse_git_config(raw_task.get("git_config"))
        delivery_mode = _parse_delivery_mode(raw_task.get("delivery_mode"), git_config)
        file_path_mappings = _parse_file_path_mappings(raw_task.get("file_path_mappings"))

        # Look up every required key first so that a missing key (KeyError) is
        # reported before any emptiness error.
        required_keys = ("task_id", "user_id", "content", "system_prompt_append")
        texts = {key: TaskConsumer._clean_text(raw_task[key]) for key in required_keys}
        for key, text in texts.items():
            if not text:
                raise ValueError(f"{key} cannot be empty")

        return Task(
            task_id=texts["task_id"],
            user_id=texts["user_id"],
            content=texts["content"],
            system_prompt_append=texts["system_prompt_append"],
            delivery_mode=delivery_mode,
            selected_model=TaskConsumer._clean_text(raw_task.get("selected_model")),
            role_id=TaskConsumer._clean_text(raw_task.get("role_id")),
            git_config=git_config,
            file_path_mappings=file_path_mappings,
        )


def _parse_git_config(raw: object) -> list[GitRepoConfig]:
    """Validate the payload's ``git_config`` value and convert it to objects.

    Args:
        raw: Decoded JSON value of ``git_config``; ``None`` means "not given".

    Returns:
        One ``GitRepoConfig`` per entry, in payload order (empty for ``None``).

    Raises:
        ValueError: ``raw`` is not a list, an entry is not an object, or an
            entry lacks a non-empty ``repo`` / ``origin_branch``.
    """
    if raw is None:
        return []
    if not isinstance(raw, list):
        raise ValueError("git_config must be an array")

    parsed: list[GitRepoConfig] = []
    for item in raw:
        if not isinstance(item, dict):
            raise ValueError("git_config item must be an object")

        # JSON null must count as missing; str(None) would otherwise produce
        # the bogus repository name "None".
        repo_raw = item.get("repo")
        repo = "" if repo_raw is None else str(repo_raw).strip()
        if not repo:
            raise ValueError("git_config.repo is required")

        origin_branch_raw = item.get("origin_branch")
        if origin_branch_raw is None:
            raise ValueError("git_config.origin_branch is required")

        origin_branch = str(origin_branch_raw).strip()
        if not origin_branch:
            raise ValueError("git_config.origin_branch cannot be empty")

        # task_branch is optional: absent and null both mean "no task branch".
        task_branch_raw = item.get("task_branch")
        task_branch = "" if task_branch_raw is None else str(task_branch_raw).strip()

        parsed.append(
            GitRepoConfig(
                repo=repo,
                origin_branch=origin_branch,
                task_branch=task_branch,
            )
        )

    return parsed


def _parse_delivery_mode(raw: object, git_config: list[GitRepoConfig]) -> str:
    """Resolve and validate the task's delivery mode.

    A missing or blank value is inferred for legacy payloads: git when a git
    configuration is present, oss otherwise (a warning is logged).

    Args:
        raw: Decoded JSON value of ``delivery_mode`` (may be ``None``).
        git_config: Already parsed git configuration of the same payload.

    Returns:
        The lower-cased delivery mode.

    Raises:
        ValueError: The mode is unknown, or git mode is requested without any
            git configuration.
    """
    mode = "" if raw is None else str(raw).strip().lower()

    if mode == "":
        mode = DELIVERY_MODE_GIT if git_config else DELIVERY_MODE_OSS
        _logger.warning(
            "task payload missing delivery_mode, fallback to inferred mode: %s (legacy compatibility)",
            mode,
        )

    if mode not in (DELIVERY_MODE_GIT, DELIVERY_MODE_OSS):
        raise ValueError(f"delivery_mode must be '{DELIVERY_MODE_GIT}' or '{DELIVERY_MODE_OSS}'")

    if not git_config and mode == DELIVERY_MODE_GIT:
        raise ValueError("delivery_mode=git requires non-empty git_config")

    return mode


def _parse_file_path_mappings(raw: object) -> list[FilePathMapping]:
    """Validate the payload's ``file_path_mappings`` value.

    Args:
        raw: Decoded JSON value of ``file_path_mappings``; ``None`` means
            "not given".

    Returns:
        One ``FilePathMapping`` per entry, in payload order, with both paths
        normalized.

    Raises:
        ValueError: ``raw`` is not a list, an entry is not an object, a path is
            missing or invalid, or two entries share a ``runtime_path``.
    """
    if raw is None:
        return []
    if not isinstance(raw, list):
        raise ValueError("file_path_mappings must be an array")

    # Keyed by normalized runtime path: detects duplicates and, because dicts
    # keep insertion order, also preserves the payload order of the entries.
    by_runtime_path: dict[str, FilePathMapping] = {}
    for entry in raw:
        if not isinstance(entry, dict):
            raise ValueError("file_path_mappings item must be an object")

        runtime_text = str(entry.get("runtime_path", "")).strip()
        real_text = str(entry.get("real_path", "")).strip()
        if not (runtime_text and real_text):
            raise ValueError("file_path_mappings.runtime_path and real_path are required")

        runtime_path = _normalize_doc_runtime_path(runtime_text, "runtime_path")
        real_path = _normalize_doc_runtime_path(real_text, "real_path")

        if runtime_path in by_runtime_path:
            raise ValueError(f"file_path_mappings has duplicate runtime_path: {runtime_path}")
        by_runtime_path[runtime_path] = FilePathMapping(
            runtime_path=runtime_path,
            real_path=real_path,
        )

    return list(by_runtime_path.values())


def _normalize_doc_runtime_path(path: str, field_name: str) -> str:
    """Normalize a mapping path and check that it stays inside the workspace.

    Args:
        path: Path text taken from a ``file_path_mappings`` entry.
        field_name: Name of the entry key, used only in error messages.

    Returns:
        The path as rendered by ``PurePosixPath`` (redundant separators and
        ``.`` components collapsed).

    Raises:
        ValueError: The path is relative, contains ``..``, or does not live
            under ``/workspace/user/`` or ``/workspace/workstation/``.
    """
    label = f"file_path_mappings.{field_name}"
    pure = PurePosixPath(path)

    if not path.startswith("/"):
        raise ValueError(f"{label} must be absolute: {path}")
    if ".." in pure.parts:
        raise ValueError(f"{label} must not contain '..': {path}")

    text = str(pure)
    # The trailing slash in each prefix means the bare root directories
    # themselves are rejected; only paths below them are accepted.
    if not text.startswith(("/workspace/user/", "/workspace/workstation/")):
        raise ValueError(
            f"{label} must start with /workspace/user/ or /workspace/workstation/: {path}"
        )
    return text


async def _maybe_await(callback: Callable[..., Awaitable[None] | None] | None, *args: object) -> None:
    """Call an optional hook and await its result when it is awaitable.

    Args:
        callback: Plain or async callable, or ``None`` to do nothing.
        *args: Positional arguments forwarded to the callback.
    """
    if callback is not None:
        outcome = callback(*args)
        # Plain callbacks return None (or some non-awaitable value); only
        # awaitable results need to be driven to completion.
        if outcome is not None and isinstance(outcome, Awaitable):
            await outcome
Loading