-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathverify_async_system.py
More file actions
executable file
·195 lines (161 loc) · 6.41 KB
/
verify_async_system.py
File metadata and controls
executable file
·195 lines (161 loc) · 6.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python3
"""
PDF转换任务状态轮询机制系统验证脚本
"""
import sys
import asyncio
import importlib.util
from pathlib import Path
def check_module_import(module_path: str, module_name: str) -> bool:
"""检查模块是否可以成功导入"""
try:
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print(f"✅ {module_name}: 导入成功")
return True
except Exception as e:
print(f"❌ {module_name}: 导入失败 - {e}")
return False
def check_file_exists(file_path: str, description: str) -> bool:
"""检查文件是否存在"""
if Path(file_path).exists():
print(f"✅ {description}: 文件存在")
return True
else:
print(f"❌ {description}: 文件不存在")
return False
async def main():
"""主验证函数"""
print("=== PDF转换任务状态轮询机制系统验证 ===")
print()
success_count = 0
total_checks = 0
# 1. 检查核心文件是否存在
print("1. 检查核心文件...")
files_to_check = [
("app/models/jobs_queue.py", "JobsQueue数据模型"),
("app/models/processing_job.py", "ProcessingJob数据模型(扩展)"),
("app/core/queue_manager.py", "混合队列管理器"),
("app/core/process_pool.py", "独立处理进程池"),
("app/api/async_convert.py", "异步转换API"),
("app/schemas/async_task.py", "异步任务Schema"),
("migrate_jobs_queue.py", "JobsQueue表迁移脚本"),
("migrate_task_id.py", "Task ID迁移脚本"),
("start_async.sh", "启动脚本"),
("run_async_tests.sh", "测试脚本"),
("ASYNC_README.md", "异步任务文档")
]
for file_path, description in files_to_check:
total_checks += 1
if check_file_exists(file_path, description):
success_count += 1
print()
# 2. 检查模块导入
print("2. 检查模块导入...")
modules_to_check = [
("app/models/jobs_queue.py", "JobsQueue"),
("app/core/queue_manager.py", "QueueManager"),
("app/core/process_pool.py", "ProcessPool"),
("app/schemas/async_task.py", "AsyncTaskSchemas"),
]
for module_path, module_name in modules_to_check:
total_checks += 1
if check_module_import(module_path, module_name):
success_count += 1
print()
# 3. 检查配置完整性
print("3. 检查配置完整性...")
try:
from app.config import settings
# 检查新增的配置项
config_items = [
("vllm_parallel_size", "进程池大小"),
("queue_size", "队列容量"),
("proceed_timeout", "任务超时时间"),
("task_check_interval", "轮询间隔"),
("max_retry_attempts", "最大重试次数"),
]
for attr, description in config_items:
total_checks += 1
if hasattr(settings, attr):
value = getattr(settings, attr)
print(f"✅ {description} ({attr}): {value}")
success_count += 1
else:
print(f"❌ {description} ({attr}): 配置项缺失")
except Exception as e:
print(f"❌ 配置检查失败: {e}")
total_checks += len(config_items)
print()
# 4. 检查数据库模型扩展
print("4. 检查数据库模型扩展...")
try:
from app.models import JobsQueue, ProcessingJob
# 检查JobsQueue模型
total_checks += 1
if hasattr(JobsQueue, '__tablename__') and JobsQueue.__tablename__ == 'jobs_queue':
print("✅ JobsQueue模型: 表名正确")
success_count += 1
else:
print("❌ JobsQueue模型: 表名错误")
# 检查ProcessingJob扩展
total_checks += 1
if hasattr(ProcessingJob, 'update_progress'):
print("✅ ProcessingJob模型: 进度跟踪方法存在")
success_count += 1
else:
print("❌ ProcessingJob模型: 进度跟踪方法缺失")
except Exception as e:
print(f"❌ 数据库模型检查失败: {e}")
total_checks += 2
print()
# 5. 检查API路由注册
print("5. 检查API路由注册...")
try:
from app.api import async_convert_router
from app.main import app
total_checks += 1
# 检查路由是否包含预期的端点
routes = [route.path for route in app.routes]
if "/api/v1/convert/submit" in [route.path for route in async_convert_router.routes]:
print("✅ 异步转换API: 路由注册成功")
success_count += 1
else:
print("❌ 异步转换API: 路由注册失败")
except Exception as e:
print(f"❌ API路由检查失败: {e}")
total_checks += 1
print()
# 6. 检查测试文件
print("6. 检查测试文件...")
test_files = [
("tests/test_queue_manager.py", "队列管理器测试"),
("tests/test_process_pool.py", "进程池测试"),
("tests/test_async_convert_api.py", "异步API测试"),
]
for test_file, description in test_files:
total_checks += 1
if check_file_exists(test_file, description):
success_count += 1
print()
# 验证结果总结
print("=== 验证结果总结 ===")
print(f"总检查项: {total_checks}")
print(f"成功项: {success_count}")
print(f"失败项: {total_checks - success_count}")
print(f"成功率: {(success_count / total_checks * 100):.1f}%")
if success_count == total_checks:
print("\n🎉 系统验证完全通过!异步任务轮询机制已成功实现。")
print("\n下一步:")
print("1. 配置.env文件中的必需环境变量")
print("2. 运行数据库迁移: python migrate_jobs_queue.py && python migrate_task_id.py")
print("3. 启动应用: ./start_async.sh")
print("4. 运行测试: ./run_async_tests.sh")
return 0
else:
print(f"\n⚠️ 系统验证发现 {total_checks - success_count} 个问题,请检查失败项。")
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)