-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
201 lines (170 loc) · 6.21 KB
/
Copy pathserver.py
File metadata and controls
201 lines (170 loc) · 6.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
A2A Server 端实现
模拟一个远程 Agent 服务,通过 A2A 协议接收请求并流式响应。
用 FastAPI + SSE (Server-Sent Events) 实现。
启动: uvicorn server:app --port 8000
"""
from __future__ import annotations
import asyncio
import json
import uuid
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI(title="A2A Demo Agent Server")
# ─── 1. Agent Card:agent 的"名片" ─────────────────────────
@app.get("/agent.json")
async def agent_card():
"""Agent 发现机制:告诉调用方"我是谁、我能干什么"
类比:你去拜访一家公司,先看前台的名牌知道这是哪家公司。
"""
return {
"name": "Demo Calculator Agent",
"description": "一个能做简单数学计算的演示 Agent",
"url": "http://localhost:8000/a2a",
"version": "0.1.0",
"capabilities": ["streaming", "state-management"],
"default_input_modes": ["text/plain"],
"default_output_modes": ["text/plain"],
"skills": [
{
"id": "calculate",
"name": "数学计算",
"description": "执行加减乘除等数学运算",
}
],
}
# ─── 2. A2A 端点:接收消息,流式返回 ───────────────────────
@app.post("/a2a")
async def send_streaming_message(request: dict):
"""核心 A2A 端点:接收 MessageSendParams,返回 SSE 流
流程:
1. 解析请求中的 message
2. 创建 Task(告诉客户端"我接了这个任务")
3. 流式推送 Message(思考过程、工具调用、文本回复)
4. 推送 TaskStatusUpdate(任务完成)
"""
message = request.get("message", {})
parts = message.get("parts", [])
context_id = message.get("context_id") or str(uuid.uuid4())
# 提取用户文本
user_text = ""
for part in parts:
if part.get("type") == "text":
user_text += part.get("text", "")
task_id = f"task_{uuid.uuid4().hex[:12]}"
async def event_stream():
"""SSE 生成器:逐步推送 A2A 事件"""
# ── 事件 1: Task 创建 ──
yield _sse({
"kind": "task",
"task": {
"id": task_id,
"context_id": context_id,
"status": {"state": "submitted"},
},
})
await asyncio.sleep(0.3)
# ── 事件 2: 状态更新 → Working ──
yield _sse({
"kind": "status-update",
"task_id": task_id,
"context_id": context_id,
"status": {"state": "working"},
})
await asyncio.sleep(0.5)
# ── 事件 3: Thinking(思考过程) ──
yield _sse({
"kind": "message",
"message_id": f"msg_{uuid.uuid4().hex[:12]}",
"role": "agent",
"task_id": task_id,
"context_id": context_id,
"parts": [{
"type": "data",
"data": {
"type": "thinking",
"thinking": f"用户说: '{user_text}',我需要解析并计算。",
},
}],
})
await asyncio.sleep(0.5)
# ── 事件 4: Tool Use(工具调用) ──
tool_id = f"tool_{uuid.uuid4().hex[:8]}"
yield _sse({
"kind": "message",
"message_id": f"msg_{uuid.uuid4().hex[:12]}",
"role": "agent",
"task_id": task_id,
"context_id": context_id,
"parts": [{
"type": "data",
"data": {
"type": "tool_use",
"tool_id": tool_id,
"tool_name": "calculator",
"input": {"expression": user_text},
},
}],
})
await asyncio.sleep(0.5)
# ── 事件 5: Tool Result(工具结果) ──
result = _mock_calculate(user_text)
yield _sse({
"kind": "message",
"message_id": f"msg_{uuid.uuid4().hex[:12]}",
"role": "agent",
"task_id": task_id,
"context_id": context_id,
"parts": [{
"type": "data",
"data": {
"type": "tool_result",
"tool_use_id": tool_id,
"content": str(result),
"is_error": False,
},
}],
})
await asyncio.sleep(0.3)
# ── 事件 6: 文本回复 ──
yield _sse({
"kind": "message",
"message_id": f"msg_{uuid.uuid4().hex[:12]}",
"role": "agent",
"task_id": task_id,
"context_id": context_id,
"parts": [{"type": "text", "text": f"计算结果是: **{result}**"}],
})
await asyncio.sleep(0.3)
# ── 事件 7: 任务完成 ──
yield _sse({
"kind": "status-update",
"task_id": task_id,
"context_id": context_id,
"status": {"state": "completed"},
})
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
# ─── 3. 取消任务 ────────────────────────────────────────────
@app.post("/a2a/cancel")
async def cancel_task(request: dict):
"""客户端可以主动取消正在执行的任务"""
task_id = request.get("id", "")
return {"status": "cancelled" if task_id else "not_found", "task_id": task_id}
def _mock_calculate(expr: str) -> str:
"""模拟计算器工具"""
try:
cleaned = "".join(c for c in expr if c in "0123456789.+-*/() ")
if cleaned:
return str(eval(cleaned)) # noqa: S307
except Exception:
pass
return f"无法解析: {expr}"
def _sse(data: dict) -> str:
return f"data: {json.dumps(data)}\n\n"
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)