Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ $data = $this->get('communicator')
**Important:** If you want to use GraphQL wrapper you have to install [garlicservices/graphql-bundle](https://github.com/garlicservices/graphql-bundle) on all the services you requiested in your queries.
To install bundle on application just type in console the command showed below
```bash
composer require garlic/grpahql-bundle
composer require garlic/graphql-bundle
```
#### Easy way to use GraphQl query

Expand Down Expand Up @@ -269,6 +269,14 @@ $apartmentQuery
$result = $graphQLService->fetch();
```

### Event dispatching
Mass event dispatching with aTopic name for all working daemons, and array is a payload to work with.
```
$this->get(CommunicatorService::class)
->event('eventTopicName', ['some' => 'value']);
```


You can use stitching with query and mutation and vise-versa. Even several mutation can be stitched to one another.

## Enjoy
21 changes: 21 additions & 0 deletions Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ services:
tags:
- { name: 'enqueue.client.processor'}

app.servicediscovery.processor:
class: Garlic\Bus\Service\Processor\ServiceDiscoveryProcessor
public: true
arguments:
- "@service.request"
- "@service.response"
- "@router"
- "@kernel"
tags:
- { name: 'enqueue.client.processor', topicName: 'serviceDiscovery'}

service.request.producer:
class: Garlic\Bus\Service\Producer\RequestProducer
arguments:
Expand All @@ -44,6 +55,15 @@ services:
- "%env(SERVICE_NAME)%"
- "%env(SERVICE_NAMESPACE)%"

service.event.producer:
class: Garlic\Bus\Service\Producer\EventProducer
arguments:
- "@enqueue.transport.context"
- '@enqueue.transport.rpc_factory'
- "@service.response"
- "%env(SERVICE_NAME)%"
- "%env(SERVICE_NAMESPACE)%"

service.request:
class: Garlic\Bus\Service\Request\RequestService
arguments:
Expand All @@ -58,6 +78,7 @@ services:
class: Garlic\Bus\Service\CommunicatorService
public: true
arguments:
- "@service.event.producer"
- "@service.request.producer"
- "@service.command.producer"
- "@service.request"
Expand Down
27 changes: 27 additions & 0 deletions Service/Abstracts/RpcProducerAbstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
use Enqueue\Rpc\RpcFactory;
use Garlic\Bus\Service\Request\ResponseService;
use Enqueue\Util\UUID;
use Interop\Amqp\Impl\AmqpMessage;
use Interop\Amqp\Impl\AmqpTopic;
use Interop\Queue\PsrContext;
use Symfony\Component\DependencyInjection\Container;

abstract class RpcProducerAbstract extends ProducerAbstract
{
Expand Down Expand Up @@ -89,4 +92,28 @@ public function sendCommand(string $service, $message, bool $reply = true)

return false;
}

/**
* Send multicast event
*
* @param $name
* @param AmqpMessage $message
* @throws \Interop\Queue\Exception
* @throws \Interop\Queue\InvalidDestinationException
* @throws \Interop\Queue\InvalidMessageException
*/
public function sendEvent($name, $message)
{
if (false == $message instanceof Message) {
$message = $this->context->createMessage($message);
}

$message->setProperty(Config::PARAMETER_TOPIC_NAME, $name);

$topic = $this->context->createTopic('enqueue.default');
$topic->setType(AmqpTopic::TYPE_FANOUT);
$topic->addFlag(AmqpTopic::FLAG_DURABLE);

$this->context->createProducer()->send($topic, $message);
}
}
31 changes: 29 additions & 2 deletions Service/CommunicatorService.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Garlic\Bus\Service\Interfaces\CommunicatorServiceInterface;
use Garlic\Bus\Service\Interfaces\ProducerInterface;
use Garlic\Bus\Service\Producer\CommandProducer;
use Garlic\Bus\Service\Producer\EventProducer;
use Garlic\Bus\Service\Producer\RequestProducer;
use Garlic\Bus\Service\Request\RequestService;
use Symfony\Component\HttpFoundation\RequestStack;
Expand Down Expand Up @@ -34,21 +35,32 @@ class CommunicatorService implements CommunicatorServiceInterface
/** @var CommandProducer */
private $commandProducer;

/** @var EventProducer */
private $eventProducer;

/**
* CommunicatorService constructor.
* @param EventProducer $eventProducer
* @param RequestProducer $requestProducer
* @param CommandProducer $commandProducer
* @param RequestService $request
* @param RequestStack $requestStack
* @param $namespace
*/
public function __construct(RequestProducer $requestProducer, CommandProducer $commandProducer, RequestService $request, RequestStack $requestStack, $namespace)
{
public function __construct(
EventProducer $eventProducer,
RequestProducer $requestProducer,
CommandProducer $commandProducer,
RequestService $request,
RequestStack $requestStack,
$namespace
) {
$this->requestProducer = $requestProducer;
$this->requestService = $request;
$this->requestStack = $requestStack;
$this->namespace = $namespace;
$this->commandProducer = $commandProducer;
$this->eventProducer = $eventProducer;
}

/**
Expand Down Expand Up @@ -82,6 +94,21 @@ public function request($service)
return $this;
}

/**
* Create event producer to the service
*
* @param $eventName
* @return $this
*/
public function event($eventName, array $payload)
{
$this->producer = $this->eventProducer->setTargetServiceName($eventName);

$this->send($eventName, $payload);

return $this;
}

/**
* Create request producer to the service
*
Expand Down
123 changes: 123 additions & 0 deletions Service/Processor/ServiceDiscoveryProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
<?php

namespace Garlic\Bus\Service\Processor;

use Enqueue\Client\TopicSubscriberInterface;
use Garlic\Bus\Service\Abstracts\ProcessorConfigAbstract;
use Garlic\Bus\Service\CommunicatorService;
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpKernel\HttpKernelInterface;

class ServiceDiscoveryProcessor extends ProcessorConfigAbstract implements PsrProcessor, TopicSubscriberInterface
{
public static function getSubscribedTopics()
{
return ['serviceDiscovery']; //, 'anotherTopic' and any other
}

/**
* {@inheritdoc}
* @todo:: implement
*/
public function process(PsrMessage $message, PsrContext $context)
{
$container = $this->kernel->getContainer();

$response = $container->get('http_kernel')
->handle(Request::create('/graphql', 'POST', ['query' => $this->getIntrospectionQuery()]), HttpKernelInterface::MASTER_REQUEST);

$container->get(CommunicatorService::class)
->command('gateway')
->post()
->serviceRebuildSchema(['data' => $response->getContent()]);

return self::ACK;
}

protected function getIntrospectionQuery()
{
return '
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
subscriptionType { name }
types {
...FullType
}
directives {
name
description
args {
...InputValue
}
onOperation
onFragment
onField
}
}
}

fragment FullType on __Type {
kind
name
description
fields(includeDeprecated: true) {
name
description
args {
...InputValue
}
type {
...TypeRef
}
isDeprecated
deprecationReason
}
inputFields {
...InputValue
}
interfaces {
...TypeRef
}
enumValues(includeDeprecated: true) {
name
description
isDeprecated
deprecationReason
}
possibleTypes {
...TypeRef
}
}

fragment InputValue on __InputValue {
name
description
type { ...TypeRef }
defaultValue
}

fragment TypeRef on __Type {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
}
}
}
}
';
}
}
42 changes: 42 additions & 0 deletions Service/Producer/EventProducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace Garlic\Bus\Service\Producer;

use Garlic\Bus\Entity\Response;
use Garlic\Bus\Service\Abstracts\RpcProducerAbstract;
use Garlic\Bus\Service\Interfaces\ProducerInterface;

/**
* Class RequestProducer
* @package GarlicBusBundle\Producer\RequestProducer
*/
class EventProducer extends RpcProducerAbstract implements ProducerInterface
{
/**
* Type of message
*/
public static $type = 'event';

/**
* Sent request and get response from a service
*
* @param $message
* @return object
*/
public function send($message)
{
try {
$code = 200;
$this->sendEvent($this->targetServiceName, $message);
$message = 'Event has been sent';
} catch (\Exception $exception) {
$message = $exception->getMessage();
$code = $exception->getCode();
}

return new Response(
json_encode(['message' => $message]),
(!empty($code)) ? $code : 500
);
}
}