Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
*.zip
__pycache__
dist_release/
dist_release/

# local runtime artifacts
data/
backend/dataflow_cache/
backend/cache_local/
backend/data/dataflow_core/

# editor / OS
.DS_Store
*.swp
*.pyc
.idea/
.vscode/

# stale "* copy" skill dirs accidentally committed in earlier branches
.claude/skills/* copy/
87 changes: 47 additions & 40 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,42 +1,49 @@
# DataFlow WebUI — Project Rules

## WebUI Change Policy (MANDATORY)

All modifications to `backend/app/` and `frontend/src/` MUST comply with this policy.

### Allowed Changes ONLY

1. **Operator Rendering Support (Frontend)** — Add or fix rendering for DataFlow operators not yet properly displayed in the WebUI pipeline canvas.
- New/fixed node components in `components/manage/mainFlow/nodes/`
- Fix parameter rendering in operator config panels
- Add missing operator-specific UI elements (param editors, display widgets)
- Fix display bugs for operators that exist in DataFlow but show incorrectly

Verification: operator exists in `OPERATOR_REGISTRY` (`GET /api/v1/operators/`) but WebUI renders it incorrectly.

2. **Operator API Support (Backend)** — Add or fix API endpoints that serve existing DataFlow operator/pipeline/serving/dataset data.
- Missing CRUD endpoint for an existing resource type
- Fix serialization of operator parameters
- Fix edge cases in existing endpoints (null handling, pagination)
- Add query params or filters to existing list endpoints

### Prohibited Changes

- **NO** new analytical pages or dashboards (e.g., taxonomy analytics, custom reports)
- **NO** new backend endpoints for derived/computed data (e.g., `/api/v1/taxonomy/analytics`, `/api/v1/custom-stats`)
- **NO** new sidebar navigation items (`navList` in `manage/index.vue`)
- **NO** new routes in `router/Manage/index.js`
- **NO** new `views/manage/*/index.vue` pages
- **NO** new services/registries in `core/container.py`
- **NO** new config paths in `core/config.py`
- **NO** feature-level functionality not tied to making existing operators render in the UI

### Decision Rule

Before any frontend/backend change, check:
1. Does it make an existing DataFlow operator render correctly in the WebUI? → Allowed
2. Does it fix a bug in an existing API endpoint? → Allowed
3. Does it add missing CRUD for an existing resource type? → Allowed
4. Anything else → **PROHIBITED** — ask user to explicitly override

If the user requests a prohibited change, explain the policy and ask for explicit override confirmation.
> **Deployment assumption**: this project is a **single-user, local-first** tool.
> Each developer/user runs both backend and frontend on their own machine.
> There is **no authentication, no registration, no multi-tenant concern**.
> Do NOT add login pages, JWT/session infrastructure, or per-user ACL.

## WebUI Change Policy

The previous strict allow-list (only operator-rendering fixes / existing-endpoint
patches) has been **relaxed**. New routes, new sidebar items, new backend
endpoints, and new services are all permitted when they serve the goal of
"completing DataFlow-WebUI as a usable local tool".

### Encouraged work areas

1. **Agent chat experience** — tool-call progress indicators, session
persistence, multi-session switching, reconnect logic, robust error surfacing.
2. **Prompt management** — both read-only browsing of the built-in
`PROMPT_REGISTRY` AND first-class support for user-defined prompt templates
(file-based persistence under `backend/data/prompts/`, CRUD endpoints,
editor UI, reference from operator config).
3. **JSON Schema management** — container-based service injection, schema
validation (ajv), monaco/codemirror editor, deep integration with operator
parameter panels so saved schemas can be chosen for structured LLM outputs.
4. **Operator rendering** — still a priority: new/fixed node components in
`components/manage/mainFlow/nodes/`, parameter editors, correct display of
operators that already exist in `OPERATOR_REGISTRY`.
5. **Repo hygiene** — consolidating/deleting duplicate markdown, removing
stray `* copy` skill directories, fixing obvious code smells (same-name
route handlers, hard-coded URLs, services bypassing the DI container).

### Still prohibited

- **Authentication / registration / multi-user features.** Single-user local
tool only.
- **External telemetry / analytics uploads.** No outbound reporting.
- **Breaking changes to `OPERATOR_REGISTRY` / `PROMPT_REGISTRY` semantics** —
always extend, never redefine, so upstream DataFlow stays compatible.

### Decision rule

Before a change, ask:

1. Does it make DataFlow-WebUI more useful as a single-user local tool? → OK
2. Does it introduce a login page, account system, or remote telemetry? → **NO**
3. Does it silently mutate DataFlow core-registry semantics? → **NO**
4. Everything else → proceed, and keep changes minimal + consistent with
existing patterns (`core/container.py`, `api/v1/envelope.py`, etc.).
173 changes: 166 additions & 7 deletions backend/app/api/v1/endpoints/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,37 @@

消息协议(前端 → 后端):
{ "type": "chat", "message": "用户输入" }
{ "type": "abort_session" } ← 终止正在运行的 claude 进程并清除上下文
{ "type": "clear_session" } ← 仅清除上下文(不终止进程,已废弃,保留兼容)
{ "type": "abort_session" } ← 终止正在运行的 claude 进程并清除当前会话(保留历史)
{ "type": "clear_session" } ← 脱离当前会话(不 kill 进程),等价于"新建一个对话"
{ "type": "new_session" } ← 同 clear_session,语义更清晰
{ "type": "switch_session", ← 切换到一条历史会话,下一轮对话继续它
"session_id": "..." }

消息协议(后端 → 前端):
{ "type": "text_chunk", "content": "..." } ← Agent 回复文本(流式)
{ "type": "tool_call_start", "tool_use_id": "...",
"name": "mcp__dataflow__list_operators",
"input_preview": "{ ... }" } ← Agent 开始调用某个工具
{ "type": "tool_call_end", "tool_use_id": "...",
"is_error": false, "output_preview": "..." } ← 工具调用完成
{ "type": "sync_pipeline", "pipeline": {...},
"nodes": [...], "edges": [...] } ← Pipeline 同步到 DAG 编辑器
{ "type": "done" } ← 本轮回复完成
{ "type": "session_aborted" } ← 会话已终止并清除
{ "type": "session_cleared" } ← 会话已清除(兼容旧版)
{ "type": "session_cleared" } ← 会话已清除(新对话,保留历史)
{ "type": "session_switched", ← 已切换到历史会话
"session_id": "..." }
{ "type": "error", "message": "..." } ← 错误信息
"""
import asyncio
import json
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException
from pydantic import BaseModel
from app.services.agent_session import AgentSessionManager
from app.core.logger_setup import get_logger
from app.api.v1.resp import ok
from app.api.v1.envelope import ApiResponse
from typing import List, Optional

logger = get_logger(__name__)

Expand Down Expand Up @@ -97,6 +111,14 @@ async def agent_websocket(websocket: WebSocket):
async def run_chat_stream(message: str):
"""在后台 Task 中运行 Agent 流式输出"""
nonlocal active_task

def _truncate(obj, limit=400):
try:
s = json.dumps(obj, ensure_ascii=False)
except Exception:
s = str(obj)
return s if len(s) <= limit else s[:limit] + "…"

try:
async for chunk in agent_manager.chat_stream(
user_id=user_id,
Expand All @@ -115,16 +137,71 @@ async def run_chat_stream(message: str):
})

# Claude Code 格式:type=assistant,完整消息体
# content 数组中可能包含 text / tool_use 块
elif chunk_type == "assistant":
message_obj = chunk.get("message", {})
for content_block in message_obj.get("content", []):
if content_block.get("type") == "text":
block_type = content_block.get("type")
if block_type == "text":
text = content_block.get("text", "")
if text:
await ws_manager.send(user_id, {
"type": "text_chunk",
"content": text,
})
elif block_type == "tool_use":
# Agent 开始调用一个工具
await ws_manager.send(user_id, {
"type": "tool_call_start",
"tool_use_id": content_block.get("id", ""),
"name": content_block.get("name", ""),
"input_preview": _truncate(
content_block.get("input", {})
),
})

# Claude Code 格式:type=result,本轮 agent 循环的最终结束信号
# (可能因为成功、kvcache 超限、turn 超限等原因结束)
elif chunk_type == "result":
subtype = chunk.get("subtype", "")
stop_reason = chunk.get("stop_reason", "")
is_error = bool(chunk.get("is_error", False))
result_text = chunk.get("result", "") or ""
# 只在"异常结束但文本又为空"时推送警告——成功结束由主循环稍后的 done 覆盖
if (is_error or stop_reason in (
"kvcache_no_enough", "max_turns_exceeded",
)) and not result_text.strip():
hint = {
"kvcache_no_enough":
"模型上下文/KV 缓存已塞满,Agent 中止。建议:清空对话重来,"
"明确让 Agent 先调 list_operator_categories 再按类别拉算子,"
"避免一次性 list_operators。",
"max_turns_exceeded":
"Agent 工具调用轮次达到上限。建议用更具体的指令或拆小需求再试。",
}.get(stop_reason, f"Agent 异常结束(stop_reason={stop_reason})")
await ws_manager.send(user_id, {
"type": "error",
"message": hint,
})

# Claude Code 格式:type=user,通常是工具执行结果回填
elif chunk_type == "user":
message_obj = chunk.get("message", {})
for content_block in message_obj.get("content", []):
if content_block.get("type") == "tool_result":
output = content_block.get("content", "")
# content 可能是字符串或 content block 列表
if isinstance(output, list):
output = "".join(
(b.get("text", "") if isinstance(b, dict) else str(b))
for b in output
)
await ws_manager.send(user_id, {
"type": "tool_call_end",
"tool_use_id": content_block.get("tool_use_id", ""),
"is_error": bool(content_block.get("is_error", False)),
"output_preview": _truncate(output),
})

# 本轮回复正常完成
await ws_manager.send(user_id, {"type": "done"})
Expand Down Expand Up @@ -177,12 +254,38 @@ async def run_chat_stream(message: str):
await ws_manager.send(user_id, {"type": "session_aborted"})
logger.info(f"[{user_id}] Session aborted")

elif msg_type == "clear_session":
# 兼容旧版:仅清除 session ID(不强制终止进程)
elif msg_type == "clear_session" or msg_type == "new_session":
# 脱离当前会话(不 kill 进程、保留历史),等价于"新建一个对话"
agent_manager.clear_session(user_id)
await ws_manager.send(user_id, {"type": "session_cleared"})
logger.info(f"[{user_id}] Session cleared")

elif msg_type == "switch_session":
sid = (raw.get("session_id") or "").strip()
if not sid:
await ws_manager.send(user_id, {
"type": "error", "message": "switch_session requires session_id",
})
continue
# 正在跑的流需要先中止,避免和旧 session 的子进程纠缠
if active_task and not active_task.done():
active_task.cancel()
try:
await active_task
except (asyncio.CancelledError, Exception):
pass
ok_switch = agent_manager.switch_session(user_id, sid)
if ok_switch:
await ws_manager.send(user_id, {
"type": "session_switched", "session_id": sid,
})
logger.info(f"[{user_id}] Switched to session {sid}")
else:
await ws_manager.send(user_id, {
"type": "error",
"message": f"Session {sid} not found for this user",
})

except WebSocketDisconnect:
# 前端断开连接:清理后台任务
if active_task and not active_task.done():
Expand All @@ -201,3 +304,59 @@ async def run_chat_stream(message: str):
except Exception:
pass
ws_manager.disconnect(user_id)


# ─── Session history REST API ────────────────────────────────────────────────

class SessionInfo(BaseModel):
session_id: str
title: str
created_at: str
updated_at: str
message_count: int = 0


class SessionListOut(BaseModel):
current: Optional[str] = None
history: List[SessionInfo] = []


class RenameSessionIn(BaseModel):
title: str


@router.get(
"/sessions",
response_model=ApiResponse[SessionListOut],
summary="列出指定 user 的历史 Agent 会话",
)
def list_agent_sessions(user_id: str = "default"):
history = agent_manager.list_history(user_id)
return ok(SessionListOut(
current=agent_manager.get_session_id(user_id),
history=[SessionInfo(**h) for h in history],
))


@router.delete(
"/sessions/{session_id}",
response_model=ApiResponse[dict],
summary="删除一条历史 Agent 会话",
)
def delete_agent_session(session_id: str, user_id: str = "default"):
removed = agent_manager.delete_session(user_id, session_id)
if not removed:
raise HTTPException(status_code=404, detail="Session not found")
return ok({"deleted": True, "session_id": session_id})


@router.put(
"/sessions/{session_id}",
response_model=ApiResponse[dict],
summary="重命名一条历史 Agent 会话",
)
def rename_agent_session(session_id: str, body: RenameSessionIn, user_id: str = "default"):
renamed = agent_manager.rename_session(user_id, session_id, body.title.strip())
if not renamed:
raise HTTPException(status_code=404, detail="Session not found")
return ok({"session_id": session_id, "title": body.title.strip()})
Loading