forked from yutiansut/qaopenmdgateway
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_multi_ctp.py
More file actions
executable file
·236 lines (187 loc) · 8.46 KB
/
test_multi_ctp.py
File metadata and controls
executable file
·236 lines (187 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
"""
多CTP连接测试脚本
测试新的多CTP架构下的订阅分发功能
"""
import asyncio
import websockets
import json
import time
import sys
from typing import List, Dict, Any
class MultiCTPTester:
def __init__(self, server_url: str = "ws://localhost:7799"):
self.server_url = server_url
self.websocket = None
self.received_data = {}
self.subscription_status = {}
async def connect(self):
"""连接到WebSocket服务器"""
try:
self.websocket = await websockets.connect(self.server_url)
print(f"✅ 连接到服务器: {self.server_url}")
return True
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
async def send_message(self, message: Dict[str, Any]):
"""发送消息到服务器"""
if self.websocket:
await self.websocket.send(json.dumps(message))
print(f"📤 发送消息: {message}")
async def receive_messages(self):
"""接收服务器消息"""
try:
async for message in self.websocket:
data = json.loads(message)
await self.handle_message(data)
except websockets.exceptions.ConnectionClosed:
print("🔌 连接已关闭")
except Exception as e:
print(f"❌ 接收消息错误: {e}")
async def handle_message(self, data: Dict[str, Any]):
"""处理接收到的消息"""
msg_type = data.get("type", "unknown")
if msg_type == "welcome":
print(f"🎉 收到欢迎消息:")
print(f" Session ID: {data.get('session_id')}")
print(f" CTP状态: {'已连接' if data.get('ctp_connected') else '未连接'}")
elif msg_type == "subscribe_response":
print(f"📋 订阅响应:")
print(f" 状态: {data.get('status')}")
print(f" 已订阅数量: {data.get('subscribed_count')}")
elif msg_type == "market_data":
instrument_id = data.get("instrument_id")
connection_id = data.get("connection_id", "unknown")
if instrument_id not in self.received_data:
self.received_data[instrument_id] = {
'count': 0,
'connections': set(),
'last_update': None
}
self.received_data[instrument_id]['count'] += 1
self.received_data[instrument_id]['connections'].add(connection_id)
self.received_data[instrument_id]['last_update'] = time.time()
print(f"📊 [{connection_id}] {instrument_id}: 价格={data.get('last_price')}, "
f"成交量={data.get('volume')}, 时间={data.get('update_time')}")
elif msg_type == "error":
print(f"❌ 服务器错误: {data.get('message')}")
else:
print(f"📥 其他消息类型: {msg_type}")
async def test_mass_subscription(self, instruments: List[str]):
"""测试大量订阅"""
print(f"\n🔄 开始测试大量订阅 ({len(instruments)} 个合约)")
# 分批订阅以测试负载均衡
batch_size = 100
for i in range(0, len(instruments), batch_size):
batch = instruments[i:i+batch_size]
message = {
"action": "subscribe",
"instruments": batch
}
await self.send_message(message)
await asyncio.sleep(1) # 避免过快发送
print(f"✅ 已发送所有订阅请求")
async def monitor_connections(self, duration: int = 60):
"""监控连接状态"""
print(f"\n👀 监控连接状态 ({duration} 秒)")
start_time = time.time()
while time.time() - start_time < duration:
# 显示统计信息
total_messages = sum(data['count'] for data in self.received_data.values())
active_instruments = len(self.received_data)
all_connections = set()
for data in self.received_data.values():
all_connections.update(data['connections'])
print(f"📈 实时统计:")
print(f" 活跃合约: {active_instruments}")
print(f" 总消息数: {total_messages}")
print(f" 使用连接: {len(all_connections)} - {list(all_connections)}")
await asyncio.sleep(10)
async def test_failover(self):
"""测试故障转移(需要手动断开一个连接)"""
print(f"\n🚨 故障转移测试")
print("请手动断开一个CTP连接以测试故障转移...")
# 记录故障前的连接状态
before_connections = set()
for data in self.received_data.values():
before_connections.update(data['connections'])
print(f"故障前连接: {before_connections}")
# 监控30秒
await asyncio.sleep(30)
# 记录故障后的连接状态
after_connections = set()
for data in self.received_data.values():
after_connections.update(data['connections'])
print(f"故障后连接: {after_connections}")
if len(after_connections) < len(before_connections):
print("✅ 检测到连接减少,故障转移可能已触发")
else:
print("⚠️ 未检测到连接变化")
async def main():
# 创建测试合约列表
test_instruments = []
# 添加一些期货合约
futures_codes = ['rb', 'i', 'j', 'jm', 'ZC', 'FG', 'MA', 'TA', 'bu', 'ru']
months = ['2501', '2502', '2503', '2504', '2505']
for code in futures_codes:
for month in months:
test_instruments.append(f"{code}{month}")
# 添加一些股指期货
index_codes = ['IF', 'IC', 'IH', 'IM', 'TF', 'T', 'TS']
for code in index_codes:
for month in months[:3]: # 只用前3个月份
test_instruments.append(f"{code}{month}")
print(f"准备测试 {len(test_instruments)} 个合约")
# 创建测试器
server_url = "ws://localhost:7799"
if len(sys.argv) > 1:
server_url = sys.argv[1]
tester = MultiCTPTester(server_url)
# 连接服务器
if not await tester.connect():
return
try:
# 启动消息接收任务
receive_task = asyncio.create_task(tester.receive_messages())
# 等待连接建立
await asyncio.sleep(3)
# 执行测试
print("🚀 开始多CTP连接测试")
# 1. 测试大量订阅
await tester.test_mass_subscription(test_instruments)
# 2. 监控连接状态
await tester.monitor_connections(60)
# 3. 测试故障转移(可选)
test_failover = input("\n是否测试故障转移? (y/N): ").strip().lower() == 'y'
if test_failover:
await tester.test_failover()
print("\n📊 最终统计:")
total_messages = sum(data['count'] for data in tester.received_data.values())
active_instruments = len(tester.received_data)
all_connections = set()
for instrument, data in tester.received_data.items():
all_connections.update(data['connections'])
if data['count'] > 0:
print(f" {instrument}: {data['count']} 消息, 连接: {list(data['connections'])}")
print(f"\n总计:")
print(f" 活跃合约: {active_instruments}")
print(f" 总消息数: {total_messages}")
print(f" 使用连接: {len(all_connections)} - {list(all_connections)}")
except KeyboardInterrupt:
print("\n⏹️ 用户中断测试")
finally:
if tester.websocket:
await tester.websocket.close()
if __name__ == "__main__":
# 检查帮助参数
if len(sys.argv) > 1 and sys.argv[1] == "--help":
print("🧪 QuantAxis 多CTP连接测试脚本")
print("=" * 50)
print("用法: python3 test_multi_ctp.py [WebSocket_URL]")
print("默认URL: ws://127.0.0.1:7799")
print("示例: python3 test_multi_ctp.py ws://localhost:7799")
sys.exit(0)
print("🧪 QuantAxis 多CTP连接测试脚本")
print("=" * 50)
asyncio.run(main())