diff --git a/.claude/skills/dataflow-dev copy/SKILL.md b/.claude/skills/dataflow-dev/SKILL.md similarity index 100% rename from .claude/skills/dataflow-dev copy/SKILL.md rename to .claude/skills/dataflow-dev/SKILL.md diff --git a/.claude/skills/dataflow-dev copy/context/dev_notes.md b/.claude/skills/dataflow-dev/context/dev_notes.md similarity index 100% rename from .claude/skills/dataflow-dev copy/context/dev_notes.md rename to .claude/skills/dataflow-dev/context/dev_notes.md diff --git a/.claude/skills/dataflow-dev copy/context/knowledge_base.md b/.claude/skills/dataflow-dev/context/knowledge_base.md similarity index 100% rename from .claude/skills/dataflow-dev copy/context/knowledge_base.md rename to .claude/skills/dataflow-dev/context/knowledge_base.md diff --git a/.claude/skills/dataflow-dev copy/diagnostics/known_issues.md b/.claude/skills/dataflow-dev/diagnostics/known_issues.md similarity index 100% rename from .claude/skills/dataflow-dev copy/diagnostics/known_issues.md rename to .claude/skills/dataflow-dev/diagnostics/known_issues.md diff --git a/.claude/skills/dataflow-dev copy/scripts/check_updates.sh b/.claude/skills/dataflow-dev/scripts/check_updates.sh similarity index 100% rename from .claude/skills/dataflow-dev copy/scripts/check_updates.sh rename to .claude/skills/dataflow-dev/scripts/check_updates.sh diff --git a/.claude/skills/dataflow-dev copy/templates/operator_template.py b/.claude/skills/dataflow-dev/templates/operator_template.py similarity index 100% rename from .claude/skills/dataflow-dev copy/templates/operator_template.py rename to .claude/skills/dataflow-dev/templates/operator_template.py diff --git a/.claude/skills/dataflow-dev copy/templates/pipeline_template.py b/.claude/skills/dataflow-dev/templates/pipeline_template.py similarity index 100% rename from .claude/skills/dataflow-dev copy/templates/pipeline_template.py rename to .claude/skills/dataflow-dev/templates/pipeline_template.py diff --git a/.claude/skills/dataflow-dev copy/templates/prompt_template.py b/.claude/skills/dataflow-dev/templates/prompt_template.py similarity index 100% rename from .claude/skills/dataflow-dev copy/templates/prompt_template.py rename to .claude/skills/dataflow-dev/templates/prompt_template.py diff --git a/.claude/skills/dataflow-operator-builder copy/SKILL.md b/.claude/skills/dataflow-operator-builder/SKILL.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/SKILL.md rename to .claude/skills/dataflow-operator-builder/SKILL.md diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/cli/operator_cli.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/cli/operator_cli.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/cli/operator_cli.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/cli/operator_cli.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/operators/eval_operator.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/operators/eval_operator.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/operators/eval_operator.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/operators/eval_operator.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/operators/filter_operator.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/operators/filter_operator.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/operators/filter_operator.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/operators/filter_operator.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/operators/generate_operator.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/operators/generate_operator.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/operators/generate_operator.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/operators/generate_operator.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/operators/refine_operator.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/operators/refine_operator.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/operators/refine_operator.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/operators/refine_operator.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/package/cli_init.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/package/cli_init.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/package/cli_init.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/package/cli_init.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/package/operator_pkg_init.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/package/operator_pkg_init.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/package/operator_pkg_init.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/package/operator_pkg_init.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/package/operators_root_init.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/package/operators_root_init.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/package/operators_root_init.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/package/operators_root_init.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/package/package_init.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/package/package_init.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/package/package_init.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/package/package_init.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/tests/test_operator_registry.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/tests/test_operator_registry.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/tests/test_operator_registry.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/tests/test_operator_registry.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/tests/test_operator_smoke.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/tests/test_operator_smoke.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/tests/test_operator_smoke.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/tests/test_operator_smoke.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/assets/templates/tests/test_operator_unit.py.tmpl b/.claude/skills/dataflow-operator-builder/assets/templates/tests/test_operator_unit.py.tmpl similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/assets/templates/tests/test_operator_unit.py.tmpl rename to .claude/skills/dataflow-operator-builder/assets/templates/tests/test_operator_unit.py.tmpl diff --git a/.claude/skills/dataflow-operator-builder copy/references/acceptance-checklist.md b/.claude/skills/dataflow-operator-builder/references/acceptance-checklist.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/acceptance-checklist.md rename to .claude/skills/dataflow-operator-builder/references/acceptance-checklist.md diff --git a/.claude/skills/dataflow-operator-builder copy/references/askuserquestion-rounds.md b/.claude/skills/dataflow-operator-builder/references/askuserquestion-rounds.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/askuserquestion-rounds.md rename to .claude/skills/dataflow-operator-builder/references/askuserquestion-rounds.md diff --git a/.claude/skills/dataflow-operator-builder copy/references/cli-shell-guidelines.md b/.claude/skills/dataflow-operator-builder/references/cli-shell-guidelines.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/cli-shell-guidelines.md rename to .claude/skills/dataflow-operator-builder/references/cli-shell-guidelines.md diff --git a/.claude/skills/dataflow-operator-builder copy/references/gotchas.md b/.claude/skills/dataflow-operator-builder/references/gotchas.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/gotchas.md rename to .claude/skills/dataflow-operator-builder/references/gotchas.md diff --git a/.claude/skills/dataflow-operator-builder copy/references/operator-contract.md b/.claude/skills/dataflow-operator-builder/references/operator-contract.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/operator-contract.md rename to .claude/skills/dataflow-operator-builder/references/operator-contract.md diff --git a/.claude/skills/dataflow-operator-builder copy/references/output-checklist.md b/.claude/skills/dataflow-operator-builder/references/output-checklist.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/output-checklist.md rename to .claude/skills/dataflow-operator-builder/references/output-checklist.md diff --git a/.claude/skills/dataflow-operator-builder copy/references/registration-rules.md b/.claude/skills/dataflow-operator-builder/references/registration-rules.md similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/references/registration-rules.md rename to .claude/skills/dataflow-operator-builder/references/registration-rules.md diff --git a/.claude/skills/dataflow-operator-builder copy/scripts/build_operator_artifacts.py b/.claude/skills/dataflow-operator-builder/scripts/build_operator_artifacts.py similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/scripts/build_operator_artifacts.py rename to .claude/skills/dataflow-operator-builder/scripts/build_operator_artifacts.py diff --git a/.claude/skills/dataflow-operator-builder copy/scripts/example_spec.json b/.claude/skills/dataflow-operator-builder/scripts/example_spec.json similarity index 100% rename from .claude/skills/dataflow-operator-builder copy/scripts/example_spec.json rename to .claude/skills/dataflow-operator-builder/scripts/example_spec.json diff --git a/.claude/skills/generating-dataflow-pipeline copy/SKILL.md b/.claude/skills/generating-dataflow-pipeline/SKILL.md similarity index 100% rename from .claude/skills/generating-dataflow-pipeline copy/SKILL.md rename to .claude/skills/generating-dataflow-pipeline/SKILL.md diff --git a/.claude/skills/generating-dataflow-pipeline copy/examples/basic_generate_and_filter.md b/.claude/skills/generating-dataflow-pipeline/examples/basic_generate_and_filter.md similarity index 100% rename from .claude/skills/generating-dataflow-pipeline copy/examples/basic_generate_and_filter.md rename to .claude/skills/generating-dataflow-pipeline/examples/basic_generate_and_filter.md diff --git a/.claude/skills/generating-dataflow-pipeline copy/examples/kbc_pdf_to_qa.md b/.claude/skills/generating-dataflow-pipeline/examples/kbc_pdf_to_qa.md similarity index 100% rename from .claude/skills/generating-dataflow-pipeline copy/examples/kbc_pdf_to_qa.md rename to .claude/skills/generating-dataflow-pipeline/examples/kbc_pdf_to_qa.md diff --git a/.claude/skills/generating-dataflow-pipeline copy/examples/multi_stage_pipeline.md b/.claude/skills/generating-dataflow-pipeline/examples/multi_stage_pipeline.md similarity index 100% rename from .claude/skills/generating-dataflow-pipeline copy/examples/multi_stage_pipeline.md rename to .claude/skills/generating-dataflow-pipeline/examples/multi_stage_pipeline.md diff --git a/.claude/skills/generating-dataflow-pipeline copy/examples/multifield_scoring.md b/.claude/skills/generating-dataflow-pipeline/examples/multifield_scoring.md similarity index 100% rename from .claude/skills/generating-dataflow-pipeline copy/examples/multifield_scoring.md rename to .claude/skills/generating-dataflow-pipeline/examples/multifield_scoring.md diff --git a/.claude/skills/generating-dataflow-pipeline copy/templates/pipeline_template.py b/.claude/skills/generating-dataflow-pipeline/templates/pipeline_template.py similarity index 100% rename from .claude/skills/generating-dataflow-pipeline copy/templates/pipeline_template.py rename to .claude/skills/generating-dataflow-pipeline/templates/pipeline_template.py diff --git a/.claude/skills/prompt-template-builder copy/SKILL.md b/.claude/skills/prompt-template-builder/SKILL.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/SKILL.md rename to .claude/skills/prompt-template-builder/SKILL.md diff --git a/.claude/skills/prompt-template-builder copy/examples/filter_rewrite_finance.md b/.claude/skills/prompt-template-builder/examples/filter_rewrite_finance.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/examples/filter_rewrite_finance.md rename to .claude/skills/prompt-template-builder/examples/filter_rewrite_finance.md diff --git a/.claude/skills/prompt-template-builder copy/examples/multifield_scoring_prompt.md b/.claude/skills/prompt-template-builder/examples/multifield_scoring_prompt.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/examples/multifield_scoring_prompt.md rename to .claude/skills/prompt-template-builder/examples/multifield_scoring_prompt.md diff --git a/.claude/skills/prompt-template-builder copy/examples/single_field_generation.md b/.claude/skills/prompt-template-builder/examples/single_field_generation.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/examples/single_field_generation.md rename to .claude/skills/prompt-template-builder/examples/single_field_generation.md diff --git a/.claude/skills/prompt-template-builder copy/references/acceptance-checklist.md b/.claude/skills/prompt-template-builder/references/acceptance-checklist.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/references/acceptance-checklist.md rename to .claude/skills/prompt-template-builder/references/acceptance-checklist.md diff --git a/.claude/skills/prompt-template-builder copy/references/askuserquestion-rounds.md b/.claude/skills/prompt-template-builder/references/askuserquestion-rounds.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/references/askuserquestion-rounds.md rename to .claude/skills/prompt-template-builder/references/askuserquestion-rounds.md diff --git a/.claude/skills/prompt-template-builder copy/references/gotchas.md b/.claude/skills/prompt-template-builder/references/gotchas.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/references/gotchas.md rename to .claude/skills/prompt-template-builder/references/gotchas.md diff --git a/.claude/skills/prompt-template-builder copy/references/input-schema.md b/.claude/skills/prompt-template-builder/references/input-schema.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/references/input-schema.md rename to .claude/skills/prompt-template-builder/references/input-schema.md diff --git a/.claude/skills/prompt-template-builder copy/references/output-contract.md b/.claude/skills/prompt-template-builder/references/output-contract.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/references/output-contract.md rename to .claude/skills/prompt-template-builder/references/output-contract.md diff --git a/.claude/skills/prompt-template-builder copy/templates/decision_json_template.md b/.claude/skills/prompt-template-builder/templates/decision_json_template.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/templates/decision_json_template.md rename to .claude/skills/prompt-template-builder/templates/decision_json_template.md diff --git a/.claude/skills/prompt-template-builder copy/templates/final_response_template.md b/.claude/skills/prompt-template-builder/templates/final_response_template.md similarity index 100% rename from .claude/skills/prompt-template-builder copy/templates/final_response_template.md rename to .claude/skills/prompt-template-builder/templates/final_response_template.md diff --git a/.claude/skills/prompt-template-builder copy/templates/prompt_class_template.py.tmpl b/.claude/skills/prompt-template-builder/templates/prompt_class_template.py.tmpl similarity index 100% rename from .claude/skills/prompt-template-builder copy/templates/prompt_class_template.py.tmpl rename to .claude/skills/prompt-template-builder/templates/prompt_class_template.py.tmpl diff --git a/.gitignore b/.gitignore index 591db26..2bdfd1a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,19 @@ *.zip __pycache__ -dist_release/ \ No newline at end of file +dist_release/ + +# local runtime artifacts +data/ +backend/dataflow_cache/ +backend/cache_local/ +backend/data/dataflow_core/ + +# editor / OS +.DS_Store +*.swp +*.pyc +.idea/ +.vscode/ + +# stale "* copy" skill dirs accidentally committed in earlier branches +.claude/skills/* copy/ diff --git a/CLAUDE.md b/CLAUDE.md index 05820f2..5951d68 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,42 +1,49 @@ # DataFlow WebUI — Project Rules -## WebUI Change Policy (MANDATORY) - -All modifications to `backend/app/` and `frontend/src/` MUST comply with this policy. - -### Allowed Changes ONLY - -1. **Operator Rendering Support (Frontend)** — Add or fix rendering for DataFlow operators not yet properly displayed in the WebUI pipeline canvas. - - New/fixed node components in `components/manage/mainFlow/nodes/` - - Fix parameter rendering in operator config panels - - Add missing operator-specific UI elements (param editors, display widgets) - - Fix display bugs for operators that exist in DataFlow but show incorrectly - - Verification: operator exists in `OPERATOR_REGISTRY` (`GET /api/v1/operators/`) but WebUI renders it incorrectly. - -2. **Operator API Support (Backend)** — Add or fix API endpoints that serve existing DataFlow operator/pipeline/serving/dataset data. - - Missing CRUD endpoint for an existing resource type - - Fix serialization of operator parameters - - Fix edge cases in existing endpoints (null handling, pagination) - - Add query params or filters to existing list endpoints - -### Prohibited Changes - -- **NO** new analytical pages or dashboards (e.g., taxonomy analytics, custom reports) -- **NO** new backend endpoints for derived/computed data (e.g., `/api/v1/taxonomy/analytics`, `/api/v1/custom-stats`) -- **NO** new sidebar navigation items (`navList` in `manage/index.vue`) -- **NO** new routes in `router/Manage/index.js` -- **NO** new `views/manage/*/index.vue` pages -- **NO** new services/registries in `core/container.py` -- **NO** new config paths in `core/config.py` -- **NO** feature-level functionality not tied to making existing operators render in the UI - -### Decision Rule - -Before any frontend/backend change, check: -1. Does it make an existing DataFlow operator render correctly in the WebUI? → Allowed -2. Does it fix a bug in an existing API endpoint? → Allowed -3. Does it add missing CRUD for an existing resource type? → Allowed -4. Anything else → **PROHIBITED** — ask user to explicitly override - -If the user requests a prohibited change, explain the policy and ask for explicit override confirmation. +> **Deployment assumption**: this project is a **single-user, local-first** tool. +> Each developer/user runs both backend and frontend on their own machine. +> There is **no authentication, no registration, no multi-tenant concern**. +> Do NOT add login pages, JWT/session infrastructure, or per-user ACL. + +## WebUI Change Policy + +The previous strict allow-list (only operator-rendering fixes / existing-endpoint +patches) has been **relaxed**. New routes, new sidebar items, new backend +endpoints, and new services are all permitted when they serve the goal of +"completing DataFlow-WebUI as a usable local tool". + +### Encouraged work areas + +1. **Agent chat experience** — tool-call progress indicators, session + persistence, multi-session switching, reconnect logic, robust error surfacing. +2. **Prompt management** — both read-only browsing of the built-in + `PROMPT_REGISTRY` AND first-class support for user-defined prompt templates + (file-based persistence under `backend/data/prompts/`, CRUD endpoints, + editor UI, reference from operator config). +3. **JSON Schema management** — container-based service injection, schema + validation (ajv), monaco/codemirror editor, deep integration with operator + parameter panels so saved schemas can be chosen for structured LLM outputs. +4. **Operator rendering** — still a priority: new/fixed node components in + `components/manage/mainFlow/nodes/`, parameter editors, correct display of + operators that already exist in `OPERATOR_REGISTRY`. +5. **Repo hygiene** — consolidating/deleting duplicate markdown, removing + stray `* copy` skill directories, fixing obvious code smells (same-name + route handlers, hard-coded URLs, services bypassing the DI container). + +### Still prohibited + +- **Authentication / registration / multi-user features.** Single-user local + tool only. +- **External telemetry / analytics uploads.** No outbound reporting. +- **Breaking changes to `OPERATOR_REGISTRY` / `PROMPT_REGISTRY` semantics** — + always extend, never redefine, so upstream DataFlow stays compatible. + +### Decision rule + +Before a change, ask: + +1. Does it make DataFlow-WebUI more useful as a single-user local tool? → OK +2. Does it introduce a login page, account system, or remote telemetry? → **NO** +3. Does it silently mutate DataFlow core-registry semantics? → **NO** +4. Everything else → proceed, and keep changes minimal + consistent with + existing patterns (`core/container.py`, `api/v1/envelope.py`, etc.). diff --git a/backend/app/api/v1/endpoints/agent.py b/backend/app/api/v1/endpoints/agent.py index 50639af..baf02ac 100644 --- a/backend/app/api/v1/endpoints/agent.py +++ b/backend/app/api/v1/endpoints/agent.py @@ -5,23 +5,37 @@ 消息协议(前端 → 后端): { "type": "chat", "message": "用户输入" } - { "type": "abort_session" } ← 终止正在运行的 claude 进程并清除上下文 - { "type": "clear_session" } ← 仅清除上下文(不终止进程,已废弃,保留兼容) + { "type": "abort_session" } ← 终止正在运行的 claude 进程并清除当前会话(保留历史) + { "type": "clear_session" } ← 脱离当前会话(不 kill 进程),等价于"新建一个对话" + { "type": "new_session" } ← 同 clear_session,语义更清晰 + { "type": "switch_session", ← 切换到一条历史会话,下一轮对话继续它 + "session_id": "..." } 消息协议(后端 → 前端): { "type": "text_chunk", "content": "..." } ← Agent 回复文本(流式) + { "type": "tool_call_start", "tool_use_id": "...", + "name": "mcp__dataflow__list_operators", + "input_preview": "{ ... }" } ← Agent 开始调用某个工具 + { "type": "tool_call_end", "tool_use_id": "...", + "is_error": false, "output_preview": "..." } ← 工具调用完成 { "type": "sync_pipeline", "pipeline": {...}, "nodes": [...], "edges": [...] } ← Pipeline 同步到 DAG 编辑器 { "type": "done" } ← 本轮回复完成 { "type": "session_aborted" } ← 会话已终止并清除 - { "type": "session_cleared" } ← 会话已清除(兼容旧版) + { "type": "session_cleared" } ← 会话已清除(新对话,保留历史) + { "type": "session_switched", ← 已切换到历史会话 + "session_id": "..." } { "type": "error", "message": "..." } ← 错误信息 """ import asyncio import json -from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException +from pydantic import BaseModel from app.services.agent_session import AgentSessionManager from app.core.logger_setup import get_logger +from app.api.v1.resp import ok +from app.api.v1.envelope import ApiResponse +from typing import List, Optional logger = get_logger(__name__) @@ -97,6 +111,14 @@ async def agent_websocket(websocket: WebSocket): async def run_chat_stream(message: str): """在后台 Task 中运行 Agent 流式输出""" nonlocal active_task + + def _truncate(obj, limit=400): + try: + s = json.dumps(obj, ensure_ascii=False) + except Exception: + s = str(obj) + return s if len(s) <= limit else s[:limit] + "…" + try: async for chunk in agent_manager.chat_stream( user_id=user_id, @@ -115,16 +137,71 @@ async def run_chat_stream(message: str): }) # Claude Code 格式:type=assistant,完整消息体 + # content 数组中可能包含 text / tool_use 块 elif chunk_type == "assistant": message_obj = chunk.get("message", {}) for content_block in message_obj.get("content", []): - if content_block.get("type") == "text": + block_type = content_block.get("type") + if block_type == "text": text = content_block.get("text", "") if text: await ws_manager.send(user_id, { "type": "text_chunk", "content": text, }) + elif block_type == "tool_use": + # Agent 开始调用一个工具 + await ws_manager.send(user_id, { + "type": "tool_call_start", + "tool_use_id": content_block.get("id", ""), + "name": content_block.get("name", ""), + "input_preview": _truncate( + content_block.get("input", {}) + ), + }) + + # Claude Code 格式:type=result,本轮 agent 循环的最终结束信号 + # (可能因为成功、kvcache 超限、turn 超限等原因结束) + elif chunk_type == "result": + subtype = chunk.get("subtype", "") + stop_reason = chunk.get("stop_reason", "") + is_error = bool(chunk.get("is_error", False)) + result_text = chunk.get("result", "") or "" + # 只在"异常结束但文本又为空"时推送警告——成功结束由主循环稍后的 done 覆盖 + if (is_error or stop_reason in ( + "kvcache_no_enough", "max_turns_exceeded", + )) and not result_text.strip(): + hint = { + "kvcache_no_enough": + "模型上下文/KV 缓存已塞满,Agent 中止。建议:清空对话重来," + "明确让 Agent 先调 list_operator_categories 再按类别拉算子," + "避免一次性 list_operators。", + "max_turns_exceeded": + "Agent 工具调用轮次达到上限。建议用更具体的指令或拆小需求再试。", + }.get(stop_reason, f"Agent 异常结束(stop_reason={stop_reason})") + await ws_manager.send(user_id, { + "type": "error", + "message": hint, + }) + + # Claude Code 格式:type=user,通常是工具执行结果回填 + elif chunk_type == "user": + message_obj = chunk.get("message", {}) + for content_block in message_obj.get("content", []): + if content_block.get("type") == "tool_result": + output = content_block.get("content", "") + # content 可能是字符串或 content block 列表 + if isinstance(output, list): + output = "".join( + (b.get("text", "") if isinstance(b, dict) else str(b)) + for b in output + ) + await ws_manager.send(user_id, { + "type": "tool_call_end", + "tool_use_id": content_block.get("tool_use_id", ""), + "is_error": bool(content_block.get("is_error", False)), + "output_preview": _truncate(output), + }) # 本轮回复正常完成 await ws_manager.send(user_id, {"type": "done"}) @@ -177,12 +254,38 @@ async def run_chat_stream(message: str): await ws_manager.send(user_id, {"type": "session_aborted"}) logger.info(f"[{user_id}] Session aborted") - elif msg_type == "clear_session": - # 兼容旧版:仅清除 session ID(不强制终止进程) + elif msg_type == "clear_session" or msg_type == "new_session": + # 脱离当前会话(不 kill 进程、保留历史),等价于"新建一个对话" agent_manager.clear_session(user_id) await ws_manager.send(user_id, {"type": "session_cleared"}) logger.info(f"[{user_id}] Session cleared") + elif msg_type == "switch_session": + sid = (raw.get("session_id") or "").strip() + if not sid: + await ws_manager.send(user_id, { + "type": "error", "message": "switch_session requires session_id", + }) + continue + # 正在跑的流需要先中止,避免和旧 session 的子进程纠缠 + if active_task and not active_task.done(): + active_task.cancel() + try: + await active_task + except (asyncio.CancelledError, Exception): + pass + ok_switch = agent_manager.switch_session(user_id, sid) + if ok_switch: + await ws_manager.send(user_id, { + "type": "session_switched", "session_id": sid, + }) + logger.info(f"[{user_id}] Switched to session {sid}") + else: + await ws_manager.send(user_id, { + "type": "error", + "message": f"Session {sid} not found for this user", + }) + except WebSocketDisconnect: # 前端断开连接:清理后台任务 if active_task and not active_task.done(): @@ -201,3 +304,59 @@ async def run_chat_stream(message: str): except Exception: pass ws_manager.disconnect(user_id) + + +# ─── Session history REST API ──────────────────────────────────────────────── + +class SessionInfo(BaseModel): + session_id: str + title: str + created_at: str + updated_at: str + message_count: int = 0 + + +class SessionListOut(BaseModel): + current: Optional[str] = None + history: List[SessionInfo] = [] + + +class RenameSessionIn(BaseModel): + title: str + + +@router.get( + "/sessions", + response_model=ApiResponse[SessionListOut], + summary="列出指定 user 的历史 Agent 会话", +) +def list_agent_sessions(user_id: str = "default"): + history = agent_manager.list_history(user_id) + return ok(SessionListOut( + current=agent_manager.get_session_id(user_id), + history=[SessionInfo(**h) for h in history], + )) + + +@router.delete( + "/sessions/{session_id}", + response_model=ApiResponse[dict], + summary="删除一条历史 Agent 会话", +) +def delete_agent_session(session_id: str, user_id: str = "default"): + removed = agent_manager.delete_session(user_id, session_id) + if not removed: + raise HTTPException(status_code=404, detail="Session not found") + return ok({"deleted": True, "session_id": session_id}) + + +@router.put( + "/sessions/{session_id}", + response_model=ApiResponse[dict], + summary="重命名一条历史 Agent 会话", +) +def rename_agent_session(session_id: str, body: RenameSessionIn, user_id: str = "default"): + renamed = agent_manager.rename_session(user_id, session_id, body.title.strip()) + if not renamed: + raise HTTPException(status_code=404, detail="Session not found") + return ok({"session_id": session_id, "title": body.title.strip()}) diff --git a/backend/app/api/v1/endpoints/json_schemas.py b/backend/app/api/v1/endpoints/json_schemas.py index 21bb4ce..0f9dd98 100644 --- a/backend/app/api/v1/endpoints/json_schemas.py +++ b/backend/app/api/v1/endpoints/json_schemas.py @@ -1,14 +1,17 @@ from fastapi import APIRouter, HTTPException, status from loguru import logger as log from app.schemas.json_schema import JsonSchemaCreate, JsonSchemaUpdate, JsonSchemaOut -from app.services.json_schema_manager import JsonSchemaManager +from app.core.container import container from app.api.v1.resp import ok from app.api.v1.envelope import ApiResponse from typing import List router = APIRouter(tags=["json_schemas"]) -manager = JsonSchemaManager() + + +def _manager(): + return container.json_schema_manager @router.post( @@ -19,7 +22,7 @@ def create_schema(schema_in: JsonSchemaCreate): """Create a new JSON schema.""" try: - schema_out = manager.create( + schema_out = _manager().create( name=schema_in.name, description=schema_in.description or "", schema=schema_in.schema, @@ -39,7 +42,7 @@ def create_schema(schema_in: JsonSchemaCreate): def list_schemas(): """List all JSON schemas.""" try: - schemas = manager.list_all() + schemas = _manager().list_all() return ok([JsonSchemaOut(**s) for s in schemas]) except Exception as e: log.error(f"Failed to list schemas: {e}") @@ -54,7 +57,7 @@ def list_schemas(): def get_schema(schema_id: str): """Get a schema by ID.""" try: - schema = manager.get(schema_id) + schema = _manager().get(schema_id) if not schema: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -77,7 +80,7 @@ def update_schema(schema_id: str, schema_update: JsonSchemaUpdate): """Update a schema by ID.""" try: update_data = schema_update.dict(exclude_unset=True) - schema = manager.update(schema_id, **update_data) + schema = _manager().update(schema_id, **update_data) if not schema: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -99,7 +102,7 @@ def update_schema(schema_id: str, schema_update: JsonSchemaUpdate): def delete_schema(schema_id: str): """Delete a schema by ID.""" try: - success = manager.delete(schema_id) + success = _manager().delete(schema_id) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, diff --git a/backend/app/api/v1/endpoints/operators.py b/backend/app/api/v1/endpoints/operators.py index dcfe26e..77016f4 100644 --- a/backend/app/api/v1/endpoints/operators.py +++ b/backend/app/api/v1/endpoints/operators.py @@ -1,41 +1,178 @@ # app/api/v1/endpoints/operators.py import json -from fastapi import APIRouter, HTTPException -from typing import List, Dict, Any +from fastapi import APIRouter, HTTPException, Query +from typing import List, Dict, Any from loguru import logger as log # --- 1. Import required Schemas and response wrappers --- from app.schemas.operator import ( - OperatorSchema, + OperatorSchema, OperatorDetailSchema, - OperatorDetailsResponseSchema + OperatorDetailsResponseSchema ) from app.api.v1.resp import ok from app.api.v1.envelope import ApiResponse +from app.api.v1.errors import ValidationBizError # --- 2. Import service layer --- from app.services.operator_registry import OPS_JSON_PATH +from app.services.operator_category_guide import ( + build_category_response, + valid_categories, +) from app.core.container import container router = APIRouter(tags=["operators"]) @router.get( - "/", + "/", response_model=ApiResponse[List[OperatorSchema]], - operation_id="list_operators", - summary="Return list of registered operators (simplified)" + operation_id="list_operators_all", + summary=( + "INTERNAL: Return ALL registered operators (full catalog). Used by " + "the web UI's operator sidebar. NOT exposed via MCP — agents must " + "use `list_operators` (the category-scoped variant) instead." + ), ) -def list_operators(lang: str = "zh"): - """Return all registered operators (simplified version).""" +def list_operators_all(lang: str = "zh", category: str = None): + """Return operators, optionally filtered by category. + + This is the legacy unrestricted route, kept for the frontend which + renders the full operator palette. It is **intentionally not + whitelisted in the MCP server** (see ``app.mcp_server`` include_operations) + so agents cannot accidentally pull the 145-op catalog into context. + """ try: op_list = container.operator_registry.get_op_list(lang=lang) + if category: + op_list = [ + op for op in op_list + if (op.get("type") or {}).get("level_1") == category + ] return ok(op_list) except Exception as e: log.error(f"Failed to get operator list: {e}") raise HTTPException(status_code=500, detail=str(e)) +@router.get( + "/by_category", + response_model=ApiResponse[List[OperatorSchema]], + operation_id="list_operators", + summary=( + "Return registered operators within a SINGLE category. `category` " + "is REQUIRED. This is the agent-facing variant exposed via MCP." + ), +) +def list_operators( + category: str = Query( + None, + description=( + "REQUIRED. One of the level_1 categories returned by " + "list_operator_categories (e.g. 'core_text', 'general_text', " + "'reasoning'). Calling this endpoint without a category — or " + "with 'all' / 'unknown' — is rejected with a structured error " + "so the agent can recover without wasting context on the full " + "~145-operator catalog." + ), + ), + lang: str = "zh", +): + """Return operators belonging to a single top-level category (agent path). + + Why a dedicated route: the frontend operator palette still needs the + full catalog, so the old ``/`` path (now ``list_operators_all``) stays + liberal and is hidden from MCP. This route is the one whitelisted in + ``app.mcp_server``; when the agent forgets ``category``, the response + itself carries the list of valid values plus a two-step recovery hint, + so the next tool call can succeed without a round-trip to the user. + """ + try: + op_list = container.operator_registry.get_op_list(lang=lang) + valids = valid_categories(op_list) + + if not category or category.strip().lower() in {"all", "*", "unknown", "any"}: + raise ValidationBizError( + message=( + "list_operators requires a `category` argument. Returning the full " + "~145-operator catalog would overflow the agent context window. " + "Call list_operator_categories first, read use_for/not_for for each, " + "choose exactly one category, then re-invoke list_operators with it." + ), + code=40010, + data={ + "error": "category_required", + "valid_categories": valids, + "next_action": ( + "Step 1: list_operator_categories → " + "Step 2: list_operators?category= → " + "Step 3: get_operator_detail_by_name(name=)" + ), + }, + ) + + if category not in valids: + # difflib hint for typos / hallucinated categories + import difflib + suggestions = difflib.get_close_matches(category, valids, n=3, cutoff=0.4) + raise ValidationBizError( + message=( + f"Category '{category}' does not exist. Pick exactly one of " + "valid_categories below. Do not invent category names." + ), + code=40011, + data={ + "error": "unknown_category", + "valid_categories": valids, + "did_you_mean": suggestions, + }, + ) + + filtered = [ + op for op in op_list + if (op.get("type") or {}).get("level_1") == category + ] + return ok(filtered) + except ValidationBizError: + raise + except HTTPException: + raise + except Exception as e: + log.error(f"Failed to get operator list: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get( + "/categories", + response_model=ApiResponse[Dict[str, Dict[str, Any]]], + operation_id="list_operator_categories", + summary=( + "Return mapping of operator top-level categories with count + " + "use_for / not_for guidance + example operator names. " + "ALWAYS call this BEFORE list_operators." + ), +) +def list_operator_categories(lang: str = "zh"): + """Cheap entry-point agents must call first. + + Each category in the response carries: + - ``count`` : number of operators in this category + - ``use_for`` : the data-engineering jobs this category is intended for + - ``not_for`` : common mis-mappings (anti-patterns) to avoid + - ``examples`` : 3-5 representative operator names + + The agent should read use_for/not_for, decide which single category + matches the user's task, and then call ``list_operators?category=``. + """ + try: + op_list = container.operator_registry.get_op_list(lang=lang) + return ok(build_category_response(op_list)) + except Exception as e: + log.error(f"Failed to get operator categories: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.get( "/details", response_model=ApiResponse[OperatorDetailsResponseSchema], diff --git a/backend/app/api/v1/endpoints/prompts.py b/backend/app/api/v1/endpoints/prompts.py index e4cbee1..5fe5684 100644 --- a/backend/app/api/v1/endpoints/prompts.py +++ b/backend/app/api/v1/endpoints/prompts.py @@ -1,13 +1,98 @@ - from fastapi import APIRouter, HTTPException -from app.schemas.prompt import GetPromptSchema, PromptSourceOut, OperatorPromptMapOut, PromptInfoMapOut, PromptInfoOut -# from app.services.prompt_registry import _PROMPT_REGISTRY +from typing import List +from app.schemas.prompt import ( + GetPromptSchema, PromptSourceOut, OperatorPromptMapOut, + PromptInfoMapOut, PromptInfoOut, + UserPromptTemplateCreate, UserPromptTemplateUpdate, UserPromptTemplateOut, + RenderPreviewIn, RenderPreviewOut, +) from app.core.container import container from app.api.v1.resp import ok from app.api.v1.envelope import ApiResponse router = APIRouter(tags=["prompts"]) + +# ── User-defined prompt templates (CRUD) ───────────────────────────────────── +# 注:这些路由必须**放在** "/{operator_name}" 这类通配路由之前, +# 否则 "/user" 会被当成算子名匹配到下面的只读接口。 + +@router.get( + "/user", + response_model=ApiResponse[List[UserPromptTemplateOut]], + summary="列出所有用户自定义 prompt 模板", +) +def list_user_prompts(): + items = container.user_prompt_registry.list_all() + return ok([UserPromptTemplateOut(**i) for i in items]) + + +@router.post( + "/user", + response_model=ApiResponse[UserPromptTemplateOut], + summary="创建用户自定义 prompt 模板", +) +def create_user_prompt(body: UserPromptTemplateCreate): + rec = container.user_prompt_registry.create( + name=body.name, + description=body.description or "", + template=body.template, + allowed_operators=body.allowed_operators, + example_variables=body.example_variables, + ) + return ok(UserPromptTemplateOut(**rec)) + + +@router.get( + "/user/{tpl_id}", + response_model=ApiResponse[UserPromptTemplateOut], + summary="根据 ID 获取用户自定义 prompt 模板", +) +def get_user_prompt(tpl_id: str): + rec = container.user_prompt_registry.get(tpl_id) + if not rec: + raise HTTPException(status_code=404, detail=f"Template {tpl_id} not found") + return ok(UserPromptTemplateOut(**rec)) + + +@router.put( + "/user/{tpl_id}", + response_model=ApiResponse[UserPromptTemplateOut], + summary="更新用户自定义 prompt 模板", +) +def update_user_prompt(tpl_id: str, body: UserPromptTemplateUpdate): + updated = container.user_prompt_registry.update( + tpl_id, **body.model_dump(exclude_unset=True) + ) + if not updated: + raise HTTPException(status_code=404, detail=f"Template {tpl_id} not found") + return ok(UserPromptTemplateOut(**updated)) + + +@router.delete( + "/user/{tpl_id}", + response_model=ApiResponse[dict], + summary="删除用户自定义 prompt 模板", +) +def delete_user_prompt(tpl_id: str): + removed = container.user_prompt_registry.delete(tpl_id) + if not removed: + raise HTTPException(status_code=404, detail=f"Template {tpl_id} not found") + return ok({"deleted": True, "id": tpl_id}) + + +@router.post( + "/user/preview", + response_model=ApiResponse[RenderPreviewOut], + summary="预览 f-string 模板的渲染结果(不落盘)", +) +def preview_user_prompt(body: RenderPreviewIn): + result = container.user_prompt_registry.preview(body.template, body.variables) + return ok(RenderPreviewOut(**result)) + + +# ── Built-in prompts (read-only browsing) ──────────────────────────────────── + @router.get( "/operator-mapping", response_model=ApiResponse[OperatorPromptMapOut], @@ -30,8 +115,11 @@ def get_prompt_info(): response_model=ApiResponse[PromptInfoOut], summary="根据 Prompt 名称获取 Prompt 信息" ) -def get_prompt_info(prompt_name: str): - return ok(container.prompt_registry.list_prompt_info().prompts[prompt_name]) +def get_prompt_info_by_name(prompt_name: str): + info_map = container.prompt_registry.list_prompt_info().prompts + if prompt_name not in info_map: + raise HTTPException(404, f"Prompt '{prompt_name}' not found") + return ok(info_map[prompt_name]) @router.get( "/{operator_name}", diff --git a/backend/app/core/container.py b/backend/app/core/container.py index 586143d..19d4c62 100644 --- a/backend/app/core/container.py +++ b/backend/app/core/container.py @@ -14,6 +14,8 @@ def __init__(self): self.text2sql_database_registry = None self.text2sql_database_manager_registry = None self.dataset_visualize_service = None + self.json_schema_manager = None + self.user_prompt_registry = None def init(self): from app.services.dataset_registry import DatasetRegistry, VisualizeDatasetService @@ -23,6 +25,8 @@ def init(self): from app.services.serving_registry import ServingRegistry from app.services.task_registry import TaskRegistry from app.services.text2sql_database_registry import Text2SQLDatabaseRegistry, Text2SQLDatabaseManagerRegistry + from app.services.json_schema_manager import JsonSchemaManager + from app.services.user_prompt_registry import UserPromptRegistry from .config import settings import importlib @@ -47,6 +51,8 @@ def init(self): self.pipeline_registry = PipelineRegistry() self.text2sql_database_registry = Text2SQLDatabaseRegistry() self.text2sql_database_manager_registry = Text2SQLDatabaseManagerRegistry() + self.json_schema_manager = JsonSchemaManager() + self.user_prompt_registry = UserPromptRegistry() diff --git a/backend/app/mcp_server.py b/backend/app/mcp_server.py index 1c3bd34..ba35322 100644 --- a/backend/app/mcp_server.py +++ b/backend/app/mcp_server.py @@ -28,7 +28,9 @@ def create_mcp_server(app: FastAPI) -> FastApiMCP: description="Tools for managing DataFlow pipelines, tasks, operators and datasets", # 白名单:只暴露安全的只读/创建操作 + 自定义同步工具 include_operations=[ - "list_operators", # GET /api/v1/operators/ + "list_operator_categories", # GET /api/v1/operators/categories ← 便宜入口,带 use_for/not_for 指引;agent 必须先调 + "list_operators", # GET /api/v1/operators/by_category?category=xxx category 必填,缺省/非法会 40010/40011 + "get_operator_detail_by_name", # GET /api/v1/operators/details/{name} 单个算子详情 "list_pipelines", # GET /api/v1/pipelines/ "create_pipeline", # POST /api/v1/pipelines/ "update_pipeline", # PUT /api/v1/pipelines/{pipeline_id} @@ -38,6 +40,7 @@ def create_mcp_server(app: FastAPI) -> FastApiMCP: "get_execution_status", # GET /api/v1/tasks/execution/{task_id}/status "get_task_result", # GET /api/v1/tasks/execution/{task_id}/result "list_datasets", # GET /api/v1/datasets/ + "register_dataset", # POST /api/v1/datasets/ "list_serving", # GET /api/v1/serving/ "render_pipeline_in_editor", # POST /api/v1/agent/render (自定义) ], @@ -80,7 +83,7 @@ class RenderResponse(BaseModel): ) async def render_pipeline_in_editor(req: RenderRequest) -> RenderResponse: from app.core.container import container - from app.api.v1.endpoints.agent import ws_manager + from app.api.v1.endpoints.agent import ws_manager, agent_manager pipeline = container.pipeline_registry.get_pipeline(req.pipeline_id) if not pipeline: @@ -90,16 +93,29 @@ async def render_pipeline_in_editor(req: RenderRequest) -> RenderResponse: ) nodes, edges = _config_to_vue_flow(pipeline.get("config", {})) - await ws_manager.broadcast({ + payload = { "type": "sync_pipeline", - "pipeline": pipeline, + "pipeline": pipeline, # ← 关键:前端 renderPipeline() 用这个(完整 config) + "pipeline_id": req.pipeline_id, + # 下面两个是给旧消费者的兼容字段,真正的渲染走 pipeline.config "nodes": nodes, "edges": edges, - }) - logger.info( - f"Pipeline {req.pipeline_id} synced to editor " - f"({len(nodes)} nodes, {len(edges)} edges)" - ) + } + # 优先向"最近发起对话的用户"定向推送,避免串扰到其他浏览器窗口。 + # 仅在没有已知活跃用户时回退为广播。 + target_uid = agent_manager.get_last_active_user_id() + if target_uid: + await ws_manager.send(target_uid, payload) + logger.info( + f"Pipeline {req.pipeline_id} synced to editor for user={target_uid} " + f"({len(nodes)} nodes, {len(edges)} edges)" + ) + else: + await ws_manager.broadcast(payload) + logger.info( + f"Pipeline {req.pipeline_id} broadcast to all editors " + f"({len(nodes)} nodes, {len(edges)} edges)" + ) return RenderResponse( status="ok", message=f"Pipeline '{pipeline.get('name', req.pipeline_id)}' 已同步到编辑器", diff --git a/backend/app/schemas/prompt.py b/backend/app/schemas/prompt.py index 33b817a..50c57e5 100644 --- a/backend/app/schemas/prompt.py +++ b/backend/app/schemas/prompt.py @@ -32,4 +32,49 @@ class PromptInfoOut(BaseModel): parameter: PromptParameterGroupsSchema class PromptInfoMapOut(BaseModel): - prompts: Dict[str, PromptInfoOut] \ No newline at end of file + prompts: Dict[str, PromptInfoOut] + + +# ── User-defined prompt templates ──────────────────────────────────────────── +# 用户自定义的 prompt 模板(本地保存)。底层在运行时实例化为 FormatStrPrompt +# 或自定义 PromptABC 子类,供算子调用。 + +class UserPromptTemplateBase(BaseModel): + name: str + description: Optional[str] = "" + # f-string 风格模板,例如 "Given {question}, produce an answer." + # 占位符名必须与运行时传入的列名一致。 + template: str + # 允许使用此模板的算子名称(可为空,代表对所有算子都可选)。 + allowed_operators: List[str] = Field(default_factory=list) + # 样例渲染时的占位符 → 示例值映射 + example_variables: Dict[str, Any] = Field(default_factory=dict) + + +class UserPromptTemplateCreate(UserPromptTemplateBase): + pass + + +class UserPromptTemplateUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + template: Optional[str] = None + allowed_operators: Optional[List[str]] = None + example_variables: Optional[Dict[str, Any]] = None + + +class UserPromptTemplateOut(UserPromptTemplateBase): + id: str + created_at: str + updated_at: str + + +class RenderPreviewIn(BaseModel): + template: str + variables: Dict[str, Any] = Field(default_factory=dict) + + +class RenderPreviewOut(BaseModel): + rendered: str + placeholders: List[str] + missing: List[str] \ No newline at end of file diff --git a/backend/app/services/agent_session.py b/backend/app/services/agent_session.py index 7a8d2c1..9652082 100644 --- a/backend/app/services/agent_session.py +++ b/backend/app/services/agent_session.py @@ -1,14 +1,39 @@ """ AgentSessionManager:管理 Claude Code CLI 子进程和会话 ID。 -每个 user_id 对应一个 claude session_id,实现多轮对话上下文保持。 +每个 user_id 对应一个"当前活跃"的 claude session_id,并保留一份历史会话列表 +以便前端切换 / 恢复旧对话。 """ import asyncio import json +import os +import shutil +import threading +from datetime import datetime from pathlib import Path from typing import AsyncGenerator - -# Claude Code CLI 可执行文件(需在 PATH 中,或使用绝对路径) -CLAUDE_CLI = "claude" +from app.core.config import settings + +# Claude Code CLI 可执行文件。优先级: +# 1) 环境变量 DATAFLOW_CLAUDE_CLI(显式覆盖) +# 2) 机器上可用的 claude-internal(腾讯内部封装,接口与官方一致) +# 3) 官方 claude +# 需要切回官方:`export DATAFLOW_CLAUDE_CLI=claude` 后重启后端即可。 +def _detect_claude_cli() -> str: + override = os.environ.get("DATAFLOW_CLAUDE_CLI", "").strip() + if override: + return override + if shutil.which("claude-internal"): + return "claude-internal" + return "claude" + +CLAUDE_CLI = _detect_claude_cli() + +# 启动时打印一次,方便确认使用的是 internal 还是官方 +try: + from app.core.logger_setup import get_logger + get_logger(__name__).info(f"AgentSessionManager will use CLI: {CLAUDE_CLI}") +except Exception: + pass # DataFlow-WebUI 根目录(CLI 在此目录运行,自动读取 .mcp.json 和 .claude/skills/) WEBUI_ROOT = Path(__file__).resolve().parent.parent.parent.parent @@ -16,6 +41,10 @@ # MCP 配置文件路径 MCP_CONFIG = WEBUI_ROOT / ".mcp.json" +# 历史会话持久化路径:backend//agent_sessions.json +BACKEND_DIR = Path(__file__).parent.parent.parent +SESSIONS_FILE = BACKEND_DIR / settings.RESOURCE_DIR / "agent_sessions.json" + # System prompt:只定义角色边界,Skills 内容由 CLI 自动从 .claude/skills/ 加载 SYSTEM_PROMPT = """你是 DataFlow WebUI 的内置助手。你只处理以下范围内的请求: - 帮用户设计和构建 DataFlow pipeline @@ -36,15 +65,31 @@ ## 行为规范(必须严格遵守) ### 1. 算子信息:必须通过 MCP 工具获取,禁止用 Read 翻文件 -- **查询算子列表时,必须调用 `mcp__dataflow__list_operators`**,绝对不要用 `Read` 去翻项目目录或源码 - **禁止**用 `Read` 浏览 `/dataflow/`、`/operators/`、`/examples/` 等目录来寻找算子信息 -- MCP 工具已提供所有必要的算子和 pipeline 信息,优先使用 +- **禁止**一次性调用 `mcp__dataflow__list_operators`(无参数),否则返回 ~90KB 会把上下文塞满 +- **正确查询顺序**: + 1. 先调 `mcp__dataflow__list_operator_categories` 看有哪些类别(如 `reasoning: 13`, `general_text: 26`, `code: 19`, ...) + 2. 再调 `mcp__dataflow__list_operators` 带 `category=<具体类别>` 参数,只拉你要的那一类(通常 <10KB) + 3. 需要某个算子的完整参数签名时,用 `mcp__dataflow__get_operator_detail_by_name` 按名查单个 +- **一轮 reasoning 里 `list_operators` 不超过 2 次**。超过就停下来定下候选集,不要再广撒网。 +- MCP 工具的响应如果被 Claude Code 自动落盘(文件名含 `tool-results/mcp-*`), + **立即改用 `Read` 时带 `offset=0, limit=80` 分页读**,而不是反复重试无 limit 调用。 ### 2. 文件操作:主动执行,不要请求授权 - 当用户提供了文件路径时,**直接用 `Read` 工具读取**,不要询问授权 - 如果用户没有提供样本文件路径但描述了字段结构,**直接用 `Write` 创建示例文件**到 `./data/` 目录,然后继续任务 - 不要说"我需要你的授权"、"请提供文件路径"这类话——直接动手 +### 2.1 数据集注册:写完 jsonl 必须注册 +- **只要你用 `Write` 新建了一个 `.jsonl` 数据文件**,或用户手动提到了一个 jsonl 文件但它不在 `list_datasets` 返回的列表里, + **你必须紧接着调用 `mcp__dataflow__register_dataset`** 来把它注册到后端,否则 pipeline 同步到编辑器时会报 "Input dataset not found"。 +- 注册时字段要求: + - `name`: 一个短小的人类可读名字(如 `math_qa_demo`) + - `root`: jsonl 文件所在目录(**不是文件本身**),例如 `./data/` + - `pipeline`: 你打算把它用在哪个 pipeline 的名字,没有就填一个占位如 `"auto"` 即可 + - `meta`: 可留空 `{}` +- 调用 `register_dataset` 成功后,返回体里有 `id` 字段,**在后续 `create_pipeline` 时必须用这个真实的 `id`** 作为 `config.input_dataset.id`,绝对不要自己编 id。 + ### 2. 禁止自行执行 Pipeline - **严禁**主动调用 `execute_pipeline` 或 `execute_pipeline_async` 工具 - 执行 pipeline 是用户的决定,你的职责是帮用户设计好 pipeline,然后引导用户自己点击界面上的「运行」按钮 @@ -63,6 +108,14 @@ 当你通过工具创建或更新了 pipeline 后,**必须立即**调用 `render_pipeline_in_editor` 工具, 将 pipeline 可视化同步到编辑器,让用户能直观看到节点图。 +### 4.1 一次请求只产出一个 pipeline +- **同一轮对话中,最多只调用一次 `create_pipeline`**。不要在收到用户一个需求后, + 反复"写一版 → 不满意 → 再写一版"地调用多次。 +- 如果觉得第一版不够好,**继续改进同一条 pipeline 用 `update_pipeline`**,而不是创建新的。 +- 用户如果明确说"重新做一个"、"换一种思路",才允许再创建一次。 +- 每次 `create_pipeline` 或 `update_pipeline` 之后**立刻** `render_pipeline_in_editor`, + 等用户反馈再决定是否 `update_pipeline` 继续优化。不要在用户发声之前连续创建多个。 + ### 5. 操作前告知用户 每次调用工具前,先用一句话告诉用户你要做什么,保持透明。 例如:"我来查询一下现有的算子列表……" 或 "我帮你把这个 pipeline 同步到编辑器……" @@ -71,24 +124,124 @@ class AgentSessionManager: """ - 管理 Claude Code CLI 子进程和 session ID 的映射。 - - 每次 chat_stream 调用都会: - 1. 检查 user_id 是否有已有的 session_id - 2. 如有,通过 --resume 恢复上下文;如无,开启新会话 - 3. 从 CLI 输出的 JSON 流中提取 session_id 并保存 - - abort_session 调用会: - 1. 强制 kill 正在运行的 claude 子进程 - 2. 清除 session_id 映射,下次对话重新开始 + 管理 Claude Code CLI 子进程、当前活跃 session_id,以及每个用户的历史会话列表。 + + 运行时内存: + _current: user_id -> 当前活跃的 claude session_id + _processes: user_id -> 正在运行的 asyncio subprocess + _last_active_user_id: 最近一次发起 chat_stream 的 user_id(供 MCP render 定向推送) + + 磁盘持久化: + SESSIONS_FILE = backend/data/agent_sessions.json,结构: + { + "": { + "current": "", + "history": [ + { "session_id": "...", "title": "...", + "created_at": "...", "updated_at": "...", + "message_count": N } + ] + } + } + 新会话在首条消息到来时写入;每次 chat 结束后更新 updated_at 与 message_count。 """ def __init__(self): - # user_id → claude session_id 的映射 - self._sessions: dict[str, str] = {} - # user_id → 当前活跃的 asyncio.subprocess.Process + self._current: dict[str, str] = {} self._processes: dict[str, asyncio.subprocess.Process] = {} + self._last_active_user_id: str | None = None + # 进程间无共享(单进程 uvicorn),但 chat_stream 与 REST 可能并发写磁盘 + self._file_lock = threading.Lock() + self._data: dict = self._load() + # 把落盘里的 current 恢复到内存 + for uid, rec in self._data.items(): + cur = rec.get("current") + if cur: + self._current[uid] = cur + + # ── 磁盘 I/O ───────────────────────────────────────────── + def _ensure_storage(self): + SESSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) + if not SESSIONS_FILE.exists(): + SESSIONS_FILE.write_text("{}") + + def _load(self) -> dict: + self._ensure_storage() + try: + content = SESSIONS_FILE.read_text() + return json.loads(content) if content.strip() else {} + except Exception: + return {} + + def _save(self): + with self._file_lock: + try: + SESSIONS_FILE.write_text( + json.dumps(self._data, ensure_ascii=False, indent=2) + ) + except Exception: + pass + def _user_record(self, user_id: str) -> dict: + rec = self._data.get(user_id) + if not rec: + rec = {"current": None, "history": []} + self._data[user_id] = rec + return rec + + def _find_history_entry(self, user_id: str, session_id: str) -> dict | None: + rec = self._data.get(user_id) or {} + for item in rec.get("history", []): + if item.get("session_id") == session_id: + return item + return None + + # ── 历史会话 API ───────────────────────────────────────── + def list_history(self, user_id: str) -> list[dict]: + rec = self._data.get(user_id) or {} + # 按 updated_at 倒序 + items = list(rec.get("history", [])) + items.sort(key=lambda x: x.get("updated_at", ""), reverse=True) + return items + + def new_session(self, user_id: str) -> None: + """把当前活跃 session 置空;下一轮对话会创建新的 claude session_id。""" + self._current.pop(user_id, None) + rec = self._user_record(user_id) + rec["current"] = None + self._save() + + def switch_session(self, user_id: str, session_id: str) -> bool: + """切换到一条历史会话,后续 chat_stream 将用 --resume 继续它。""" + if not self._find_history_entry(user_id, session_id): + return False + self._current[user_id] = session_id + rec = self._user_record(user_id) + rec["current"] = session_id + self._save() + return True + + def delete_session(self, user_id: str, session_id: str) -> bool: + rec = self._data.get(user_id) + if not rec: + return False + before = len(rec.get("history", [])) + rec["history"] = [h for h in rec.get("history", []) if h.get("session_id") != session_id] + if rec.get("current") == session_id: + rec["current"] = None + self._current.pop(user_id, None) + self._save() + return len(rec["history"]) < before + + def rename_session(self, user_id: str, session_id: str, title: str) -> bool: + entry = self._find_history_entry(user_id, session_id) + if not entry: + return False + entry["title"] = title + self._save() + return True + + # ── Chat 主流程 ───────────────────────────────────────── async def chat_stream( self, user_id: str, @@ -96,15 +249,10 @@ async def chat_stream( ) -> AsyncGenerator[dict, None]: """ 调用 Claude Code CLI,以流式方式返回 Agent 输出的每一个 JSON chunk。 - - Args: - user_id: 前端用户标识符(用于恢复会话) - message: 用户输入的消息文本 - - Yields: - 每个 CLI 输出的 JSON 对象(dict) """ - session_id = self._sessions.get(user_id) + self._last_active_user_id = user_id + session_id = self._current.get(user_id) + is_new_session = session_id is None cmd = [ CLAUDE_CLI, @@ -113,26 +261,21 @@ async def chat_stream( "--verbose", "--mcp-config", str(MCP_CONFIG), "--append-system-prompt", SYSTEM_PROMPT, - "--allowedTools", "mcp__dataflow__*,Read,Write,Edit", # dataflow MCP 工具 + 文件读写(不含 Bash) - "--permission-mode", "dontAsk", # 禁用交互式权限确认 + "--allowedTools", "mcp__dataflow__*,Read,Write,Edit", + "--permission-mode", "dontAsk", ] - if session_id: cmd += ["--resume", session_id] - # limit 设为 10 MB,防止大型 MCP 响应(如 list_operators ~112KB)触发 - # "Separator is found, but chunk is longer than limit" 错误 large_limit = 10 * 1024 * 1024 # 10 MB process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - cwd=str(WEBUI_ROOT), # 在 WebUI 根目录运行,自动读取 .mcp.json 和 .claude/skills/ + cwd=str(WEBUI_ROOT), limit=large_limit, ) - - # 注册进程,以便 abort_session 可以强制 kill self._processes[user_id] = process try: @@ -145,28 +288,46 @@ async def chat_stream( except json.JSONDecodeError: continue - # 提取并保存 session_id(首条消息中包含) + # 首条 chunk 含 session_id,用于登记/更新历史 sid = chunk.get("session_id") - if sid and user_id not in self._sessions: - self._sessions[user_id] = sid + if sid and self._current.get(user_id) != sid: + self._current[user_id] = sid + rec = self._user_record(user_id) + rec["current"] = sid + if not self._find_history_entry(user_id, sid): + now = datetime.utcnow().isoformat() + title = message.strip().splitlines()[0][:40] if message else "新会话" + rec["history"].append({ + "session_id": sid, + "title": title, + "created_at": now, + "updated_at": now, + "message_count": 0, + }) + self._save() yield chunk finally: - # 无论正常结束、异常还是被取消,都清理进程引用并确保进程退出 + # 本轮结束:更新历史 updated_at / message_count + sid = self._current.get(user_id) + if sid: + entry = self._find_history_entry(user_id, sid) + if entry: + entry["updated_at"] = datetime.utcnow().isoformat() + entry["message_count"] = int(entry.get("message_count", 0)) + 1 + self._save() + self._processes.pop(user_id, None) if process.returncode is None: - # 进程仍在运行(可能是任务被取消),强制终止 try: process.kill() except Exception: pass + # ── 终止 / 清除 ───────────────────────────────────────── def abort_session(self, user_id: str): - """ - 强制终止用户的 claude 子进程并清除会话。 - 下次对话将重新开始(丢失上下文)。 - """ + """kill 当前子进程,并把当前活跃 session 置空(历史保留)。""" process = self._processes.get(user_id) if process and process.returncode is None: try: @@ -174,23 +335,28 @@ def abort_session(self, user_id: str): except Exception: pass self._processes.pop(user_id, None) - self._sessions.pop(user_id, None) + self._current.pop(user_id, None) + rec = self._user_record(user_id) + rec["current"] = None + self._save() def clear_session(self, user_id: str): - """清除指定用户的 session ID,下次对话将重新开始(丢失上下文)。 - 注意:不终止正在运行的子进程,如需同时终止进程请使用 abort_session。 - """ - self._sessions.pop(user_id, None) + """仅脱离当前活跃会话(不 kill 进程、不删历史)。""" + self._current.pop(user_id, None) + rec = self._user_record(user_id) + rec["current"] = None + self._save() def get_session_id(self, user_id: str) -> str | None: - """获取指定用户的当前 session_id""" - return self._sessions.get(user_id) + return self._current.get(user_id) def list_sessions(self) -> dict: - """列出所有活跃 session(用于调试)""" - return dict(self._sessions) + """当前活跃 session 快照(调试用)。""" + return dict(self._current) def has_active_process(self, user_id: str) -> bool: - """检查用户是否有正在运行的子进程""" process = self._processes.get(user_id) return process is not None and process.returncode is None + + def get_last_active_user_id(self) -> str | None: + return self._last_active_user_id diff --git a/backend/app/services/dataflow_engine.py b/backend/app/services/dataflow_engine.py index 1b3fdf2..2407e37 100644 --- a/backend/app/services/dataflow_engine.py +++ b/backend/app/services/dataflow_engine.py @@ -480,6 +480,10 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): run_sig = inspect.signature(getattr(operator_cls, "run", lambda: None)) # 处理 init 参数 + init_accepts_var_kw = any( + p.kind == inspect.Parameter.VAR_KEYWORD + for p in init_sig.parameters.values() + ) for param in op.get("params", {}).get("init", []): param_name = param.get("name") param_value = param.get("value") @@ -487,7 +491,11 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): try: if param_name == "llm_serving": + # Agents sometimes pass `{"id": ""}` + # instead of the bare string id. Normalize. serving_id = param_value + if isinstance(serving_id, dict): + serving_id = serving_id.get("id") or serving_id.get("serving_id") logger.info(f"Operator {op_name}: initializing serving {serving_id}") add_log("init", f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}", op_key) if serving_id not in serving_instance_map: @@ -496,6 +504,8 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): elif param_name == "embedding_serving": serving_id = param_value + if isinstance(serving_id, dict): + serving_id = serving_id.get("id") or serving_id.get("serving_id") logger.info(f"Operator {op_name}: initializing embedding serving {serving_id}") add_log("init", f"[{datetime.now().isoformat()}] - Initializing embedding serving: {serving_id}", op_key) if serving_id not in embedding_serving_instance_map: @@ -557,7 +567,25 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): else: ann = init_sig.parameters.get(param_name).annotation if param_name in init_sig.parameters else inspect.Parameter.empty param_value = coerce_param_value(param_value, annotation=ann, default_value=default_value) - + + # Skip params the operator's __init__ does not accept + # (and which would otherwise cause a TypeError below). + # Only the well-known special-cased names above are kept + # unconditionally because they are dereferenced before + # being passed to operator_cls(**init_params). + if (param_name not in init_sig.parameters + and not init_accepts_var_kw + and param_name not in ( + "llm_serving", "embedding_serving", + "database_manager", "prompt_template", + "process_fn", "filter_rules", + )): + logger.warning( + f"Operator {op_name}.__init__ does not accept '{param_name}'" + f"; dropping it to avoid TypeError." + ) + continue + init_params[param_name] = param_value except DataFlowEngineError: @@ -575,6 +603,14 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): ) # 处理 run 参数 + # Drop DYNAMIC params (e.g. agent-set system_prompt) when the + # operator's run() does NOT accept **kwargs — otherwise the + # final operator.run(**run_params) call raises + # `TypeError: run() got an unexpected keyword argument ...`. + run_accepts_var_kw = any( + p.kind == inspect.Parameter.VAR_KEYWORD + for p in run_sig.parameters.values() + ) for param in op.get("params", {}).get("run", []): param_name = param.get("name") param_value = param.get("value") @@ -582,6 +618,15 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): param_kind = param.get("kind", "") if param_kind == "VAR_KEYWORD" and param_value is None: continue + # Skip DYNAMIC / unknown params the operator's run() can't accept. + if (param_name not in run_sig.parameters) and not run_accepts_var_kw: + logger.warning( + f"Operator {op_name}.run() does not accept '{param_name}'" + f" (kind={param_kind!r}); dropping it from run_params" + f" to avoid TypeError. Set this param on a prompt_template" + f" or operator init arg instead." + ) + continue ann = run_sig.parameters.get(param_name).annotation if param_name in run_sig.parameters else inspect.Parameter.empty param_value = coerce_param_value(param_value, annotation=ann, default_value=default_value) run_params[param_name] = param_value diff --git a/backend/app/services/operator_category_guide.py b/backend/app/services/operator_category_guide.py new file mode 100644 index 0000000..adc6677 --- /dev/null +++ b/backend/app/services/operator_category_guide.py @@ -0,0 +1,224 @@ +"""Authoritative category guidance returned by /api/v1/operators/categories. + +Each entry provides the agent with: +- ``use_for`` : the data-engineering jobs this category is genuinely intended for +- ``not_for`` : common mis-mappings observed in agent traces (anti-patterns) +- ``examples`` : 3-5 representative operator names so the agent has anchors when + it issues the follow-up ``list_operators?category=...`` call + +The dictionary keys MUST stay in sync with the ``type.level_1`` values of the +operator registry. The endpoint cross-checks against the live registry and +backfills any unknown category with a stub so we never crash if a new category +is introduced upstream. +""" + +CATEGORY_GUIDE: dict[str, dict] = { + "core_text": { + "use_for": ( + "通用文本→QA / 多跳 QA / 摘要 / 长文本切块生成;prompt-driven 的 " + "filter / refine / evaluator (PromptedFilter/PromptedRefiner/" + "PromptedEvaluator);表格/字段重写用 PandasOperator。" + "首选这一类做『从原始文本生成 SFT 训练数据』。" + ), + "not_for": ( + "**不要**用 SFTGeneratorSeed —— 那个属于 text_sft,是从已有 SFT " + "种子扩展,而不是从原始文本生成 QA。需要『基础 QA 生成』时直接选 " + "Text2QAGenerator 或 Text2MultiHopQAGenerator。" + "也不要用它做 PT 语料质量过滤(去 text_pt)或代码相关任务(去 code)。" + ), + "examples": [ + "Text2QAGenerator", "Text2MultiHopQAGenerator", + "ChunkedPromptedGenerator", "PromptedFilter", + "PromptedRefiner", "PandasOperator", + ], + }, + "general_text": { + "use_for": ( + "纯文本表面层清洗与统计:基于规则的 Filter(长度、字符比例、" + "html 实体、URL、PII),N-gram / Hash 去重,BLEU/CIDEr 等无 LLM " + "评测。**所有不需要 LLM 调用的『通用文本质量过滤』都来这里找**。" + ), + "not_for": ( + "需要语义理解的过滤(如『基于 LLM 判断是否为优质问题』)应该用 " + "core_text 的 PromptedFilter;代码质量过滤去 code;" + "PT 语料 perplexity/edu-classifier 过滤去 text_pt。" + ), + "examples": [ + "CharNumberFilter", "NgramFilter", "HashDeduplicateFilter", + "HtmlEntityRefiner", "LLMLanguageFilter", + "ContentNullFilter", "BleuSampleEvaluator", + ], + }, + "text_sft": { + "use_for": ( + "已经有 SFT 数据(instruction / response 列)后做的事:" + "用 Condor / Treeinstruct / Deita / Alpagasus / Superfiltering / " + "RM / Instag 等方法对 SFT 样本打分、过滤、扩写。" + ), + "not_for": ( + "**不要**在『还没生成 QA / instruction』的阶段用这一类。" + "如果输入只是原始文档,应该先去 core_text(Text2QAGenerator)" + "或 reasoning(ReasoningQuestionGenerator)生成训练样本," + "再回来走 text_sft 做质量评估。SFTGeneratorSeed 也不是『从文本生成 QA』," + "它需要既有 SFT 种子。" + ), + "examples": [ + "CondorGenerator", "CondorRefiner", "DeitaQualityFilter", + "AlpagasusFilter", "SuperfilteringFilter", "TreeinstructFilter", + ], + }, + "text_pt": { + "use_for": ( + "**预训练(PT)语料**质量过滤:perplexity 过滤、FineWeb-Edu 分类器、" + "DebertaV3 质量、CCNet 去重。Phi4QAGenerator 是 PT 风格的 QA 合成。" + ), + "not_for": ( + "SFT/QA 数据评估走 text_sft;通用规则去重走 general_text;" + "推理/数学数据去 reasoning。" + ), + "examples": [ + "PerplexityFilter", "FineWebEduFilter", "DebertaV3Filter", + "CCNetDeduplicateFilter", "Phi4QAGenerator", + ], + }, + "reasoning": { + "use_for": ( + "数学/逻辑推理 CoT 数据:从问题生成 CoT 答案、答案抽取、" + "题目难度/分类评估、答案 N-gram/format/judge 过滤、" + "PT 格式转换。**只有当任务明确是 math / step-by-step CoT 时才选这里**。" + ), + "not_for": ( + "通用 QA 生成不在这里,去 core_text;代码推理不在这里,去 code;" + "纯文本清洗不在这里,去 general_text。" + ), + "examples": [ + "ReasoningQuestionGenerator", "ReasoningAnswerGenerator", + "ReasoningQuestionFilter", "ReasoningAnswerFormatterFilter", + "ReasoningQuestionDifficultySampleEvaluator", + ], + }, + "code": { + "use_for": ( + "代码数据:从代码生成指令、从指令生成代码、代码质量打分、" + "auto-generated 代码检测、长度/编码异常过滤、sandbox 评估。" + ), + "not_for": ( + "自然语言 QA / refine 不在这里;代码相关 SFT 评估也不能直接套通用 " + "DeitaQuality(去 text_sft 但参数要切换 instruction/response)。" + ), + "examples": [ + "CodeInstructionGenerator", "CodeCodeToInstructionGenerator", + "CodeQualityScoreFilter", "CodeAutoGeneratedFilter", + "CodeSandboxSampleEvaluator", + ], + }, + "conversations": { + "use_for": ( + "多轮对话/智能体场景的合成与评估:场景抽取/扩写、原子任务/" + "顺序任务生成、function-calling 对话生成与评估、组合任务过滤。" + ), + "not_for": ( + "单轮 QA 不属于这里(去 core_text);单条 instruction 评估不属于这里" + "(去 text_sft)。" + ), + "examples": [ + "MultiTurnConversationGenerator", "ConsistentChatGenerator", + "ScenarioExtractGenerator", "FunctionGenerator", + "FuncCallConversationSampleEvaluator", + ], + }, + "knowledge_cleaning": { + "use_for": ( + "**文档/URL 摄取链路**:File/URL→Markdown 转换(API/Flash/Local 三种)," + "KBC 切块、清洗、批量 QA 生成。把『PDF/HTML/网页 → 训练数据』的" + "前几步全部放在这里。" + ), + "not_for": ( + "如果输入已经是干净纯文本,跳过这一类,直接去 core_text。" + "API-only 部署中 KBCChunkGenerator 等可能因依赖缺失不可用," + "此时回退到 core_text 的 ChunkedPromptedGenerator。" + ), + "examples": [ + "FileOrURLToMarkdownConverterAPI", "FileOrURLToMarkdownConverterFlash", + "KBCChunkGenerator", "KBCMultiHopQAGeneratorBatch", + "MathBookQuestionExtract", + ], + }, + "agentic_rag": { + "use_for": ( + "Agentic / 多跳 RAG 训练数据合成:原子任务、宽度/深度 QA 生成与 F1 评估。" + ), + "not_for": "普通单跳 QA 走 core_text;多轮对话走 conversations。", + "examples": [ + "AgenticRAGAtomicTaskGenerator", "AgenticRAGDepthQAGenerator", + "AgenticRAGWidthQAGenerator", "AgenticRAGQAF1SampleEvaluator", + ], + }, + "text2sql": { + "use_for": "Text2SQL 数据生成与评估:可执行性过滤、组件分类、CoT 投票生成。", + "not_for": "通用 QA / 推理任务不要走这里。", + "examples": [ + "Text2SQLCoTVotingGenerator", "SQLExecutabilityFilter", + "SQLExecutionFilter", "SQLComponentClassifier", + ], + }, + "pdf2vqa": { + "use_for": "PDF→视觉问答数据:MinerU 输入构造、PDF/QA 合并、VQA 格式化、LLM 输出解析。", + "not_for": "纯文本 QA 走 core_text;普通 OCR/markdown 走 knowledge_cleaning。", + "examples": [ + "MinerU2LLMInputOperator", "PDF_Merger", "QA_Merger", + "VQAFormatter", "LLMOutputParser", + ], + }, + "core_vision": { + "use_for": "通用视觉 QA 合成(PromptedVQAGenerator)。", + "not_for": "PDF 专用流水线去 pdf2vqa。", + "examples": ["PromptedVQAGenerator"], + }, + "core_speech": { + "use_for": "语音转文本(Speech2TextGenerator)。", + "not_for": "其他模态不属于这里。", + "examples": ["Speech2TextGenerator"], + }, + "chemistry": { + "use_for": "化学领域:从文本抽取 SMILES、SMILES 等价数据集评估。", + "not_for": "其他领域不属于这里。", + "examples": [ + "ExtractSmilesFromTextGenerator", "SmilesEquivalenceDatasetEvaluator", + ], + }, +} + + +def build_category_response(op_list: list[dict]) -> dict[str, dict]: + """Return ``{category: {count, use_for, not_for, examples}}``. + + Counts are recomputed from the live registry; use_for/not_for come from + CATEGORY_GUIDE. Categories present in the registry but missing from the + guide get an "unknown" stub so the response is never silently lossy. + """ + counts: dict[str, int] = {} + for op in op_list: + cat = (op.get("type") or {}).get("level_1") or "unknown" + counts[cat] = counts.get(cat, 0) + 1 + + result: dict[str, dict] = {} + for cat, n in counts.items(): + guide = CATEGORY_GUIDE.get(cat, {}) + result[cat] = { + "count": n, + "use_for": guide.get("use_for", "(no guidance available)"), + "not_for": guide.get("not_for", "(no guidance available)"), + "examples": guide.get("examples", []), + } + return result + + +def valid_categories(op_list: list[dict]) -> list[str]: + """Categories actually present in the registry (sorted).""" + seen = set() + for op in op_list: + cat = (op.get("type") or {}).get("level_1") + if cat: + seen.add(cat) + return sorted(seen) diff --git a/backend/app/services/param_coercion.py b/backend/app/services/param_coercion.py index a493c48..1dd1afb 100644 --- a/backend/app/services/param_coercion.py +++ b/backend/app/services/param_coercion.py @@ -9,6 +9,44 @@ _TRUE_STRINGS = {"true", "1", "yes", "y", "on"} _FALSE_STRINGS = {"false", "0", "no", "n", "off"} +# Magic prefix used by the frontend operator param panel to refer to a saved +# JSON Schema by id (see components/manage/mainFlow/nodes/operatorNode/valueInput). +# The string "schema_ref:" is dereferenced to the stored schema JSON +# before any type coercion runs. +_SCHEMA_REF_PREFIX = "schema_ref:" + + +def _resolve_schema_ref(value: Any) -> Any: + """If value looks like "schema_ref:", return the stored schema JSON + (as a Python object when possible, otherwise the raw string). Unknown refs + are returned unchanged so that the caller can fail loudly instead of + silently passing an empty dict.""" + if not isinstance(value, str): + return value + if not value.startswith(_SCHEMA_REF_PREFIX): + return value + schema_id = value[len(_SCHEMA_REF_PREFIX):].strip() + if not schema_id: + return value + # Lazy import to avoid circular dependency at module load time. + try: + from app.core.container import container + manager = getattr(container, "json_schema_manager", None) + if manager is None: + return value + record = manager.get(schema_id) + if not record: + return value + schema_text = record.get("schema") + if isinstance(schema_text, str): + try: + return json.loads(schema_text) + except Exception: + return schema_text + return schema_text + except Exception: + return value + def _normalize_empty_string(value: Any) -> Any: if isinstance(value, str) and value == "": @@ -74,6 +112,9 @@ def coerce_param_value( 2) Fall back to coercion based on default_value runtime type. """ value = _normalize_empty_string(value) + # Resolve "schema_ref:" written by the operator params panel into the + # actual JSON Schema contents before any type coercion. + value = _resolve_schema_ref(value) if value is not None and annotation is not inspect.Parameter.empty: try: diff --git a/backend/app/services/user_prompt_registry.py b/backend/app/services/user_prompt_registry.py new file mode 100644 index 0000000..cc1358d --- /dev/null +++ b/backend/app/services/user_prompt_registry.py @@ -0,0 +1,163 @@ +""" +UserPromptRegistry:用户自定义 Prompt 模板的本地持久化管理。 + +与 dataflow.utils.registry.PROMPT_REGISTRY 完全解耦——后者是 DataFlow 内置 +prompt 类的反射入口,本 Registry 仅服务"用户在 WebUI 上写的 f-string 模板"。 + +持久化文件:backend//prompts/user_templates.json,结构: + { + "tpl_xxxxxxx": { + "id": "tpl_xxxxxxx", + "name": "...", + "description": "...", + "template": "Given {question}, produce ...", + "allowed_operators": ["OperatorA"], + "example_variables": { "question": "..." }, + "created_at": "...", "updated_at": "..." + }, + ... + } + +运行时解引用:算子参数中出现 "user_prompt:" 字符串时,由 +param_coercion.resolve_user_prompt_ref(...) 拿到对应模板文本。 +""" +import json +import re +import uuid +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Any +from loguru import logger as log +from app.core.config import settings + + +BACKEND_DIR = Path(__file__).parent.parent.parent +PROMPTS_DIR = BACKEND_DIR / settings.RESOURCE_DIR / "prompts" +PROMPTS_FILE = PROMPTS_DIR / "user_templates.json" + +_PLACEHOLDER_RE = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}") + + +def _extract_placeholders(template: str) -> List[str]: + if not template: + return [] + seen: List[str] = [] + for m in _PLACEHOLDER_RE.finditer(template): + name = m.group(1) + if name not in seen: + seen.append(name) + return seen + + +def render_template(template: str, variables: Dict[str, Any]) -> Dict[str, Any]: + """Render an f-string template with the given variables. Missing keys are + reported separately instead of raising, so the UI can flag them inline.""" + placeholders = _extract_placeholders(template) + missing = [p for p in placeholders if p not in variables] + if missing: + safe_vars = {p: "{" + p + "}" for p in placeholders} + safe_vars.update({k: v for k, v in variables.items() if k in placeholders}) + rendered = template.format(**safe_vars) + else: + try: + rendered = template.format(**{p: variables[p] for p in placeholders}) + except Exception as e: + rendered = f"[render error: {e}]" + return { + "rendered": rendered, + "placeholders": placeholders, + "missing": missing, + } + + +class UserPromptRegistry: + """CRUD + template rendering for user-defined prompt templates.""" + + def __init__(self): + self._ensure_storage() + self._templates: Dict[str, Dict[str, Any]] = self._load() + + # ── storage ────────────────────────────────────────────── + def _ensure_storage(self): + PROMPTS_DIR.mkdir(parents=True, exist_ok=True) + if not PROMPTS_FILE.exists(): + PROMPTS_FILE.write_text("{}") + + def _load(self) -> Dict[str, Dict[str, Any]]: + try: + content = PROMPTS_FILE.read_text() + return json.loads(content) if content.strip() else {} + except Exception as e: + log.warning(f"Failed to load user prompt templates: {e}") + return {} + + def _save(self): + try: + PROMPTS_FILE.write_text( + json.dumps(self._templates, ensure_ascii=False, indent=2) + ) + except Exception as e: + log.error(f"Failed to save user prompt templates: {e}") + raise + + # ── CRUD ───────────────────────────────────────────────── + def create( + self, + name: str, + description: str, + template: str, + allowed_operators: Optional[List[str]] = None, + example_variables: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + tpl_id = f"tpl_{uuid.uuid4().hex[:8]}" + now = datetime.utcnow().isoformat() + rec = { + "id": tpl_id, + "name": name, + "description": description or "", + "template": template or "", + "allowed_operators": list(allowed_operators or []), + "example_variables": dict(example_variables or {}), + "created_at": now, + "updated_at": now, + } + self._templates[tpl_id] = rec + self._save() + log.info(f"Created user prompt template {tpl_id}") + return rec + + def get(self, tpl_id: str) -> Optional[Dict[str, Any]]: + return self._templates.get(tpl_id) + + def list_all(self) -> List[Dict[str, Any]]: + return list(self._templates.values()) + + def update(self, tpl_id: str, **fields) -> Optional[Dict[str, Any]]: + rec = self._templates.get(tpl_id) + if not rec: + return None + for k in ("name", "description", "template"): + if fields.get(k) is not None: + rec[k] = fields[k] + if fields.get("allowed_operators") is not None: + rec["allowed_operators"] = list(fields["allowed_operators"]) + if fields.get("example_variables") is not None: + rec["example_variables"] = dict(fields["example_variables"]) + rec["updated_at"] = datetime.utcnow().isoformat() + self._save() + return rec + + def delete(self, tpl_id: str) -> bool: + if tpl_id in self._templates: + del self._templates[tpl_id] + self._save() + return True + return False + + # ── helpers ────────────────────────────────────────────── + def preview(self, template: str, variables: Dict[str, Any]) -> Dict[str, Any]: + return render_template(template, variables) + + def get_template_text(self, tpl_id: str) -> Optional[str]: + rec = self._templates.get(tpl_id) + return rec.get("template") if rec else None diff --git a/docs/math_data_synthesis_system.md b/docs/math_data_synthesis_system.md new file mode 100644 index 0000000..bb30468 --- /dev/null +++ b/docs/math_data_synthesis_system.md @@ -0,0 +1,86 @@ +# 数学强推理数据合成系统 + +## 概述 + +本系统包含三个 pipeline,用于合成不同复杂度的数学推理数据,特别适合训练需要强推理能力的AI模型。 + +## 已创建的 Pipeline + +### 1. 数学强推理数据合成器 (高级) +- **ID**: `37d4977e-57cd-48ed-a143-495963002ed4` +- **功能**: 从基础数学问题生成需要多步骤逻辑推理的复杂问题 +- **算子流程**: + 1. `data_complexity_enhancer` - 提升问题复杂度 + 2. `reasoning_step_generator` - 生成推理步骤 + 3. `multi_concept_integrator` - 整合多个数学概念 + 4. `data_validator` - 验证数据质量 + 5. `data_save` - 保存结果 + +### 2. 数学推理数据生成器 (中级) +- **ID**: `57251f75-4fda-48de-954b-fa73af0a22de` +- **功能**: 使用模板方法增强数学问题的推理要求 +- **算子流程**: + 1. `data_template` - 应用生成模板 + 2. `data_augment` - 数据增强 + 3. `data_format` - 格式化输出 + 4. `data_save` - 保存结果 + +### 3. 基础数学数据合成器 (初级) +- **ID**: `7cccb609-047f-4cfe-839e-e58a3f67fe77` +- **功能**: 使用LLM生成复杂推理问题 + +## 数据集 + +### 输入数据集 +1. `math_problems.jsonl` - 基础数学问题集合 +2. `comprehensive_math_dataset.jsonl` - 综合数学数据集,包含代数、几何等 + +### 输出数据集 +1. `strong_reasoning_math_data.jsonl` - 强推理数学数据 +2. `enhanced_math_reasoning.jsonl` - 增强推理数据 +3. `synthetic_math_reasoning.jsonl` - 合成推理数据 + +## 数据特征 + +生成的强推理数据具有以下特征: + +1. **多步骤推理**: 每个问题需要3-6个推理步骤 +2. **概念整合**: 结合代数、几何、逻辑等多个数学概念 +3. **逻辑矛盾检测**: 包含需要识别逻辑矛盾的问题 +4. **验证要求**: 解决方案需要可验证性 +5. **难度分级**: 从基础到高级的难度梯度 + +## 使用指南 + +### 运行 Pipeline + +1. 确保数据集文件存在 +2. 在编辑器中选择要运行的 pipeline +3. 点击"运行"按钮执行 pipeline +4. 查看生成的强推理数据文件 + +### 数据格式 + +每个数据条目包含: +- `original_problem`: 原始问题 +- `enhanced_problem`: 增强后的复杂问题 +- `reasoning_steps`: 推理步骤数组 +- `final_answer`: 最终答案 +- `difficulty`: 难度级别 +- `category`: 数学概念分类 +- `requires_logical_reasoning`: 是否需要逻辑推理 +- `has_multiple_concepts`: 是否包含多个概念 + +## 应用场景 + +1. **AI模型训练**: 训练需要强推理能力的数学AI助手 +2. **教育评估**: 评估学生的逻辑推理能力 +3. **研究工具**: 数学推理能力的研究和测试 +4. **数据增强**: 扩充数学训练数据集 + +## 扩展建议 + +1. 可以添加更多数学概念(如概率、统计等) +2. 增加不同难度级别的模板 +3. 集成更多数据验证规则 +4. 添加自动评分和反馈机制 \ No newline at end of file diff --git a/frontend/src/axios/config.js b/frontend/src/axios/config.js index 331dbea..04667e2 100644 --- a/frontend/src/axios/config.js +++ b/frontend/src/axios/config.js @@ -3,10 +3,12 @@ import axios from 'axios' let ax = axios.create() // config here +// 所有生成的 API 方法里 URL 形如 '/api/v1/...',因此 baseURL 保持为空, +// 由 vite dev proxy (/api -> backend 8000) 或生产环境同源处理。 if (import.meta.env.MODE == 'production') { - ax.defaults.baseURL = import.meta.env.VITE_BACKEND_URL + ax.defaults.baseURL = import.meta.env.VITE_BACKEND_URL || '' } else { - ax.defaults.baseURL = '/api' + ax.defaults.baseURL = '' } ax.interceptors.request.use( diff --git a/frontend/src/components/manage/chatPanel/index.vue b/frontend/src/components/manage/chatPanel/index.vue index 0c7ab4e..3600107 100644 --- a/frontend/src/components/manage/chatPanel/index.vue +++ b/frontend/src/components/manage/chatPanel/index.vue @@ -7,11 +7,29 @@ DataFlow 助手
+ + + + + + + +
+ +
+
+
+ 历史会话 + +
+
+ 还没有历史对话 +
+
+
+
{{ s.title || s.session_id }}
+
+ {{ formatTime(s.updated_at) }} + · {{ s.message_count || 0 }} 轮 +
+ +
+
+
+
+
@@ -38,11 +89,42 @@ >
+
+ +
+
+ + {{ formatToolName(msg.name) }} + 调用中… + 失败 + 完成 + +
+
+
+
input
+
{{ msg.input_preview }}
+
+
+
output
+
{{ msg.output_preview }}
+
+
+
@@ -99,6 +181,7 @@ import { useAppConfig } from '@/stores/appConfig' import { useTheme } from '@/stores/theme' import { useDataflow } from '@/stores/dataflow' import MarkdownIt from 'markdown-it' +import axios from 'axios' const md = new MarkdownIt({ html: false, @@ -117,6 +200,19 @@ function getUserId() { return uid } +// 根据当前页面协议/主机动态构造 WebSocket URL。 +// 开发模式下 vite.config.js 的 proxy (/api, ws:true) 会把 WS 转发到后端; +// 生产模式下后端直接托管前端静态文件,同样同源可达。 +// 关键:**不要**写死 localhost —— 当用户通过局域网 IP 访问前端时, +// localhost 指的是用户本机,而不是运行后端的机器。 +function buildWsUrl() { + if (typeof window === 'undefined' || !window.location) { + return 'ws://localhost:8000/api/v1/agent/ws' + } + const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + return `${proto}//${window.location.host}/api/v1/agent/ws` +} + export default { name: 'ChatPanel', data() { @@ -126,8 +222,12 @@ export default { isLoading: false, ws: null, userId: getUserId(), - wsUrl: `ws://localhost:8000/api/v1/agent/ws`, + wsUrl: buildWsUrl(), reconnectTimer: null, + // 历史会话相关 + showHistoryPanel: false, + historyList: [], + currentSessionId: null, } }, computed: { @@ -139,6 +239,7 @@ export default { }, mounted() { this.connectWebSocket() + this.loadHistory() }, beforeUnmount() { this.disconnectWebSocket() @@ -186,6 +287,35 @@ export default { if (msg.type === 'text_chunk') { this.appendToCurrentMessage(msg.content) + } else if (msg.type === 'tool_call_start') { + // Agent 开始调用一个工具:把当前 streaming 的 assistant 消息先 finalize, + // 插入一条 role=tool 消息,后续 text_chunk 会另起一条新的 assistant 消息。 + this.finalizeCurrentMessage() + this.messages.push({ + role: 'tool', + tool_use_id: msg.tool_use_id, + name: msg.name, + input_preview: msg.input_preview || '', + output_preview: '', + status: 'running', + is_error: false, + expanded: false, + streaming: false, + }) + this.$nextTick(() => this.scrollToBottom()) + + } else if (msg.type === 'tool_call_end') { + // 找到对应的 tool 消息,回填输出 + const toolMsg = [...this.messages].reverse().find( + (m) => m.role === 'tool' && m.tool_use_id === msg.tool_use_id + ) + if (toolMsg) { + toolMsg.output_preview = msg.output_preview || '' + toolMsg.is_error = !!msg.is_error + toolMsg.status = msg.is_error ? 'error' : 'done' + } + this.$nextTick(() => this.scrollToBottom()) + } else if (msg.type === 'sync_pipeline') { // 同步 pipeline 到 DAG 编辑器 this.syncFromAgent(msg) @@ -199,12 +329,24 @@ export default { } else if (msg.type === 'session_aborted') { this.isLoading = false this.messages = [] - this.addSystemMessage('⏹ 已终止运行并清除对话历史,开始新会话') + this.currentSessionId = null + this.addSystemMessage('⏹ 已终止运行并开始新会话(历史保留)') + this.loadHistory() } else if (msg.type === 'session_cleared') { this.isLoading = false this.messages = [] - this.addSystemMessage('对话历史已清除,开始新会话') + this.currentSessionId = null + this.addSystemMessage('已开始新会话(历史保留)') + this.loadHistory() + + } else if (msg.type === 'session_switched') { + this.isLoading = false + this.messages = [] + this.currentSessionId = msg.session_id + this.addSystemMessage(`↩ 已切换到历史会话 ${msg.session_id.slice(0, 8)}…,继续对话将从上次记忆恢复`) + this.showHistoryPanel = false + this.loadHistory() } else if (msg.type === 'error') { this.isLoading = false @@ -212,6 +354,82 @@ export default { } }, + formatToolName(name) { + if (!name) return 'tool' + // mcp__dataflow__list_operators → list_operators + const m = name.match(/^mcp__[^_]+__(.+)$/) + return m ? m[1] : name + }, + + formatTime(isoStr) { + if (!isoStr) return '' + try { + const d = new Date(isoStr) + const now = new Date() + const diff = (now - d) / 1000 + if (diff < 60) return '刚刚' + if (diff < 3600) return `${Math.floor(diff / 60)} 分钟前` + if (diff < 86400) return `${Math.floor(diff / 3600)} 小时前` + return d.toLocaleDateString() + } catch (e) { + return isoStr + } + }, + + async toggleHistoryPanel() { + this.showHistoryPanel = !this.showHistoryPanel + if (this.showHistoryPanel) { + await this.loadHistory() + } + }, + + async loadHistory() { + try { + const res = await axios.get('/api/v1/agent/sessions', { + params: { user_id: this.userId }, + }) + const data = res.data && res.data.data ? res.data.data : res.data + this.historyList = data.history || [] + this.currentSessionId = data.current || null + } catch (e) { + console.warn('[ChatPanel] loadHistory failed:', e) + } + }, + + newSession() { + // 通过 WS 让后端脱离当前 session;若断开则仅清空本地视图 + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ type: 'new_session' })) + } else { + this.messages = [] + this.currentSessionId = null + } + }, + + switchSession(sessionId) { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + this.addSystemMessage('⚠️ 连接已断开,正在重连…') + this.connectWebSocket() + return + } + this.ws.send(JSON.stringify({ type: 'switch_session', session_id: sessionId })) + }, + + async deleteSession(sessionId) { + try { + await axios.delete(`/api/v1/agent/sessions/${sessionId}`, { + params: { user_id: this.userId }, + }) + if (sessionId === this.currentSessionId) { + this.messages = [] + this.currentSessionId = null + } + await this.loadHistory() + } catch (e) { + console.warn('[ChatPanel] deleteSession failed:', e) + } + }, + appendToCurrentMessage(text) { const lastMsg = this.messages[this.messages.length - 1] if (lastMsg && lastMsg.role === 'assistant' && lastMsg.streaming) { @@ -294,6 +512,7 @@ export default { diff --git a/frontend/src/components/manage/mainFlow/nodes/operatorNode/valueInput/index.vue b/frontend/src/components/manage/mainFlow/nodes/operatorNode/valueInput/index.vue index f8a349b..4196417 100644 --- a/frontend/src/components/manage/mainFlow/nodes/operatorNode/valueInput/index.vue +++ b/frontend/src/components/manage/mainFlow/nodes/operatorNode/valueInput/index.vue @@ -15,6 +15,22 @@ :options="dataManagerList" :choosen-slider-background="thisData.borderColor" :reveal-background-color="[thisData.shadowColor, 'rgba(255, 255, 255, 1)']" :reveal-border-color="thisData.borderColor" border-radius="8" style="width: 100%"> + +
+ + +
@@ -22,10 +38,20 @@ @@ -748,4 +1039,258 @@ export default { 0%, 100% { transform: scale(0.8); opacity: 0.5; } 50% { transform: scale(1.2); opacity: 1; } } + +/* ── Tabs + My Templates ─────────────────────────────────── */ +.df-prompts-page { + display: flex; + flex-direction: column; + height: 100%; + background: rgba(250, 250, 250, 1); + + &.dark { + background: rgba(36, 36, 36, 1); + color: rgba(220, 220, 220, 1); + } +} +.prompts-tabs { + display: flex; + gap: 4px; + padding: 8px 12px 0; + border-bottom: 1px solid rgba(0, 0, 0, 0.08); +} +.df-prompts-page.dark .prompts-tabs { + border-bottom-color: rgba(255, 255, 255, 0.08); +} +.prompts-tab { + padding: 8px 14px; + cursor: pointer; + font-size: 13px; + border-radius: 6px 6px 0 0; + opacity: 0.65; + display: flex; + align-items: center; +} +.prompts-tab:hover { opacity: 0.85; } +.prompts-tab.active { + opacity: 1; + font-weight: 600; + background: rgba(255, 255, 255, 1); + box-shadow: 0 -1px 0 rgba(69, 98, 213, 0.6) inset; +} +.df-prompts-page.dark .prompts-tab.active { + background: rgba(50, 50, 50, 1); +} +.prompts-tab-count { + margin-left: 6px; + padding: 0 6px; + font-size: 11px; + border-radius: 8px; + background: rgba(69, 98, 213, 0.12); + color: #4562d5; +} + +.df-prompts-page .df-prompts-container { + flex: 1; + min-height: 0; +} + +.df-user-prompts { + flex: 1; + min-height: 0; + display: flex; + overflow: hidden; + + .user-prompts-sidebar { + width: 280px; + border-right: 1px solid rgba(0, 0, 0, 0.08); + display: flex; + flex-direction: column; + + .sidebar-header { + display: flex; + align-items: center; + padding: 10px 12px; + border-bottom: 1px solid rgba(0, 0, 0, 0.06); + font-weight: 600; + } + .sidebar-new-btn { + color: #4562d5; + cursor: pointer; + font-size: 12px; + display: flex; + align-items: center; + gap: 2px; + } + .sidebar-scroll { + flex: 1; + overflow-y: auto; + } + .user-prompt-item { + padding: 8px 12px; + border-bottom: 1px solid rgba(0, 0, 0, 0.04); + cursor: pointer; + + &:hover { background: rgba(69, 98, 213, 0.05); } + &.active { background: rgba(69, 98, 213, 0.12); } + } + .user-prompt-name { + font-weight: 500; + font-size: 13px; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + } + .user-prompt-desc { + font-size: 11px; + opacity: 0.6; + margin-top: 2px; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + } + .user-prompt-meta { + font-size: 11px; + opacity: 0.5; + margin-top: 2px; + display: flex; + gap: 4px; + } + .sidebar-empty { + padding: 18px; + opacity: 0.5; + font-size: 12px; + text-align: center; + } + } + + .user-prompts-editor { + flex: 1; + overflow-y: auto; + padding: 16px 24px; + + &.empty { + display: flex; + align-items: center; + justify-content: center; + flex-direction: column; + color: rgba(100, 100, 100, 0.6); + text-align: center; + } + + .editor-row { + margin-bottom: 14px; + + label { + display: block; + font-size: 12px; + font-weight: 500; + opacity: 0.7; + margin-bottom: 4px; + } + } + .editor-input { + width: 100%; + padding: 6px 10px; + font-size: 13px; + border-radius: 4px; + border: 1px solid rgba(0, 0, 0, 0.15); + outline: none; + box-sizing: border-box; + } + .editor-input:focus { border-color: #4562d5; } + .editor-textarea { + width: 100%; + padding: 8px 10px; + font-size: 13px; + font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; + border-radius: 4px; + border: 1px solid rgba(0, 0, 0, 0.15); + outline: none; + box-sizing: border-box; + resize: vertical; + } + .editor-textarea:focus { border-color: #4562d5; } + .placeholder-list { + margin-top: 6px; + font-size: 11px; + display: flex; + flex-wrap: wrap; + gap: 4px; + } + .placeholder-label { opacity: 0.55; margin-right: 4px; } + .placeholder-chip { + background: rgba(69, 98, 213, 0.1); + color: #4562d5; + padding: 1px 6px; + border-radius: 8px; + font-family: ui-monospace, Menlo, monospace; + } + .kv-row { + display: flex; + align-items: center; + gap: 8px; + margin-bottom: 6px; + + .kv-key { + width: 100px; + font-size: 12px; + font-family: ui-monospace, Menlo, monospace; + opacity: 0.7; + } + .editor-input { flex: 1; } + } + .preview-block { + background: rgba(246, 246, 250, 0.8); + border: 1px solid rgba(0, 0, 0, 0.08); + border-radius: 4px; + padding: 10px 12px; + font-family: ui-monospace, Menlo, monospace; + font-size: 12px; + white-space: pre-wrap; + word-break: break-word; + max-height: 240px; + overflow-y: auto; + margin: 0; + } + .preview-missing { + margin: 4px 0 0 0; + font-size: 11px; + color: #d64545; + } + .editor-actions { + margin-top: 16px; + display: flex; + gap: 8px; + } + .btn-primary, .btn-secondary, .btn-danger { + padding: 6px 14px; + font-size: 13px; + border-radius: 4px; + border: none; + cursor: pointer; + } + .btn-primary { background: #4562d5; color: white; } + .btn-primary:hover { background: #3550c2; } + .btn-secondary { background: rgba(0, 0, 0, 0.05); } + .btn-danger { background: #d64545; color: white; } + .btn-danger:hover { background: #c23838; } + } +} + +.df-prompts-page.dark .df-user-prompts { + .user-prompts-sidebar, + .user-prompts-sidebar .sidebar-header { border-color: rgba(255, 255, 255, 0.08); } + .user-prompt-item { border-bottom-color: rgba(255, 255, 255, 0.05); } + .editor-input, .editor-textarea { + background: rgba(40, 40, 40, 1); + color: rgba(220, 220, 220, 1); + border-color: rgba(255, 255, 255, 0.15); + } + .preview-block { + background: rgba(40, 40, 40, 0.7); + border-color: rgba(255, 255, 255, 0.08); + color: rgba(220, 220, 220, 1); + } + .btn-secondary { background: rgba(255, 255, 255, 0.08); color: #e0e0e0; } +} diff --git a/frontend/src/views/manage/schemas/index.vue b/frontend/src/views/manage/schemas/index.vue index f573599..437d3b1 100644 --- a/frontend/src/views/manage/schemas/index.vue +++ b/frontend/src/views/manage/schemas/index.vue @@ -36,19 +36,31 @@

-

{{ local('JSON Schema') }}

+
+

{{ local('JSON Schema') }}

+ +
+ @input="onSchemaInput('new', $event.target.value)">

{{ schemaError }}


-

{{ local('Example Data') }}

+
+

{{ local('Example Data') }}

+ +
+ :placeholder="local('Example JSON data that matches this schema...')" + @input="onExampleInput('new', $event.target.value)"> +

{{ exampleError }}

@@ -122,17 +134,29 @@

-

{{ local('Schema') }}

+
+

{{ local('Schema') }}

+ +
+ @input="onSchemaInput(item.id, $event.target.value)">

{{ editError[item.id] }}


-

{{ local('Example Data') }}

+
+

{{ local('Example Data') }}

+ +
+ :class="[{ dark: theme === 'dark' }]" + @input="onExampleInput(item.id, $event.target.value)"> +

{{ editExampleError[item.id] }}

@@ -172,7 +196,9 @@ export default { add: false }, schemaError: '', + exampleError: '', editError: {}, + editExampleError: {}, copyLabel: '', lock: { add: true, @@ -219,6 +245,140 @@ export default { this.schemaError = msg } }, + // ── 结构化校验工具 ────────────────────────────────────── + // 返回 { schema, error } ;error 为空代表 JSON 合法 + tryParseJson(text) { + if (!text || !text.trim()) return { value: null, error: '' } + try { + return { value: JSON.parse(text), error: '' } + } catch (e) { + // e.message 常形如 "Unexpected token } in JSON at position 42" + const m = /position\s+(\d+)/.exec(e.message || '') + if (m) { + const pos = parseInt(m[1], 10) + const pre = text.slice(0, pos) + const line = (pre.match(/\n/g) || []).length + 1 + const col = pos - pre.lastIndexOf('\n') + return { value: null, error: `${e.message} (line ${line}, col ${col})` } + } + return { value: null, error: e.message || 'Invalid JSON' } + } + }, + // 轻量 schema 校验:支持 type/required/properties/items/enum/anyOf/oneOf + // 返回 [] 代表通过;否则返回字符串错误列表。 + validateAgainstSchema(data, schema, pathPrefix = '') { + const errors = [] + if (!schema || typeof schema !== 'object') return errors + + if (Array.isArray(schema.anyOf)) { + const ok = schema.anyOf.some((s) => this.validateAgainstSchema(data, s, pathPrefix).length === 0) + if (!ok) errors.push(`${pathPrefix || 'value'} does not match any of anyOf schemas`) + return errors + } + if (Array.isArray(schema.oneOf)) { + const matches = schema.oneOf.filter((s) => this.validateAgainstSchema(data, s, pathPrefix).length === 0) + if (matches.length !== 1) { + errors.push(`${pathPrefix || 'value'} must match exactly one of oneOf (matched ${matches.length})`) + } + return errors + } + const type = schema.type + const actual = Array.isArray(data) ? 'array' : (data === null ? 'null' : typeof data) + if (type) { + const allowed = Array.isArray(type) ? type : [type] + // JSON Schema 里 integer 是 number 的子集 + const matchType = + allowed.includes(actual) || + (allowed.includes('integer') && actual === 'number' && Number.isInteger(data)) + if (!matchType) { + errors.push(`${pathPrefix || 'value'} expected type ${allowed.join('|')}, got ${actual}`) + return errors + } + } + if (Array.isArray(schema.enum)) { + const found = schema.enum.some((v) => JSON.stringify(v) === JSON.stringify(data)) + if (!found) errors.push(`${pathPrefix || 'value'} not in enum`) + } + if (type === 'object' || (!type && typeof data === 'object' && data !== null && !Array.isArray(data))) { + const props = schema.properties || {} + const required = schema.required || [] + for (const r of required) { + if (!(r in data)) errors.push(`missing required field "${r}"${pathPrefix ? ' at ' + pathPrefix : ''}`) + } + for (const k of Object.keys(props)) { + if (k in data) { + errors.push(...this.validateAgainstSchema(data[k], props[k], pathPrefix ? `${pathPrefix}.${k}` : k)) + } + } + } + if (type === 'array' && schema.items && Array.isArray(data)) { + data.forEach((el, i) => { + errors.push(...this.validateAgainstSchema(el, schema.items, `${pathPrefix || 'value'}[${i}]`)) + }) + } + return errors + }, + // scope: 'new' | item.id;反射到 schemaError / editError + onSchemaInput(scope, text) { + const { error } = this.tryParseJson(text) + if (scope === 'new') { + this.schemaError = error + } else { + if (error) this.editError[scope] = error + else delete this.editError[scope] + } + // schema 变了之后,example 的校验结果也要刷新 + const exampleText = + scope === 'new' + ? this.newSchema.example + : (this.schemaList.find((x) => x.id === scope) || {}).example + this.onExampleInput(scope, exampleText || '') + }, + onExampleInput(scope, text) { + const setErr = (msg) => { + if (scope === 'new') this.exampleError = msg + else { + if (msg) this.editExampleError[scope] = msg + else delete this.editExampleError[scope] + } + } + const { value, error } = this.tryParseJson(text) + if (error) { setErr(error); return } + if (value === null) { setErr(''); return } + // 把 schema 拿出来做结构化校验 + const schemaText = + scope === 'new' + ? this.newSchema.schema + : (this.schemaList.find((x) => x.id === scope) || {}).schema + const parsedSchema = this.tryParseJson(schemaText || '').value + if (!parsedSchema) { setErr(''); return } + const errs = this.validateAgainstSchema(value, parsedSchema) + setErr(errs.length ? errs.slice(0, 3).join('; ') : '') + }, + formatField(objName, field) { + const obj = this[objName] + const { value, error } = this.tryParseJson(obj[field]) + if (error) { + this.$barWarning({ status: 'warning', title: 'Cannot format: ' + error }) + return + } + if (value === null) return + obj[field] = JSON.stringify(value, null, 2) + // 重新触发校验 + if (field === 'schema') this.onSchemaInput('new', obj.schema) + else this.onExampleInput('new', obj.example) + }, + formatEditField(item, field) { + const { value, error } = this.tryParseJson(item[field]) + if (error) { + this.$barWarning({ status: 'warning', title: 'Cannot format: ' + error }) + return + } + if (value === null) return + item[field] = JSON.stringify(value, null, 2) + if (field === 'schema') this.onSchemaInput(item.id, item.schema) + else this.onExampleInput(item.id, item.example) + }, formatJson(json) { try { if (typeof json === 'string') { @@ -230,16 +390,17 @@ export default { } }, checkAdd() { - return this.newSchema.name.trim() && this.newSchema.schema.trim() && !this.schemaError + return this.newSchema.name.trim() && this.newSchema.schema.trim() && !this.schemaError && !this.exampleError }, checkEdit(item) { - return item.name.trim() && item.schema.trim() && !this.editError[item.id] + return item.name.trim() && item.schema.trim() && !this.editError[item.id] && !this.editExampleError[item.id] }, handleAdd() { this.show.add = !this.show.add if (!this.show.add) { this.newSchema = { name: '', description: '', schema: '', example: '' } this.schemaError = '' + this.exampleError = '' } }, confirmAdd() { @@ -532,6 +693,34 @@ export default { color: rgba(191, 95, 95, 1); margin: 5px 0 0 0; } + + .editor-label-row { + display: flex; + align-items: center; + justify-content: space-between; + width: 100%; + margin-bottom: 6px; + + .schema-item-light-title { + margin: 0 !important; + } + + .editor-actions { + display: flex; + gap: 12px; + + .editor-link { + font-size: 12px; + color: rgba(100, 100, 200, 1); + cursor: pointer; + user-select: none; + + &:hover { + text-decoration: underline; + } + } + } + } } } } diff --git a/frontend/vite.config.js b/frontend/vite.config.js index 9d5f4b5..0962e99 100644 --- a/frontend/vite.config.js +++ b/frontend/vite.config.js @@ -23,10 +23,16 @@ export default defineConfig({ }, server: { proxy: { + // 后端 FastAPI 路由本身就挂在 /api/v1/... 下,这里**不要**重写路径 '/api': { - target: 'http://127.0.0.1:8000/', // 后端 FastAPI 地址 + target: 'http://127.0.0.1:8000/', changeOrigin: true, - rewrite: path => path.replace(/^\/api/, '') // 如果后端没有 /api 前缀 + ws: true // 让 /api/v1/agent/ws 也走这个代理 + }, + // MCP SSE(FastAPI-MCP 挂在 /mcp) + '/mcp': { + target: 'http://127.0.0.1:8000/', + changeOrigin: true } } } diff --git a/frontend/yarn.lock b/frontend/yarn.lock index c5898a3..754787c 100644 --- a/frontend/yarn.lock +++ b/frontend/yarn.lock @@ -213,10 +213,10 @@ swiper "8.3.2" vue "^3.4.20" -"@esbuild/win32-x64@0.19.12": +"@esbuild/linux-x64@0.19.12": version "0.19.12" - resolved "https://registry.npmjs.org/@esbuild/win32-x64/-/win32-x64-0.19.12.tgz" - integrity sha512-T1QyPSDCyMXaO3pzBkF96E8xMkiRYbUEZADd29SyPGabqxMViNoii+NcK7eWJAEoU6RZyEm5lVSIjTmcdoB9HA== + resolved "https://registry.npmjs.org/@esbuild/linux-x64/-/linux-x64-0.19.12.tgz" + integrity sha512-B71g1QpxfwBvNrfyJdVDexenDIt1CiDN1TIXLbhOw0KhJzE78KIFGX6OJ9MrtC0oOqMWf+0xop4qEU8JrJTwCg== "@eslint-community/eslint-utils@^4.2.0", "@eslint-community/eslint-utils@^4.4.0": version "4.4.0" @@ -321,10 +321,10 @@ "@nodelib/fs.scandir" "2.1.5" fastq "^1.6.0" -"@parcel/watcher-win32-x64@2.5.1": +"@parcel/watcher-linux-x64-glibc@2.5.1": version "2.5.1" - resolved "https://registry.npmjs.org/@parcel/watcher-win32-x64/-/watcher-win32-x64-2.5.1.tgz" - integrity sha512-9lHBdJITeNR++EvSQVUcaZoWupyHfXe1jZvGZ06O/5MflPcuPLtEphScIBL+AiCWBO46tDSHzWyD0uDmmZqsgA== + resolved "https://registry.npmjs.org/@parcel/watcher-linux-x64-glibc/-/watcher-linux-x64-glibc-2.5.1.tgz" + integrity sha512-GcESn8NZySmfwlTsIur+49yDqSny2IhPeZfXunQi48DMugKeZ7uy1FX83pO0X22sHntJ4Ub+9k34XQCX+oHt2A== "@parcel/watcher@^2.4.1": version "2.5.1" @@ -370,10 +370,10 @@ resolved "https://registry.npmjs.org/@remirror/core-constants/-/core-constants-3.0.0.tgz" integrity sha512-42aWfPrimMfDKDi4YegyS7x+/0tlzaqwPQCULLanv3DMIlu96KTJR0fM5isWX2UViOqlGnX6YFgqWepcX+XMNg== -"@rollup/rollup-win32-x64-msvc@4.13.0": +"@rollup/rollup-linux-x64-gnu@4.13.0": version "4.13.0" - resolved "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.13.0.tgz" - integrity sha512-UKXUQNbO3DOhzLRwHSpa0HnhhCgNODvfoPWv2FCXme8N/ANFfhIPMGuOT+QuKd16+B5yxZ0HdpNlqPvTMS1qfw== + resolved "https://registry.npmjs.org/@rollup/rollup-linux-x64-gnu/-/rollup-linux-x64-gnu-4.13.0.tgz" + integrity sha512-yUD/8wMffnTKuiIsl6xU+4IA8UNhQ/f1sAnQebmE/lyQ8abjsVyDkyRkWop0kdMhKMprpNIhPmYlCxgHrPoXoA== "@rushstack/eslint-patch@^1.3.3": version "1.7.2" diff --git a/test.py b/test.py deleted file mode 100644 index fd5deb5..0000000 --- a/test.py +++ /dev/null @@ -1 +0,0 @@ -print('Hello, Git!')