Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Messenger/Kernel/CommandBusDependencies.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ enum CommandBusDependencies: string {
case Serializer = self::class.'::Serializer';
case Worker = self::class.'::Worker';
case SupervisorConfigDir = self::class.'::SupervisorConfigDir';
case SendersLocator = self::class.'::SendersLocator';
case ReceiversLocator = self::class.'::ReceiversLocator';
}
57 changes: 46 additions & 11 deletions src/Messenger/Kernel/MessengerServiceFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use WonderNetwork\SlimKernel\Messenger\CommandBus;
use WonderNetwork\SlimKernel\Messenger\QueryBus;
use WonderNetwork\SlimKernel\ServiceFactory;
use WonderNetwork\SlimKernel\ServiceOfExpectedType;
use WonderNetwork\SlimKernel\ServicesBuilder;
use WonderNetwork\SlimKernel\Supervisor\GenerateSupervisorConfigCommand;
use WonderNetwork\SlimKernel\Supervisor\SupervisorConfiguration;
Expand Down Expand Up @@ -78,10 +79,25 @@ public function __invoke(ServicesBuilder $builder): iterable {

// region senders
yield TransportLocatorBuilder::class => $this->transports ?? TransportLocatorBuilder::empty();
yield SendersLocator::class => function (TransportLocatorBuilder $config, ContainerInterface $container) {
yield CommandBusDependencies::SendersLocator->value => fn (
TransportLocatorBuilder $config,
ContainerInterface $container,
) => $config->sendersLocator($container);
yield CommandBusDependencies::ReceiversLocator->value => fn (
TransportLocatorBuilder $config,
ContainerInterface $container,
) => $config->receiversLocator($container);

yield SendersLocator::class => function (ContainerInterface $container) {
$sendersLocator = ServiceOfExpectedType::getFromContainer(
container: $container,
key: CommandBusDependencies::SendersLocator->value,
expectedType: ContainerInterface::class,
);

return new SendersLocator(
sendersMap: [],
sendersLocator: $config->sendersLocator($container),
sendersLocator: $sendersLocator,
);
};

Expand All @@ -101,12 +117,26 @@ public function __invoke(ServicesBuilder $builder): iterable {
yield QueryBus::class => autowire();

yield ConsumeMessagesCommand::class => function (TransportLocatorBuilder $config, ContainerInterface $container) {
/** @var LoggerInterface $logger */
$logger = $container->get(CommandBusDependencies::Logger->value);
/** @var CacheItemPoolInterface $pool */
$pool = $container->get(CommandBusDependencies::CachePool->value);
/** @var EventDispatcher $eventDispatcher */
$eventDispatcher = $container->get(CommandBusDependencies::EventDispatcher->value);
$logger = ServiceOfExpectedType::getFromContainer(
container: $container,
key: CommandBusDependencies::Logger->value,
expectedType: LoggerInterface::class,
);
$pool = ServiceOfExpectedType::getFromContainer(
container: $container,
key: CommandBusDependencies::CachePool->value,
expectedType: CacheItemPoolInterface::class,
);
$receiversLocator = ServiceOfExpectedType::getFromContainer(
container: $container,
key: CommandBusDependencies::ReceiversLocator->value,
expectedType: ContainerInterface::class,
);
$eventDispatcher = ServiceOfExpectedType::getFromContainer(
container: $container,
key: CommandBusDependencies::EventDispatcher->value,
expectedType: EventDispatcher::class,
);
$eventDispatcher->addSubscriber(
new StopWorkerOnRestartSignalListener(
cachePool: $pool,
Expand All @@ -121,9 +151,11 @@ public function __invoke(ServicesBuilder $builder): iterable {
busLocator: new Container(),
fallbackBus: $container->get(MessageBusInterface::class),
),
receiverLocator: $config->receiversLocator($container),
receiverLocator: $receiversLocator,
eventDispatcher: $eventDispatcher,
logger: $logger,
// if it works, it works.
// if we override the receivers locator, then we’re out of luck
receiverNames: array_keys($config->receivers),
);
};
Expand All @@ -140,8 +172,11 @@ public function __invoke(ServicesBuilder $builder): iterable {

yield CommandBusDependencies::Worker->value => function (ContainerInterface $container) {
$app = new Application('worker');
/** @var EventDispatcher $eventDispatcher */
$eventDispatcher = $container->get(CommandBusDependencies::EventDispatcher->value);
$eventDispatcher = ServiceOfExpectedType::getFromContainer(
container: $container,
key: CommandBusDependencies::EventDispatcher->value,
expectedType: EventDispatcher::class,
);

$app->setDispatcher($eventDispatcher);
$app->addCommands(
Expand Down
36 changes: 36 additions & 0 deletions src/ServiceOfExpectedType.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace WonderNetwork\SlimKernel;

use DI\DependencyException;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;

final readonly class ServiceOfExpectedType {
/**
* @template T of object
* @param class-string<T> $expectedType
* @return T
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public static function getFromContainer(ContainerInterface $container, string $key, string $expectedType): mixed {
$actual = $container->get($key);

if (false === $actual instanceof $expectedType) {
throw new DependencyException(
sprintf(
'Service "%s" is expected to be of type "%s", %s given',
$key,
$expectedType,
get_debug_type($actual),
),
);
}

return $actual;
}
}
57 changes: 52 additions & 5 deletions tests/Messenger/MessengerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,36 @@

use Acme\Sample\SideEffectsCommand;
use Acme\Sample\StateQuery;
use DI\Container;
use PHPUnit\Framework\TestCase;
use RuntimeException;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\BufferedOutput;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
use WonderNetwork\SlimKernel\KernelBuilder;
use WonderNetwork\SlimKernel\Messenger\Kernel\CommandBusDependencies;
use WonderNetwork\SlimKernel\Messenger\Kernel\MessengerServiceFactory;
use WonderNetwork\SlimKernel\Messenger\Kernel\TransportLocatorBuilder;

final class MessengerTest extends TestCase {
private KernelBuilder $kernelBuilder;

protected function setUp(): void {
if (file_exists($filename = __DIR__.'/../../.cache/CompiledContainer.php')) {
unlink($filename);
}

$this->kernelBuilder = KernelBuilder::start(
realpath(__DIR__.'/../Resources/Messenger')
?: throw new RuntimeException('Oops'),
);
}

public function testMessenger(): void {
$transportName = 'in-memory';

$root = realpath(__DIR__.'/../Resources/Messenger')
?: throw new RuntimeException('Oops');
$container = KernelBuilder::start($root)
$container = $this->kernelBuilder
->useCache(__DIR__.'/../../.cache/')
->register(
new MessengerServiceFactory(
Expand Down Expand Up @@ -68,8 +75,7 @@ public function testMessenger(): void {
}

public function testHandlersCanDependOnCommandBus(): void {
$root = realpath(__DIR__.'/../Resources/Messenger') ?: throw new RuntimeException('Oops');
$container = KernelBuilder::start($root)
$container = $this->kernelBuilder
->register(
new MessengerServiceFactory(
commandPath: 'src/Requeue/*Handler.php',
Expand All @@ -81,4 +87,45 @@ public function testHandlersCanDependOnCommandBus(): void {
$this->expectNotToPerformAssertions();
$container->get(CommandBus::class);
}

public function testCustomTransports(): void {
$transportName = 'default';

$defaultTransport = new InMemoryTransport();
$customTransport = new InMemoryTransport();

$container = $this->kernelBuilder
->register(
new MessengerServiceFactory(
commandPath: 'src/Sample/*AsyncHandler.php',
queryPath: 'src/Sample/*QueryHandler.php',
transports: TransportLocatorBuilder::start()
->withTransport(
name: $transportName,
sender: InMemoryTransport::class,
receiver: InMemoryTransport::class,
),
),
)
->add(
[
InMemoryTransport::class => $defaultTransport,
CommandBusDependencies::SendersLocator->value => fn () => new Container(
[
$transportName => $customTransport,
],
),
],
)
->build();

/** @var CommandBus $commandBus */
$commandBus = $container->get(CommandBus::class);

$some = bin2hex(random_bytes(16));
$commandBus->queue(new SideEffectsCommand($some), $transportName);

self::assertCount(0, $defaultTransport->get());
self::assertCount(1, $customTransport->get());
}
}
Loading