Skip to content

Commit

Permalink
[client] Improve client extension.
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Aug 27, 2018
1 parent 0560f5a commit eee2902
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 52 deletions.
42 changes: 32 additions & 10 deletions pkg/enqueue/Client/ChainExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,45 @@ public function __construct(array $extensions)
$this->extensions = $extensions;
}

/**
* {@inheritdoc}
*/
public function onPreSend($topic, Message $message)
public function onPrepareEventBody(PrepareBody $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSend($topic, $message);
$extension->onPrepareEventBody($event);
}
}

/**
* {@inheritdoc}
*/
public function onPostSend($topic, Message $message)
public function onPrepareCommandBody(PrepareBody $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSendCommand($event);
}
}

public function onPreSendEvent(PreSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSendEvent($event);
}
}

public function onPreSendCommand(PreSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSendCommand($event);
}
}

public function onPostSendEvent(PostSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPostSendEvent($event);
}
}

public function onPostSendCommand(PostSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPostSend($topic, $message);
$extension->onPostSendCommand($event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
namespace Enqueue\Client\ConsumptionExtension;

use Enqueue\Client\Config;
use Enqueue\Client\EmptyExtensionTrait as ClientEmptyExtensionTrait;
use Enqueue\Client\ExtensionInterface as ClientExtensionInterface;
use Enqueue\Client\Message;
use Enqueue\Client\PreSend;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;

class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
{
use EmptyExtensionTrait;
use ConsumptionEmptyExtensionTrait, ClientEmptyExtensionTrait;

/**
* @var string[]
Expand Down Expand Up @@ -60,26 +61,14 @@ public function onPreReceived(Context $context)
}
}

/**
* {@inheritdoc}
*/
public function onPreSend($topic, Message $message)
public function onPreSendCommand(PreSend $event): void
{
if (Config::COMMAND_TOPIC != $topic) {
return;
}
$message = $event->getMessage();
$commandName = $event->getCommand();

$commandName = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
if (array_key_exists($commandName, $this->processorNameToQueueNameMap)) {
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $commandName);
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$commandName]);
}
}

/**
* {@inheritdoc}
*/
public function onPostSend($topic, Message $message)
{
}
}
30 changes: 30 additions & 0 deletions pkg/enqueue/Client/EmptyExtensionTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Enqueue\Client;

trait EmptyExtensionTrait
{
public function onPrepareEventBody(PrepareBody $event): void
{
}

public function onPrepareCommandBody(PrepareBody $event): void
{
}

public function onPreSendEvent(PreSend $event): void
{
}

public function onPreSendCommand(PreSend $event): void
{
}

public function onPostSendEvent(PostSend $event): void
{
}

public function onPostSendCommand(PostSend $event): void
{
}
}
36 changes: 21 additions & 15 deletions pkg/enqueue/Client/ExtensionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@

interface ExtensionInterface
{
/**
* @param string $topic
* @param Message $message
*
* @return
*/
public function onPreSend($topic, Message $message);

/**
* @param string $topic
* @param Message $message
*
* @return
*/
public function onPostSend($topic, Message $message);
public function onPrepareEventBody(PrepareBody $event): void;

public function onPrepareCommandBody(PrepareBody $event): void;

public function onPreSendEvent(PreSend $event): void;

public function onPreSendCommand(PreSend $event): void;

public function onPostSendEvent(PostSend $event): void;

public function onPostSendCommand(PostSend $event): void;

// /**
// * @deprecated
// */
// public function onPreSend($topic, Message $message);
//
// /**
// * @deprecated
// */
// public function onPostSend($topic, Message $message);
}
33 changes: 33 additions & 0 deletions pkg/enqueue/Client/PostSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Enqueue\Client;

class PostSend
{
private $message;

public function __construct(Message $message)
{
$this->message = $message;
}

public function getMessage(): Message
{
return $this->message;
}

public function isEvent(): bool
{
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}

public function getCommand(): string
{
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
}

public function getTopic(): string
{
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}
}
33 changes: 33 additions & 0 deletions pkg/enqueue/Client/PreSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Enqueue\Client;

class PreSend
{
private $message;

public function __construct(Message $message)
{
$this->message = $message;
}

public function getMessage(): Message
{
return $this->message;
}

public function isEvent(): bool
{
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}

public function getCommand(): string
{
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
}

public function getTopic(): string
{
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}
}
47 changes: 47 additions & 0 deletions pkg/enqueue/Client/PrepareBody.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Enqueue\Client;

class PrepareBody
{
private $message;

private $body;

private $command;

private $topic;

public function __construct(Message $message, $body, string $command = null, string $topic = null)
{
$this->message = $message;
$this->body = $body;
$this->command = $command;
$this->topic = $topic;
}

public function getMessage(): Message
{
return $this->message;
}

public function getBody()
{
return $this->body;
}

public function setBody($body): void
{
$this->body = $body;
}

public function getCommand(): string
{
return $this->command;
}

public function getTopic(): string
{
return $this->topic;
}
}
32 changes: 23 additions & 9 deletions pkg/enqueue/Client/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ public function __construct(
public function sendEvent($topic, $message)
{
if (false == $message instanceof Message) {
$body = $message;
$message = new Message();
$message->setBody($body);
$message = new Message($message);
}

$this->prepareBody($message);
if (Config::COMMAND_TOPIC !== $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
$prepareEvent = new PrepareBody($message, $message->getBody(), null, $topic);
$this->extension->onPrepareEventBody($prepareEvent);

$message->setBody((string) $prepareEvent->getBody());
}

// $this->prepareBody($message);

$message->setProperty(Config::PARAMETER_TOPIC_NAME, $topic);

Expand All @@ -75,9 +80,9 @@ public function sendEvent($topic, $message)
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME));
}

$this->extension->onPreSend($topic, $message);
$this->extension->onPreSendEvent(new PreSend($message));
$this->driver->sendToRouter($message);
$this->extension->onPostSend($topic, $message);
$this->extension->onPostSendEvent(new PostSend($message));
} elseif (Message::SCOPE_APP == $message->getScope()) {
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName());
Expand All @@ -86,9 +91,13 @@ public function sendEvent($topic, $message)
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName());
}

$this->extension->onPreSend($topic, $message);
$this->driver->sendToProcessor($message);
$this->extension->onPostSend($topic, $message);
$preSend = new PreSend($message);
$preSend->isEvent() ? $this->extension->onPreSendEvent($preSend) : $this->extension->onPreSendCommand($preSend);

$this->driver->sendToRouter($message);

$postSend = new PostSend($message);
$postSend->isEvent() ? $this->extension->onPostSendEvent($postSend) : $this->extension->onPostSendCommand($postSend);
} else {
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));
}
Expand All @@ -103,6 +112,11 @@ public function sendCommand($command, $message, $needReply = false)
$message = new Message($message);
}

$prepareEvent = new PrepareBody($message, $message->getBody(), $command);
$this->extension->onPrepareEventBody($prepareEvent);

$message->setBody((string) $prepareEvent->getBody());

$deleteReplyQueue = false;
$replyTo = $message->getReplyTo();

Expand Down

0 comments on commit eee2902

Please sign in to comment.