-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel_batch_manager.php
More file actions
198 lines (169 loc) · 6.14 KB
/
parallel_batch_manager.php
File metadata and controls
198 lines (169 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
<?php
/**
* 并行批次管理器
* 负责管理和启动多个并行批次
*/
// 设置错误报告:仅记录到日志,不向外部输出
error_reporting(E_ALL);
ini_set('display_errors', 0);
ini_set('log_errors', 1);
// 设置执行时间限制
set_time_limit(0); // 无限制
require_once 'config.php';
require_once 'database.php';
require_once 'logger.php';
// 检查命令行参数
if ($argc < 4) {
echo "Usage: php parallel_batch_manager.php <total_proxies> <batch_size> <temp_dir> [offline_only]\n";
exit(1);
}
$totalProxies = intval($argv[1]);
$batchSize = intval($argv[2]);
$tempDir = $argv[3];
$offlineOnly = isset($argv[4]) && $argv[4] === '1';
// 初始化组件
$logger = new Logger();
$maxProcesses = 24; // 最大并行进程数
$logger->info("批次管理器启动: 总代理={$totalProxies}, 批次大小={$batchSize}, 最大进程={$maxProcesses}");
try {
// 更新主状态为运行中
$mainStatusFile = $tempDir . '/main_status.json';
if (file_exists($mainStatusFile)) {
$mainStatus = json_decode(file_get_contents($mainStatusFile), true);
$mainStatus['status'] = 'running';
file_put_contents($mainStatusFile, json_encode($mainStatus));
}
// 计算批次数
$totalBatches = ceil($totalProxies / $batchSize);
$processes = [];
$batchIndex = 0;
// 启动所有批次
for ($offset = 0; $offset < $totalProxies; $offset += $batchSize) {
// 检查是否被取消
if (isCancelled($tempDir)) {
$logger->info("批次管理器被取消");
break;
}
$batchId = 'batch_' . $batchIndex;
$statusFile = $tempDir . '/' . $batchId . '.json';
// 计算当前批次的实际大小
$currentBatchSize = min($batchSize, $totalProxies - $offset);
// 创建批次状态文件
$batchStatus = [
'batch_id' => $batchId,
'offset' => $offset,
'limit' => $currentBatchSize,
'status' => 'pending',
'progress' => 0,
'checked' => 0,
'online' => 0,
'offline' => 0,
'start_time' => time(),
'end_time' => null,
'error' => null
];
file_put_contents($statusFile, json_encode($batchStatus));
// 如果达到最大进程数,等待一些进程完成
if (count($processes) >= $maxProcesses) {
waitForProcesses($processes, $maxProcesses - 1, $logger);
}
// 启动新的检测进程
$process = startBatchProcess($batchId, $offset, $currentBatchSize, $statusFile, $offlineOnly);
if ($process) {
$processes[$batchId] = $process;
$checkType = $offlineOnly ? "离线代理" : "所有代理";
$logger->info("启动批次 {$batchId} ({$checkType}): offset={$offset}, limit={$currentBatchSize}");
}
$batchIndex++;
}
// 等待所有进程完成
$logger->info("等待所有批次完成...");
waitForAllProcesses($processes, $logger);
// 更新主状态为完成
if (file_exists($mainStatusFile)) {
$mainStatus = json_decode(file_get_contents($mainStatusFile), true);
$mainStatus['status'] = 'completed';
$mainStatus['end_time'] = time();
file_put_contents($mainStatusFile, json_encode($mainStatus));
}
$logger->info("批次管理器完成");
} catch (Exception $e) {
$logger->error("批次管理器出现错误: " . $e->getMessage());
// 更新主状态为错误
if (file_exists($mainStatusFile)) {
$mainStatus = json_decode(file_get_contents($mainStatusFile), true);
$mainStatus['status'] = 'error';
$mainStatus['error'] = $e->getMessage();
$mainStatus['end_time'] = time();
file_put_contents($mainStatusFile, json_encode($mainStatus));
}
exit(1);
}
/**
* 启动单个批次检测进程
* @param string $batchId 批次ID
* @param int $offset 偏移量
* @param int $limit 数量限制
* @param string $statusFile 状态文件路径
* @param bool $offlineOnly 是否只检测离线代理
* @return resource|false 进程句柄
*/
function startBatchProcess(string $batchId, int $offset, int $limit, string $statusFile, bool $offlineOnly = false) {
$scriptPath = __DIR__ . '/parallel_worker.php';
$offlineFlag = $offlineOnly ? '1' : '0';
// 所有参数均通过 escapeshellarg 转义,防止命令注入
$args = implode(' ', [
escapeshellarg($scriptPath),
escapeshellarg($batchId),
(int)$offset,
(int)$limit,
escapeshellarg($statusFile),
$offlineFlag,
]);
if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
$command = 'start /B php ' . $args;
} else {
$command = 'php ' . $args . ' > /dev/null 2>&1 &';
}
$process = popen($command, 'r');
return $process;
}
/**
* 等待指定数量的进程完成
* @param array $processes 进程列表
* @param int $maxRemaining 最大剩余进程数
* @param Logger $logger 日志对象
*/
function waitForProcesses(array &$processes, int $maxRemaining, Logger $logger): void {
while (count($processes) > $maxRemaining) {
foreach ($processes as $batchId => $process) {
// 检查进程是否完成
$status = pclose($process);
unset($processes[$batchId]);
$logger->info("批次 {$batchId} 完成");
break; // 只等待一个进程完成
}
usleep(defined('PARALLEL_BATCH_POLL_US') ? PARALLEL_BATCH_POLL_US : 500000); // 0.5秒检查间隔
}
}
/**
* 等待所有进程完成
* @param array $processes 进程列表
* @param Logger $logger 日志对象
*/
function waitForAllProcesses(array $processes, Logger $logger): void {
foreach ($processes as $batchId => $process) {
pclose($process);
$logger->info("批次 {$batchId} 完成");
}
}
/**
* 检查是否被取消
* @param string $tempDir 临时目录路径
* @return bool 是否已取消
*/
function isCancelled(string $tempDir): bool {
$cancelFile = $tempDir . '/cancel.flag';
return file_exists($cancelFile);
}
?>