From 61dabcdf7c26d408039acd72f8627ffc08ebaf01 Mon Sep 17 00:00:00 2001 From: "m.halaiev" Date: Mon, 3 Dec 2018 15:09:26 +0200 Subject: [PATCH 1/2] * bus events --- README.md | 8 ++++ Resources/config/services.yml | 27 ++++++------ Service/Abstracts/RpcProducerAbstract.php | 27 ++++++++++++ Service/CommunicatorService.php | 31 +++++++++++++- .../Processor/ServiceDiscoveryProcessor.php | 27 ++++++++++++ Service/Producer/EventProducer.php | 42 +++++++++++++++++++ 6 files changed, 146 insertions(+), 16 deletions(-) create mode 100755 Service/Processor/ServiceDiscoveryProcessor.php create mode 100755 Service/Producer/EventProducer.php diff --git a/README.md b/README.md index cbe14e3..707fea9 100755 --- a/README.md +++ b/README.md @@ -269,6 +269,14 @@ $apartmentQuery $result = $graphQLService->fetch(); ``` +### Event dispatching +Mass event dispatching with aTopic name for all working daemons, and array is a payload to work with. +``` +$this->get(CommunicatorService::class) + ->event('eventTopicName', ['some' => 'value']); +``` + + You can use stitching with query and mutation and vise-versa. Even several mutation can be stitched to one another. ## Enjoy \ No newline at end of file diff --git a/Resources/config/services.yml b/Resources/config/services.yml index 04c861d..422c7d8 100755 --- a/Resources/config/services.yml +++ b/Resources/config/services.yml @@ -4,8 +4,8 @@ services: autoconfigure: true public: false - service.request.processor: - class: Garlic\Bus\Service\Processor\RequestProcessor + app.servicediscovery.processor: + class: Garlic\Bus\Service\Processor\ServiceDiscoveryProcessor public: true arguments: - "@service.request" @@ -13,18 +13,7 @@ services: - "@router" - "@kernel" tags: - - { name: 'enqueue.client.processor'} - - service.command.processor: - class: Garlic\Bus\Service\Processor\CommandProcessor - public: true - arguments: - - "@service.request" - - "@service.response" - - "@router" - - "@kernel" - tags: - - { name: 'enqueue.client.processor'} + - { name: 'enqueue.client.processor', topicName: 'serviceDiscovery'} service.request.producer: class: Garlic\Bus\Service\Producer\RequestProducer @@ -44,6 +33,15 @@ services: - "%env(SERVICE_NAME)%" - "%env(SERVICE_NAMESPACE)%" + service.event.producer: + class: Garlic\Bus\Service\Producer\EventProducer + arguments: + - "@enqueue.transport.context" + - '@enqueue.transport.rpc_factory' + - "@service.response" + - "%env(SERVICE_NAME)%" + - "%env(SERVICE_NAMESPACE)%" + service.request: class: Garlic\Bus\Service\Request\RequestService arguments: @@ -58,6 +56,7 @@ services: class: Garlic\Bus\Service\CommunicatorService public: true arguments: + - "@service.event.producer" - "@service.request.producer" - "@service.command.producer" - "@service.request" diff --git a/Service/Abstracts/RpcProducerAbstract.php b/Service/Abstracts/RpcProducerAbstract.php index 60199b8..cd6ac53 100755 --- a/Service/Abstracts/RpcProducerAbstract.php +++ b/Service/Abstracts/RpcProducerAbstract.php @@ -13,7 +13,10 @@ use Enqueue\Rpc\RpcFactory; use Garlic\Bus\Service\Request\ResponseService; use Enqueue\Util\UUID; +use Interop\Amqp\Impl\AmqpMessage; +use Interop\Amqp\Impl\AmqpTopic; use Interop\Queue\PsrContext; +use Symfony\Component\DependencyInjection\Container; abstract class RpcProducerAbstract extends ProducerAbstract { @@ -89,4 +92,28 @@ public function sendCommand(string $service, $message, bool $reply = true) return false; } + + /** + * Send multicast event + * + * @param $name + * @param AmqpMessage $message + * @throws \Interop\Queue\Exception + * @throws \Interop\Queue\InvalidDestinationException + * @throws \Interop\Queue\InvalidMessageException + */ + public function sendEvent($name, $message) + { + if (false == $message instanceof Message) { + $message = $this->context->createMessage($message); + } + + $message->setProperty(Config::PARAMETER_TOPIC_NAME, $name); + + $topic = $this->context->createTopic('enqueue.default'); + $topic->setType(AmqpTopic::TYPE_FANOUT); + $topic->addFlag(AmqpTopic::FLAG_DURABLE); + + $this->context->createProducer()->send($topic, $message); + } } diff --git a/Service/CommunicatorService.php b/Service/CommunicatorService.php index 6528cfa..1752d89 100755 --- a/Service/CommunicatorService.php +++ b/Service/CommunicatorService.php @@ -5,6 +5,7 @@ use Garlic\Bus\Service\Interfaces\CommunicatorServiceInterface; use Garlic\Bus\Service\Interfaces\ProducerInterface; use Garlic\Bus\Service\Producer\CommandProducer; +use Garlic\Bus\Service\Producer\EventProducer; use Garlic\Bus\Service\Producer\RequestProducer; use Garlic\Bus\Service\Request\RequestService; use Symfony\Component\HttpFoundation\RequestStack; @@ -34,21 +35,32 @@ class CommunicatorService implements CommunicatorServiceInterface /** @var CommandProducer */ private $commandProducer; + /** @var EventProducer */ + private $eventProducer; + /** * CommunicatorService constructor. + * @param EventProducer $eventProducer * @param RequestProducer $requestProducer * @param CommandProducer $commandProducer * @param RequestService $request * @param RequestStack $requestStack * @param $namespace */ - public function __construct(RequestProducer $requestProducer, CommandProducer $commandProducer, RequestService $request, RequestStack $requestStack, $namespace) - { + public function __construct( + EventProducer $eventProducer, + RequestProducer $requestProducer, + CommandProducer $commandProducer, + RequestService $request, + RequestStack $requestStack, + $namespace + ) { $this->requestProducer = $requestProducer; $this->requestService = $request; $this->requestStack = $requestStack; $this->namespace = $namespace; $this->commandProducer = $commandProducer; + $this->eventProducer = $eventProducer; } /** @@ -82,6 +94,21 @@ public function request($service) return $this; } + /** + * Create event producer to the service + * + * @param $eventName + * @return $this + */ + public function event($eventName, array $payload) + { + $this->producer = $this->eventProducer->setTargetServiceName($eventName); + + $this->send($eventName, $payload); + + return $this; + } + /** * Create request producer to the service * diff --git a/Service/Processor/ServiceDiscoveryProcessor.php b/Service/Processor/ServiceDiscoveryProcessor.php new file mode 100755 index 0000000..e4a5418 --- /dev/null +++ b/Service/Processor/ServiceDiscoveryProcessor.php @@ -0,0 +1,27 @@ +process'); + + return self::ACK; + } +} diff --git a/Service/Producer/EventProducer.php b/Service/Producer/EventProducer.php new file mode 100755 index 0000000..c6944c5 --- /dev/null +++ b/Service/Producer/EventProducer.php @@ -0,0 +1,42 @@ +sendEvent($this->targetServiceName, $message); + $message = 'Event has been sent'; + } catch (\Exception $exception) { + $message = $exception->getMessage(); + $code = $exception->getCode(); + } + + return new Response( + json_encode(['message' => $message]), + (!empty($code)) ? $code : 500 + ); + } +} \ No newline at end of file From 46edcdd661b485c37e55de3402a7d389867ee63b Mon Sep 17 00:00:00 2001 From: "m.halaiev" Date: Mon, 10 Dec 2018 17:43:15 +0200 Subject: [PATCH 2/2] * service discovery --- README.md | 2 +- Resources/config/services.yml | 22 ++++ .../Processor/ServiceDiscoveryProcessor.php | 100 +++++++++++++++++- 3 files changed, 121 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 707fea9..1f513db 100755 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ $data = $this->get('communicator') **Important:** If you want to use GraphQL wrapper you have to install [garlicservices/graphql-bundle](https://github.com/garlicservices/graphql-bundle) on all the services you requiested in your queries. To install bundle on application just type in console the command showed below ```bash -composer require garlic/grpahql-bundle +composer require garlic/graphql-bundle ``` #### Easy way to use GraphQl query diff --git a/Resources/config/services.yml b/Resources/config/services.yml index 422c7d8..b7e5d08 100755 --- a/Resources/config/services.yml +++ b/Resources/config/services.yml @@ -4,6 +4,28 @@ services: autoconfigure: true public: false + service.request.processor: + class: Garlic\Bus\Service\Processor\RequestProcessor + public: true + arguments: + - "@service.request" + - "@service.response" + - "@router" + - "@kernel" + tags: + - { name: 'enqueue.client.processor'} + + service.command.processor: + class: Garlic\Bus\Service\Processor\CommandProcessor + public: true + arguments: + - "@service.request" + - "@service.response" + - "@router" + - "@kernel" + tags: + - { name: 'enqueue.client.processor'} + app.servicediscovery.processor: class: Garlic\Bus\Service\Processor\ServiceDiscoveryProcessor public: true diff --git a/Service/Processor/ServiceDiscoveryProcessor.php b/Service/Processor/ServiceDiscoveryProcessor.php index e4a5418..5a218f4 100755 --- a/Service/Processor/ServiceDiscoveryProcessor.php +++ b/Service/Processor/ServiceDiscoveryProcessor.php @@ -3,11 +3,16 @@ namespace Garlic\Bus\Service\Processor; use Enqueue\Client\TopicSubscriberInterface; +use Garlic\Bus\Service\Abstracts\ProcessorConfigAbstract; +use Garlic\Bus\Service\CommunicatorService; use Interop\Queue\PsrProcessor; use Interop\Queue\PsrContext; use Interop\Queue\PsrMessage; +use Symfony\Component\HttpFoundation\JsonResponse; +use Symfony\Component\HttpFoundation\Request; +use Symfony\Component\HttpKernel\HttpKernelInterface; -class ServiceDiscoveryProcessor implements PsrProcessor, TopicSubscriberInterface +class ServiceDiscoveryProcessor extends ProcessorConfigAbstract implements PsrProcessor, TopicSubscriberInterface { public static function getSubscribedTopics() { @@ -20,8 +25,99 @@ public static function getSubscribedTopics() */ public function process(PsrMessage $message, PsrContext $context) { - var_dump('ServiceDiscoveryProcessor->process'); + $container = $this->kernel->getContainer(); + + $response = $container->get('http_kernel') + ->handle(Request::create('/graphql', 'POST', ['query' => $this->getIntrospectionQuery()]), HttpKernelInterface::MASTER_REQUEST); + + $container->get(CommunicatorService::class) + ->command('gateway') + ->post() + ->serviceRebuildSchema(['data' => $response->getContent()]); return self::ACK; } + + protected function getIntrospectionQuery() + { + return ' + query IntrospectionQuery { + __schema { + queryType { name } + mutationType { name } + subscriptionType { name } + types { + ...FullType + } + directives { + name + description + args { + ...InputValue + } + onOperation + onFragment + onField + } + } + } + + fragment FullType on __Type { + kind + name + description + fields(includeDeprecated: true) { + name + description + args { + ...InputValue + } + type { + ...TypeRef + } + isDeprecated + deprecationReason + } + inputFields { + ...InputValue + } + interfaces { + ...TypeRef + } + enumValues(includeDeprecated: true) { + name + description + isDeprecated + deprecationReason + } + possibleTypes { + ...TypeRef + } + } + + fragment InputValue on __InputValue { + name + description + type { ...TypeRef } + defaultValue + } + + fragment TypeRef on __Type { + kind + name + ofType { + kind + name + ofType { + kind + name + ofType { + kind + name + } + } + } +} + '; + } }