Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
> [!IMPORTANT]
> Dotkernel component used to queue tasks to be processed asynchronously based on [netglue/laminas-messenger](https://github.com/netglue/laminas-messenger)

A queue system is a vital component in modern web applications that enables the decoupling of certain tasks from the regular request-response cycle.
A queue system is a vital part in modern web applications that enables the decoupling of certain tasks from the regular request-response cycle.

This is especially useful for time-consuming and resource-intensive operations which are thus handled asynchronously by background workers on a separate system.
This is especially useful for time-consuming and resource-intensive operations which are thus handled asynchronously by background workers on a separate system.

The greatest benefit is to application responsiveness which allows faster execution, while the heavy lifting is scheduled in the queue based on available resources.
The greatest benefit is to application responsiveness, which allows faster execution, while the heavy lifting is scheduled in the queue based on available resources.

The queue system uses logs to ensure maintainability and implements retry features for reliability and stability.
The queue system uses logs to ensure maintainability and implements retry features for reliability and stability.

![Queue process](https://docs.dotkernel.org/img/queue/schema.png)

Expand All @@ -28,10 +28,6 @@ The greatest benefit is to application responsiveness which allows faster execut
[![Qodana](https://github.com/dotkernel/queue/actions/workflows/qodana_code_quality.yml/badge.svg?branch=main)](https://github.com/dotkernel/queue/actions/workflows/qodana_code_quality.yml)
[![PHPStan](https://github.com/dotkernel/queue/actions/workflows/static-analysis.yml/badge.svg?branch=main)](https://github.com/dotkernel/queue/actions/workflows/static-analysis.yml)

## Installation

> Until we have a compiled documentation, read the files from /doc/book/v1 folder

## Documentation

Documentation is available at: https://docs.dotkernel.org/queue-documentation
8 changes: 4 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@
},
"require-dev": {
"laminas/laminas-coding-standard": "^3.0",
"phpunit/phpunit": "^10.5.45",
"roave/security-advisories": "dev-master",
"swoole/ide-helper": "~5.0.0",
"phpstan/phpstan": "^2.0",
"phpstan/phpstan-doctrine": "^2.0",
"phpstan/phpstan-phpunit": "^2.0"
"phpstan/phpstan-phpunit": "^2.0",
"phpunit/phpunit": "^10.5.45",
"roave/security-advisories": "dev-master",
"swoole/ide-helper": "~5.0.0"
},
"autoload": {
"psr-4": {
Expand Down
12 changes: 6 additions & 6 deletions config/autoload/local.php.dist
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

declare(strict_types=1);

$baseUrl = 'http://queue.dotkernel.net';
$baseUrl = 'https://queue.dotkernel.net';

$databases = [
'default' => [
Expand Down Expand Up @@ -43,13 +43,13 @@ return [
'protocol' => 'tcp',
'host' => 'localhost',
'port' => '8556',
'eof' => "\n",
'eof' => PHP_EOL,
],
],
//delay time until the message is added back to the queue if an error occurs during processing
'fail-safe' => [
'first_retry' => 3600000, // 1h
'second_retry' => 43200000, // 12h
'third_retry' => 86400000, // 24h
],
'first_retry' => 3600000, // 1h
'second_retry' => 43200000, // 12h
'third_retry' => 86400000, // 24h
],
];
10 changes: 6 additions & 4 deletions config/autoload/log.local.php.dist
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

declare(strict_types=1);

use Dot\Log\Formatter\Json;
use Dot\Log\Logger;

Expand All @@ -9,17 +11,17 @@ return [
'queue-log' => [
'writers' => [
'FileWriter' => [
'name' => 'stream',
'level' => \Dot\Log\Logger::ALERT, // this is equal to 1
'name' => 'stream',
'level' => Logger::ALERT,
'options' => [
'stream' => __DIR__ . '/../../log/queue-log.log',
'stream' => __DIR__ . '/../../log/queue-log.log',
'formatter' => [
'name' => Json::class,
],
],
],
],
]
],
],
],
];
35 changes: 19 additions & 16 deletions config/autoload/messenger.local.php.dist
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
<?php

declare(strict_types=1);

use Netglue\PsrContainer\Messenger\Container\TransportFactory;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;

return [
"symfony" => [
"messenger" => [
"transports" => [
"redis_transport" => [
'dsn' => 'redis://127.0.0.1:6379/messages',
'options' => [], // Redis specific options
'symfony' => [
'messenger' => [
'transports' => [
'redis_transport' => [
'dsn' => 'redis://127.0.0.1:6379/messages',
'options' => [], // Redis specific options
'serializer' => SymfonySerializer::class,
]
]
]
],
],
],
],
'dependencies' => [
'factories' => [
'redis_transport' => [TransportFactory::class, 'redis_transport'],
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
],
],
"dependencies" => [
"factories" => [
"redis_transport" => [TransportFactory::class, 'redis_transport'],
SymfonySerializer::class => fn(\Psr\Container\ContainerInterface $container) => new PhpSerializer()
]
]
];
];
15 changes: 8 additions & 7 deletions config/autoload/swoole.local.php.dist
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<?php

declare(strict_types=1);

return [
'dotkernel-queue-swoole' => [
// Available in Swoole 4.1 and up; enables coroutine support
// for most I/O operations:
// Available in Swoole 4.1 and up; enables coroutine support for most I/O operations:
'enable_coroutine' => true,

// Configure Swoole TCP Server:
Expand All @@ -13,17 +14,17 @@ return [
'mode' => SWOOLE_BASE, // SWOOLE_BASE or SWOOLE_PROCESS;
// SWOOLE_BASE is the default
'protocol' => SWOOLE_SOCK_TCP, // SWOOLE_SSL, // SSL-enable the server
'options' => [
'options' => [
// Set the SSL certificate and key paths for SSL support:
//'ssl_cert_file' => 'path/to/ssl.crt',
//'ssl_key_file' => 'path/to/ssl.key',
// Whether or not the HTTP server should use coroutines;
// Whether the HTTP server should use coroutines;
// enabled by default, and generally should not be disabled:
'package_eof' => "\n",
'open_eof_check' => true,
'package_eof' => PHP_EOL,
'open_eof_check' => true,
'open_length_check' => true,

// in order to run swoole as daemon
// to run swoole as a daemon
'daemonize' => true,

// Overwrite the default location of the pid file;
Expand Down
2 changes: 1 addition & 1 deletion daemon/messenger.service
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ WorkingDirectory=/home/dotkernel/queue/
ExecStart=/usr/bin/php /home/dotkernel/queue/bin/cli.php messenger:start

[Install]
WantedBy=swoole.service
WantedBy=swoole.service
2 changes: 1 addition & 1 deletion daemon/swoole.service
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ WorkingDirectory=/home/dotkernel/queue/
ExecStart=/usr/bin/php /home/dotkernel/queue/bin/cli.php swoole:start

[Install]
WantedBy=multi-user.target
WantedBy=multi-user.target
2 changes: 1 addition & 1 deletion phpcs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

<!-- Paths to check -->
<file>config</file>
<file>public</file>
<file>src</file>
<file>test</file>
<exclude-pattern>config/config.php</exclude-pattern>
<exclude-pattern>config/routes.php</exclude-pattern>

<!-- Include all rules from the Laminas Coding Standard -->
<rule ref="LaminasCodingStandard">
Expand Down
2 changes: 2 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ includes:
parameters:
level: 5
paths:
- config
- public
- src
- test
treatPhpDocTypesAsCertain: false
16 changes: 9 additions & 7 deletions public/index.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

declare(strict_types=1);

use Mezzio\Application;
use Mezzio\MiddlewareFactory;
use Psr\Container\ContainerInterface;

// Delegate static file requests back to the PHP built-in webserver
if (PHP_SAPI === 'cli-server' && $_SERVER['SCRIPT_FILENAME'] !== __FILE__) {
return false;
Expand All @@ -14,17 +18,15 @@
* Self-called anonymous function that creates its own scope and keeps the global namespace clean.
*/
(function () {
/** @var \Psr\Container\ContainerInterface $container */
/** @var ContainerInterface $container */
$container = require 'config/container.php';

/** @var \Mezzio\Application $app */
$app = $container->get(\Mezzio\Application::class);
$factory = $container->get(\Mezzio\MiddlewareFactory::class);
/** @var Application $app */
$app = $container->get(Application::class);
$factory = $container->get(MiddlewareFactory::class);

// Execute programmatic/declarative middleware pipeline and routing
// configuration statements
// Execute programmatic/declarative middleware pipeline and routing configuration statements
(require 'config/pipeline.php')($app, $factory, $container);
(require 'config/routes.php')($app, $factory, $container);

$app->run();
})();
28 changes: 14 additions & 14 deletions src/App/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ConfigProvider
public function __invoke(): array
{
return [
"dependencies" => $this->getDependencies(),
'dependencies' => $this->getDependencies(),
'symfony' => [
'messenger' => [
'buses' => $this->busConfig(),
Expand All @@ -31,23 +31,23 @@ public function __invoke(): array
private function getDependencies(): array
{
return [
"factories" => [
"message_bus" => [MessageBusStaticFactory::class, "message_bus"],
"message_bus_stamp_middleware" => [BusNameStampMiddlewareStaticFactory::class, "message_bus"],
"message_bus_sender_middleware" => [MessageSenderMiddlewareStaticFactory::class, "message_bus"],
"message_bus_handler_middleware" => [MessageHandlerMiddlewareStaticFactory::class, "message_bus"],
'factories' => [
'message_bus' => [MessageBusStaticFactory::class, 'message_bus'],
'message_bus_stamp_middleware' => [BusNameStampMiddlewareStaticFactory::class, 'message_bus'],
'message_bus_sender_middleware' => [MessageSenderMiddlewareStaticFactory::class, 'message_bus'],
'message_bus_handler_middleware' => [MessageHandlerMiddlewareStaticFactory::class, 'message_bus'],
MessageHandler::class => AttributedServiceFactory::class,
],
"aliases" => [
MessageBusInterface::class => "message_bus",
'aliases' => [
MessageBusInterface::class => 'message_bus',
],
];
}

private function busConfig(): array
{
return [
"message_bus" => [
'message_bus' => [
// Means that it's an error if no handlers are defined for a given message
'allows_zero_handlers' => false,

Expand All @@ -58,7 +58,7 @@ private function busConfig(): array
*/
'middleware' => [
// … Middleware that inspects the message before it has been sent to a transport would go here.
"message_bus_stamp_middleware",
'message_bus_stamp_middleware',
'message_bus_sender_middleware', // Sends messages via a transport if configured.
'message_bus_handler_middleware', // Executes the handlers configured for the message
],
Expand All @@ -78,16 +78,16 @@ private function busConfig(): array
* Routes define which transport(s) that messages dispatched on this bus should be sent with.
*
* The * wildcard applies to all messages.
* The transport for each route must be an array of one or more transport identifiers. Each transport
* is retrieved from the DI container by this value.
* The transport for each route must be an array of one or more transport identifiers.
* This value retrieves each transport from the DI container.
*
* An empty routes definition would mean that messages would be handled immediately and synchronously,
* i.e. no transport would be used.
* i.e., no transport would be used.
*
* Route specific messages to specific transports by using the message name as the key.
*/
'routes' => [
Message::class => ["redis_transport"],
Message::class => ['redis_transport'],
],
],
];
Expand Down
15 changes: 10 additions & 5 deletions src/App/Message/MessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

use Dot\DependencyInjection\Attribute\Inject;
use Dot\Log\Logger;
use Exception;
use Symfony\Component\Messenger\Exception\ExceptionInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Throwable;

class MessageHandler
{
Expand All @@ -24,6 +26,9 @@ public function __construct(
) {
}

/**
* @throws ExceptionInterface
*/
public function __invoke(Message $message): void
{
$payload = $message->getPayload();
Expand All @@ -35,9 +40,9 @@ public function __invoke(Message $message): void
if ($payload['foo'] === 'control') {
$this->logger->info($payload['foo'] . ': was processed successfully');
} else {
throw new \Exception("Failed to execute");
throw new Exception('Failed to execute');
}
} catch (\Throwable $exception) {
} catch (Throwable $exception) {
$this->logger->error($payload['foo'] . ' failed with message: '
. $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries');
$this->retry($payload);
Expand All @@ -50,21 +55,21 @@ public function __invoke(Message $message): void
public function retry(array $payload): void
{
if (! isset($payload['retry'])) {
$this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => 1]), [
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => 1]), [
new DelayStamp($this->config['fail-safe']['first_retry']),
]);
} else {
$retry = $payload['retry'];
switch ($retry) {
case 1:
$delay = $this->config['fail-safe']['second_retry'];
$this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => ++$retry]), [
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => ++$retry]), [
new DelayStamp($delay),
]);
break;
case 2:
$delay = $this->config['fail-safe']['third_retry'];
$this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => ++$retry]), [
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => ++$retry]), [
new DelayStamp($delay),
]);
break;
Expand Down
Loading
Loading