-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathworker.php
More file actions
68 lines (54 loc) · 1.52 KB
/
Copy pathworker.php
File metadata and controls
68 lines (54 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
<?php
/*
* This is a worker process that will run forever (blocking)
* and execute tasks as they appear in the queue.
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the queue if it doesn't already exist.
$channel->queue_declare(
$queue = RABBITMQ_QUEUE_NAME,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
$job = json_decode($msg->body, $assocForm=true);
sleep($job['sleep_period']);
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume(
$queue = RABBITMQ_QUEUE_NAME,
$consumer_tag = '',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback
);
while (count($channel->callbacks))
{
$channel->wait();
}
$channel->close();
$connection->close();