-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulation.py
More file actions
298 lines (252 loc) · 10.4 KB
/
simulation.py
File metadata and controls
298 lines (252 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#!/usr/bin/env python3
"""
多智能体协作搜救系统模拟
本程序模拟一个无人机集群协作搜救场景,展示多智能体系统中的:
- 智能体角色(指挥中心、无人机)
- 决策方法(状态机、拍卖机制)
- 协作机制(通信、任务分配、协同搜索)
运行方式:
python simulation.py # 运行可视化模拟
python simulation.py --text # 运行文本模式模拟
python simulation.py --drones 5 # 指定无人机数量
"""
import sys
import argparse
import random
import math
# 添加当前目录到路径
sys.path.insert(0, '.')
from src.agent_base import Position
from src.environment import Environment
from src.drone_agent import DroneAgent, DroneSpec
from src.coordinator_agent import CoordinatorAgent
from src.communication import MessageBus
from src.scenarios import ScenarioManager, print_scenarios
def create_simulation(
num_drones: int = 4,
scenario: str = 'medium',
map_size: float = 120
):
"""
创建模拟环境
Args:
num_drones: 无人机数量
scenario: 场景类型 (easy/medium/hard/extreme/city/mountain/maze)
map_size: 地图大小
Returns:
environment, coordinator, drones, message_bus
"""
print("=" * 60)
print(" 无人机集群协作搜救系统 - 多智能体模拟")
print("=" * 60)
# 创建消息总线
message_bus = MessageBus()
# 创建环境(更大的地图)
environment = Environment(width=map_size, height=map_size)
center = map_size / 2
environment.base_position = Position(center, center)
random.seed(42)
# 使用场景管理器加载场景
ScenarioManager.create_scenario(scenario, environment)
# 创建指挥中心(位于地图中心)
coordinator = CoordinatorAgent(
agent_id="coord_001",
name="指挥中心",
message_bus=message_bus
)
coordinator.position = Position(center, center)
# 注意:coordinator在__init__中已自动注册,不需要再次注册
# 创建无人机(从地图中心均匀分布出发)
# 每个无人机有不同的能力参数,体现多智能体差异化
drones = []
drone_configs = [
{"name": "Alpha", "speed": 7.0, "range": 20.0, "battery": 100, "consumption": 0.12}, # 快速侦察型
{"name": "Bravo", "speed": 5.0, "range": 22.0, "battery": 120, "consumption": 0.18}, # 续航型
{"name": "Charlie", "speed": 6.0, "range": 15.0, "battery": 100, "consumption": 0.15}, # 平衡型
{"name": "Delta", "speed": 5.5, "range": 25.0, "battery": 90, "consumption": 0.20}, # 广域感知型
{"name": "Echo", "speed": 8.0, "range": 12.0, "battery": 80, "consumption": 0.25}, # 高速型
{"name": "Foxtrot", "speed": 4.5, "range": 18.0, "battery": 150, "consumption": 0.10}, # 持久型
]
for i in range(num_drones):
config = drone_configs[i % len(drone_configs)]
# 无人机从中心周围均匀分布,但距离稍有不同
angle = (2 * math.pi * i) / num_drones
dist_from_center = 5 + (i % 3) * 2 # 不同距离,体现差异
start_pos = Position(
center + dist_from_center * math.cos(angle),
center + dist_from_center * math.sin(angle)
)
drone = DroneAgent(
agent_id=f"drone_{i+1:03d}",
name=f"无人机-{config['name']}",
position=start_pos,
spec=DroneSpec(
max_speed=config['speed'],
perception_range=config['range'],
battery_capacity=config['battery'],
battery_consumption=config['consumption']
),
message_bus=message_bus
)
drones.append(drone)
# 注意:drone在__init__中已自动注册,不需要再次注册
coordinator.register_drone(drone)
print(f"\n配置信息:")
print(f" - 场景类型: {scenario}")
print(f" - 地图大小: {map_size} x {map_size}")
print(f" - 基地位置: 中心 ({center:.0f}, {center:.0f})")
print(f" - 无人机数量: {num_drones}")
print(f" - 搜救目标数量: {len(environment.targets)}")
print(f" - 障碍物数量: {len(environment.obstacles)}")
print()
return environment, coordinator, drones, message_bus
def run_text_simulation(
environment,
coordinator,
drones,
max_steps: int = 300
):
"""
运行文本模式模拟
"""
print("\n" + "=" * 60)
print(" 开始文本模式模拟")
print("=" * 60)
# 启动任务
coordinator.start_mission(environment)
step = 0
last_found = 0
mission_complete_step = None # 记录任务完成的步骤
return_steps = 50 # 任务完成后给无人机返航的步数
while step < max_steps:
# 检查是否应该结束模拟
if mission_complete_step is not None:
# 任务已完成,检查是否所有无人机都到达基地或超时
all_at_base = all(
drone.position.distance_to(environment.base_position) < 5
for drone in drones
)
if all_at_base or (step - mission_complete_step) > return_steps:
break
# 更新所有无人机
for drone in drones:
drone.update(environment)
# 检查是否发现目标
perception = drone.perceive(environment)
for target in perception.get('visible_targets', []):
environment.mark_target_found(target.target_id, drone.agent_id)
# 更新指挥中心
coordinator_perception = coordinator.perceive(environment)
action = coordinator.decide(coordinator_perception)
if action:
coordinator.act(action)
# 环境步进
environment.step()
# 定期输出状态
stats = environment.get_statistics()
if step % 20 == 0 or stats['found_targets'] != last_found:
print(f"\n[步骤 {step}] 搜索进度: {stats['search_progress']*100:.1f}% "
f"({stats['found_targets']}/{stats['total_targets']} 目标)")
for drone in drones:
status = drone.get_status()
print(f" {status['name']}: {status['state']}, "
f"位置: {status['position']}, 电量: {status['battery']}")
last_found = stats['found_targets']
step += 1
# 检查是否所有目标都找到
if stats['found_targets'] == stats['total_targets'] and mission_complete_step is None:
print(f"\n*** 所有目标已找到! 用时 {step} 步 ***")
print("无人机正在返回基地...")
mission_complete_step = step
# 输出最终结果
print("\n" + "=" * 60)
print(" 模拟结束")
print("=" * 60)
stats = environment.get_statistics()
print(f"\n最终统计:")
print(f" - 总步数: {step}")
print(f" - 发现目标: {stats['found_targets']}/{stats['total_targets']}")
print(f" - 搜索完成率: {stats['search_progress']*100:.1f}%")
print(f"\n无人机最终状态:")
for drone in drones:
status = drone.get_status()
print(f" {status['name']}:")
print(f" - 状态: {status['state']}")
print(f" - 位置: {status['position']}")
print(f" - 电量: {status['battery']}")
print(f" - 发现目标数: {status['targets_found']}")
print(f"\n发现的目标:")
for target in environment.targets:
status = "已发现" if target.found else "未发现"
finder = f" (发现者: {target.found_by})" if target.found_by else ""
print(f" - {target.target_id}: {target.position} [{status}]{finder}")
def run_visual_simulation(
environment,
coordinator,
drones,
max_steps: int = 300
):
"""
运行可视化模拟
"""
try:
import matplotlib
matplotlib.use('TkAgg') # 使用TkAgg后端
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
from visualization import run_visualization
run_visualization(
environment, coordinator, drones,
max_steps=max_steps, delay=0.05
)
except ImportError as e:
print(f"警告: 无法导入matplotlib ({e})")
print("切换到文本模式...")
run_text_simulation(environment, coordinator, drones, max_steps)
except Exception as e:
print(f"可视化出错: {e}")
print("切换到文本模式...")
run_text_simulation(environment, coordinator, drones, max_steps)
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='多智能体协作搜救系统模拟')
parser.add_argument('--text', action='store_true', help='使用文本模式(无图形界面)')
parser.add_argument('--list', action='store_true', help='列出所有可用场景')
parser.add_argument('--drones', type=int, default=4, help='无人机数量 (默认: 4)')
parser.add_argument('--scenario', type=str, default='medium',
help='场景类型: easy/medium/hard/extreme/city/mountain/maze (默认: medium)')
parser.add_argument('--size', type=float, default=100, help='地图大小 (默认: 100)')
parser.add_argument('--steps', type=int, default=500, help='最大步数 (默认: 500)')
args = parser.parse_args()
# 列出场景
if args.list:
print_scenarios()
return
# 根据场景自动调整无人机数量
scenario_drones = {
'easy': 3,
'medium': 4,
'hard': 5,
'extreme': 6,
'city': 6,
'mountain': 5,
'maze': 4
}
# 如果用户没有指定无人机数量,使用场景推荐值
num_drones = args.drones
if args.drones == 4 and args.scenario in scenario_drones:
num_drones = scenario_drones[args.scenario]
# 创建模拟
environment, coordinator, drones, message_bus = create_simulation(
num_drones=num_drones,
scenario=args.scenario,
map_size=args.size
)
# 运行模拟
if args.text:
run_text_simulation(environment, coordinator, drones, args.steps)
else:
run_visual_simulation(environment, coordinator, drones, args.steps)
if __name__ == "__main__":
main()