-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel_worker.php
More file actions
193 lines (159 loc) · 5.37 KB
/
parallel_worker.php
File metadata and controls
193 lines (159 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
<?php
/**
* 并行代理检测工作进程
* 处理单个批次的代理检测任务
*/
// 设置错误报告
error_reporting(E_ALL);
ini_set('display_errors', 1);
// 设置执行时间限制
set_time_limit(600); // 10分钟
require_once 'config.php';
require_once 'database.php';
require_once 'monitor.php';
require_once 'logger.php';
// 检查命令行参数
if ($argc < 5) {
echo "Usage: php parallel_worker.php <batch_id> <offset> <limit> <status_file> [offline_only]\n";
exit(1);
}
$batchId = $argv[1];
$offset = intval($argv[2]);
$limit = intval($argv[3]);
$statusFile = $argv[4];
$offlineOnly = isset($argv[5]) && $argv[5] === '1';
// 初始化组件
$db = new Database();
$monitor = new NetworkMonitor();
$logger = new Logger();
$logger->info("工作进程启动: {$batchId}, offset={$offset}, limit={$limit}");
try {
// 更新批次状态为运行中
updateBatchStatus($statusFile, [
'status' => 'running',
'start_time' => time()
]);
// 获取当前批次的代理列表
$proxies = $offlineOnly ? $db->getOfflineProxiesBatch($offset, $limit) : $db->getProxiesBatch($offset, $limit);
$totalProxies = count($proxies);
if ($totalProxies == 0) {
$errorMsg = $offlineOnly ? '没有找到离线代理数据' : '没有找到代理数据';
updateBatchStatus($statusFile, [
'status' => 'completed',
'end_time' => time(),
'error' => $errorMsg
]);
exit(0);
}
$checkType = $offlineOnly ? "离线代理" : "代理";
$logger->info("批次 {$batchId} 获取到 {$totalProxies} 个{$checkType}");
// 检测每个代理
$checkedCount = 0;
$onlineCount = 0;
$offlineCount = 0;
foreach ($proxies as $proxy) {
// 检查是否被取消
if (isCancelled()) {
$logger->info("批次 {$batchId} 被取消");
updateBatchStatus($statusFile, [
'status' => 'cancelled',
'end_time' => time()
]);
exit(0);
}
// 使用快速检测方法
$result = $monitor->checkProxyFast($proxy);
$checkedCount++;
if ($result['status'] === 'online') {
$onlineCount++;
} else {
$offlineCount++;
}
// 减少状态文件更新频率,每10个代理更新一次
if ($checkedCount % 10 == 0 || $checkedCount == $totalProxies) {
$progress = ($checkedCount / $totalProxies) * 100;
updateBatchStatus($statusFile, [
'progress' => round($progress, 2),
'checked' => $checkedCount,
'online' => $onlineCount,
'offline' => $offlineCount
]);
}
// 每检查20个代理记录一次日志
if ($checkedCount % 20 == 0) {
$logger->info("批次 {$batchId} 进度: {$checkedCount}/{$totalProxies} (在线: {$onlineCount}, 离线: {$offlineCount})");
}
}
// 标记批次完成
updateBatchStatus($statusFile, [
'status' => 'completed',
'progress' => 100,
'end_time' => time()
]);
$logger->info("批次 {$batchId} 完成: 检查 {$checkedCount} 个代理,在线 {$onlineCount} 个,离线 {$offlineCount} 个");
} catch (Exception $e) {
$logger->error("批次 {$batchId} 出现错误: " . $e->getMessage());
updateBatchStatus($statusFile, [
'status' => 'error',
'end_time' => time(),
'error' => $e->getMessage()
]);
exit(1);
}
/**
* 更新批次状态
* @param string $statusFile 状态文件路径
* @param array $updates 更新内容
* @return bool 是否成功
*/
function updateBatchStatus(string $statusFile, array $updates): bool {
if (!file_exists($statusFile)) {
return false;
}
// 使用文件锁确保原子性操作
$lockFile = $statusFile . '.lock';
$lockHandle = fopen($lockFile, 'w');
if (!$lockHandle || !flock($lockHandle, LOCK_EX)) {
if ($lockHandle) fclose($lockHandle);
return false;
}
try {
$status = json_decode(file_get_contents($statusFile), true);
if (!$status) {
return false;
}
// 合并更新
foreach ($updates as $key => $value) {
$status[$key] = $value;
}
// 原子性写入:先写临时文件,再重命名
$tempFile = $statusFile . '.tmp';
$result = file_put_contents($tempFile, json_encode($status, JSON_UNESCAPED_UNICODE));
if ($result !== false) {
// 原子性重命名
$success = rename($tempFile, $statusFile);
// 强制刷新文件系统缓存
if ($success && function_exists('opcache_invalidate')) {
opcache_invalidate($statusFile, true);
}
return $success;
}
return false;
} finally {
// 释放锁
flock($lockHandle, LOCK_UN);
fclose($lockHandle);
@unlink($lockFile);
}
}
/**
* 检查是否被取消
* @return bool 是否已取消
*/
function isCancelled(): bool {
global $statusFile;
// 从状态文件路径推导出临时目录
$tempDir = dirname($statusFile);
$cancelFile = $tempDir . '/cancel.flag';
return file_exists($cancelFile);
}