-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsubagent_delegation_basic.py
More file actions
322 lines (288 loc) · 12.7 KB
/
subagent_delegation_basic.py
File metadata and controls
322 lines (288 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""Sub-agent delegation — basic example.
This example shows the standard pattern for delegating a task to a sub-agent:
- Manager agent calls the ``subagent`` tool (synchronous, ``background=False``).
- ``SubagentSystem`` creates a child entity, runs it, and returns the result.
- Manager receives the result and produces a final summary.
- ``SystemPromptRenderSystem`` resolves ``${_installed_subagents}`` automatically.
- ``UserPromptNormalizationSystem`` normalises outbound user messages.
For a more advanced demo covering background queuing, FIFO scheduling, and
streaming telemetry see ``examples/subagent_delegation.py``.
Dual-mode operation:
- Without ``LLM_API_KEY``: Uses FakeProvider (no network calls needed).
- With ``LLM_API_KEY``: Uses OpenAIProvider for real LLM interaction.
"""
import asyncio
import os
from ecs_agent.components import (
ConversationComponent,
LLMComponent,
OwnerComponent,
SubagentRegistryComponent,
SubagentSessionTableComponent,
ToolRegistryComponent,
UserPromptConfigComponent,
)
from ecs_agent.core import Runner, World
from ecs_agent.prompts.contracts import (
PlaceholderSpec,
PromptTemplateSource,
SystemPromptConfigSpec,
)
from ecs_agent.providers import FakeProvider
from ecs_agent.providers import OpenAIProvider
from ecs_agent.providers.config import ApiFormat, ProviderConfig
from ecs_agent.providers.protocol import LLMProvider
from ecs_agent.systems.error_handling import ErrorHandlingSystem
from ecs_agent.systems.memory import MemorySystem
from ecs_agent.systems.reasoning import ReasoningSystem
from ecs_agent.systems.subagent import SubagentSystem
from ecs_agent.systems.system_prompt_render_system import SystemPromptRenderSystem
from ecs_agent.systems.tool_execution import ToolExecutionSystem
from ecs_agent.systems.user_prompt_normalization_system import (
UserPromptNormalizationSystem,
)
from ecs_agent.types import (
CompletionResult,
EntityId,
InheritancePolicy,
Message,
SubagentConfig,
ToolCall,
)
# Defaults used when the corresponding environment variable is unset or empty.
_DEFAULT_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
_DEFAULT_MODEL = "qwen3.5-flash"


def _build_openai_provider(base_url: str, api_key: str, model: str) -> LLMProvider:
    """Create an ``OpenAIProvider`` configured for the chat-completions format.

    Args:
        base_url: Endpoint URL forwarded to ``ProviderConfig``.
        api_key: Credential forwarded to ``ProviderConfig``.
        model: Model identifier handed to the provider.

    Returns:
        A newly constructed provider; every call builds a separate instance.
    """
    return OpenAIProvider(
        config=ProviderConfig(
            provider_id="openai",
            base_url=base_url,
            api_key=api_key,
            api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
        ),
        model=model,
    )


def _build_fake_manager_provider() -> LLMProvider:
    """Create the scripted ``FakeProvider`` that plays the manager offline.

    The script holds two responses: the first calls the ``subagent`` tool
    synchronously (``background=False``), the second is the final summary.
    """
    return FakeProvider(
        responses=[
            # Turn 1: delegate the question to the "researcher" sub-agent.
            CompletionResult(
                message=Message(
                    role="assistant",
                    content="I'll delegate the research task to the researcher sub-agent.",
                    tool_calls=[
                        ToolCall(
                            id="call_sync_001",
                            name="subagent",
                            arguments={
                                "category": "researcher",
                                "prompt": (
                                    "Research the most promising near-term "
                                    "applications of quantum computing."
                                ),
                                "background": False,
                            },
                        )
                    ],
                )
            ),
            # Turn 2: final summary
            CompletionResult(
                message=Message(
                    role="assistant",
                    content=(
                        "Based on the research, here is the summary:\n\n"
                        "Quantum computing uses qubits to perform calculations "
                        "impossible for classical computers.\n"
                        "Key near-term applications include:\n"
                        "1. Drug discovery — simulating molecular interactions\n"
                        "2. Optimization — logistics and supply-chain routing\n"
                        "3. Cryptography — post-quantum encryption standards"
                    ),
                )
            ),
        ]
    )


def _build_fake_researcher_provider() -> LLMProvider:
    """Create the scripted ``FakeProvider`` that plays the researcher sub-agent."""
    return FakeProvider(
        responses=[
            CompletionResult(
                message=Message(
                    role="assistant",
                    content=(
                        "After researching quantum computing applications, I found "
                        "three promising areas: (1) drug discovery through molecular "
                        "simulation, (2) combinatorial optimization for logistics, "
                        "and (3) post-quantum cryptography."
                    ),
                )
            ),
        ]
    )


def _create_manager_entity(
    world: World,
    manager_provider: LLMProvider,
    subagent_provider: LLMProvider,
    manager_model: str,
    researcher_model: str,
) -> EntityId:
    """Create the manager entity and attach every component the demo needs.

    Args:
        world: World in which the entity is created.
        manager_provider: Provider used by the manager's ``LLMComponent``.
        subagent_provider: Provider stored in the "researcher" ``SubagentConfig``.
        manager_model: Model name recorded on the manager's ``LLMComponent``.
        researcher_model: Model name recorded on the researcher configuration.

    Returns:
        The identifier returned by ``world.create_entity()``.
    """
    manager_id = world.create_entity()

    # LLMComponent — system_prompt left empty; rendered by SystemPromptRenderSystem
    world.add_component(
        manager_id,
        LLMComponent(
            provider=manager_provider,
            model=manager_model,
        ),
    )

    # SystemPromptConfigSpec — the template references the ${_installed_tools}
    # and ${_installed_subagents} placeholders plus the custom ${session_label}.
    world.add_component(
        manager_id,
        SystemPromptConfigSpec(
            template_source=PromptTemplateSource(
                inline=(
                    "You are a manager agent. When given a complex question, "
                    "use the 'subagent' tool to delegate work to a sub-agent. "
                    "After receiving the result, synthesize it into a concise summary.\n\n"
                    "Available tools:\n${_installed_tools}\n\n"
                    "Available subagents:\n${_installed_subagents}\n\n"
                    "Session: ${session_label}"
                )
            ),
            placeholders=[
                PlaceholderSpec(
                    name="session_label", value="subagent-delegation-basic"
                ),
            ],
        ),
    )

    # Seed the conversation with the user's question.
    world.add_component(
        manager_id,
        ConversationComponent(
            messages=[
                Message(
                    role="user",
                    content=(
                        "What are the most promising near-term applications "
                        "of quantum computing?"
                    ),
                )
            ]
        ),
    )

    # UserPromptConfigComponent — opts the manager into user-prompt normalisation
    world.add_component(manager_id, UserPromptConfigComponent())

    # Sub-agent registry: one named sub-agent "researcher"
    world.add_component(
        manager_id,
        SubagentRegistryComponent(
            subagents={
                "researcher": SubagentConfig(
                    name="researcher",
                    provider=subagent_provider,
                    model=researcher_model,
                    description="Research and gather information on any topic",
                    system_prompt=(
                        "You are a research sub-agent. Investigate the given topic "
                        "thoroughly and report your findings back to the manager."
                    ),
                    max_ticks=10,
                    inheritance_policy=InheritancePolicy(
                        inherit_system_prompt=True,
                        inherit_tools=[],
                    ),
                )
            }
        ),
    )

    # Required for subagent tools
    world.add_component(manager_id, ToolRegistryComponent(tools={}, handlers={}))
    world.add_component(manager_id, SubagentSessionTableComponent(sessions={}))
    return manager_id


def _register_systems(world: World, manager_id: EntityId) -> None:
    """Register the demo's systems and install the sub-agent tools on the manager.

    Args:
        world: World that receives the systems.
        manager_id: Entity on which the ``subagent`` and control tools are installed.
    """
    # Prompt rendering systems run first (negative priority)
    world.register_system(SystemPromptRenderSystem(priority=-20), priority=-20)
    world.register_system(UserPromptNormalizationSystem(priority=-10), priority=-10)

    subagent_system = SubagentSystem(priority=-1)
    world.register_system(subagent_system, priority=-1)
    # Install the 'subagent' tool (sync + async/background delegation)
    subagent_system.install_subagent_tool(world, manager_id, tool_name="subagent")
    subagent_system.install_subagent_control_tools(world, manager_id)

    world.register_system(ReasoningSystem(priority=0), priority=0)
    world.register_system(ToolExecutionSystem(priority=5), priority=5)
    world.register_system(MemorySystem(), priority=10)
    world.register_system(ErrorHandlingSystem(priority=99), priority=99)


async def main() -> None:
    """Run the basic sub-agent delegation example.

    Flow:
        1. SystemPromptRenderSystem resolves ``${_installed_subagents}`` and
           custom placeholders into the manager's rendered system prompt.
        2. UserPromptNormalizationSystem normalises the outbound user message.
        3. Manager receives user question and calls the ``subagent`` tool
           synchronously (``background=False``).
        4. SubagentSystem creates and runs the child entity inline.
        5. Result is delivered back to the manager conversation.
        6. Manager synthesises a final summary.

    Configuration is read from the ``LLM_API_KEY``, ``LLM_BASE_URL`` and
    ``LLM_MODEL`` environment variables. Without an API key the scripted
    ``FakeProvider`` instances are used instead of ``OpenAIProvider``.
    """
    world = World()

    # ── LLM Provider Configuration ──────────────────────────────────
    api_key = os.environ.get("LLM_API_KEY", "")
    # ``or`` instead of a ``.get`` default: a variable exported as an empty
    # string must also fall back, otherwise the URL / model would be blank.
    base_url = os.environ.get("LLM_BASE_URL") or _DEFAULT_BASE_URL
    model = os.environ.get("LLM_MODEL") or _DEFAULT_MODEL

    # ── Provider Setup ──────────────────────────────────────────────
    manager_provider: LLMProvider
    subagent_provider: LLMProvider
    if api_key:
        print(f"Using OpenAIProvider with model: {model}")
        print(f"Base URL: {base_url}")
        print()
        # Two separate instances, one per agent, built from the same settings.
        manager_provider = _build_openai_provider(base_url, api_key, model)
        subagent_provider = _build_openai_provider(base_url, api_key, model)
        manager_model = researcher_model = model
    else:
        print("No LLM_API_KEY provided. Using FakeProvider for demonstration.")
        print("To use a real API, set LLM_API_KEY, LLM_BASE_URL, and LLM_MODEL.")
        print()
        manager_provider = _build_fake_manager_provider()
        subagent_provider = _build_fake_researcher_provider()
        manager_model, researcher_model = "fake-manager", "fake-researcher"

    # ── Manager Entity Setup ────────────────────────────────────────
    manager_id = _create_manager_entity(
        world,
        manager_provider,
        subagent_provider,
        manager_model,
        researcher_model,
    )

    # ── Systems Registration ────────────────────────────────────────
    _register_systems(world, manager_id)

    # ── Run Agent ───────────────────────────────────────────────────
    runner = Runner()
    await runner.run(world, max_ticks=10)

    # ── Print Results ───────────────────────────────────────────────
    print("=" * 60)
    print("Manager Conversation (Basic Subagent Delegation)")
    print("=" * 60)
    _print_conversation("Manager", manager_id, world)

    # Show parent-child relationship
    for entity_id, components in world.query(OwnerComponent):
        (owner_comp,) = components
        print(
            f"\n[OwnerComponent] Sub-agent (entity {entity_id}) "
            f"→ Manager (entity {owner_comp.owner_id})"
        )
def _print_conversation(label: str, entity_id: EntityId, world: World) -> None:
    """Write one entity's conversation to stdout in a readable layout.

    Args:
        label: Human-readable name shown in the section header.
        entity_id: Entity whose ``ConversationComponent`` is looked up.
        world: World queried for the component.

    Prints a placeholder line and returns early when the entity has no
    ``ConversationComponent``.
    """
    print(f"\n--- {label} (entity {entity_id}) ---")
    conversation = world.get_component(entity_id, ConversationComponent)
    if conversation is None:
        print(" (no conversation)")
        return

    for message in conversation.messages:
        tag = message.role.upper()

        # Messages carrying tool calls: content line, then one line per call.
        if message.tool_calls:
            print(f" [{tag}] {message.content or '(no content)'}")
            for call in message.tool_calls:
                print(f" → Tool Call: {call.name}({call.arguments})")
            continue

        # Tool results get their own header line; all other messages put the
        # role tag in front of the first content line.
        if message.role == "tool":
            print(f" [{tag}] (tool_call_id={message.tool_call_id})")
            lead = " "
        else:
            lead = f" [{tag}] "

        # str.split always yields at least one item, so the unpacking is safe.
        head, *tail = (message.content or "").split("\n")
        print(f"{lead}{head}")
        for extra in tail:
            print(f" {extra}")
# Script entry point: run the async example only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())