Skip to content

Commit

Permalink
[client] Improve client extension.
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Aug 27, 2018
1 parent 0560f5a commit 6c0e31d
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 358 deletions.
28 changes: 18 additions & 10 deletions pkg/enqueue/Client/ChainExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,31 @@ public function __construct(array $extensions)
$this->extensions = $extensions;
}

/**
* {@inheritdoc}
*/
public function onPreSend($topic, Message $message)
public function onPreSendEvent(PreSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSend($topic, $message);
$extension->onPreSendEvent($event);
}
}

/**
* {@inheritdoc}
*/
public function onPostSend($topic, Message $message)
public function onPreSendCommand(PreSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSendCommand($event);
}
}

public function onPreDriverSend(PreDriverSend $context): void
{
foreach ($this->extensions as $extension) {
$extension->onPreDriverSend($context);
}
}

public function onPostSend(PostSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPostSend($topic, $message);
$extension->onPostSend($event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
namespace Enqueue\Client\ConsumptionExtension;

use Enqueue\Client\Config;
use Enqueue\Client\EmptyExtensionTrait as ClientEmptyExtensionTrait;
use Enqueue\Client\ExtensionInterface as ClientExtensionInterface;
use Enqueue\Client\Message;
use Enqueue\Client\PreSend;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;

class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
{
use EmptyExtensionTrait;
use ConsumptionEmptyExtensionTrait, ClientEmptyExtensionTrait;

/**
* @var string[]
Expand Down Expand Up @@ -60,26 +61,14 @@ public function onPreReceived(Context $context)
}
}

/**
* {@inheritdoc}
*/
public function onPreSend($topic, Message $message)
public function onPreSendCommand(PreSend $context): void
{
if (Config::COMMAND_TOPIC != $topic) {
return;
}
$message = $context->getMessage();
$command = $context->getCommandOrTopic();

$commandName = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
if (array_key_exists($commandName, $this->processorNameToQueueNameMap)) {
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $commandName);
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$commandName]);
if (array_key_exists($command, $this->processorNameToQueueNameMap)) {
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command);
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$command]);
}
}

/**
* {@inheritdoc}
*/
public function onPostSend($topic, Message $message)
{
}
}
22 changes: 22 additions & 0 deletions pkg/enqueue/Client/EmptyExtensionTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Enqueue\Client;

trait EmptyExtensionTrait
{
public function onPreSendEvent(PreSend $context): void
{
}

public function onPreSendCommand(PreSend $context): void
{
}

public function onPreDriverSend(PreDriverSend $context): void
{
}

public function onPostSend(PostSend $context): void
{
}
}
59 changes: 59 additions & 0 deletions pkg/enqueue/Client/Extension/PrepareBodyExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

namespace Enqueue\Client\Extension;

use Enqueue\Client\EmptyExtensionTrait;
use Enqueue\Client\ExtensionInterface;
use Enqueue\Client\Message;
use Enqueue\Client\PreSend;
use Enqueue\Util\JSON;

class PrepareBodyExtension implements ExtensionInterface
{
use EmptyExtensionTrait;

public function onPreSendEvent(PreSend $context): void
{
$this->prepareBody($context->getMessage());
}

public function onPreSendCommand(PreSend $context): void
{
$this->prepareBody($context->getMessage());
}

private function prepareBody(Message $message): void
{
$body = $message->getBody();
$contentType = $message->getContentType();

if (is_scalar($body) || null === $body) {
$contentType = $contentType ?: 'text/plain';
$body = (string) $body;
} elseif (is_array($body)) {
// only array of scalars is allowed.
array_walk_recursive($body, function ($value) {
if (!is_scalar($value) && null !== $value) {
throw new \LogicException(sprintf(
'The message\'s body must be an array of scalars. Found not scalar in the array: %s',
is_object($value) ? get_class($value) : gettype($value)
));
}
});

$contentType = $contentType ?: 'application/json';
$body = JSON::encode($body);
} elseif ($body instanceof \JsonSerializable) {
$contentType = $contentType ?: 'application/json';
$body = JSON::encode($body);
} else {
throw new \InvalidArgumentException(sprintf(
'The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: %s',
is_object($body) ? get_class($body) : gettype($body)
));
}

$message->setContentType($contentType);
$message->setBody($body);
}
}
30 changes: 16 additions & 14 deletions pkg/enqueue/Client/ExtensionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@

interface ExtensionInterface
{
/**
* @param string $topic
* @param Message $message
*
* @return
*/
public function onPreSend($topic, Message $message);
public function onPreSendEvent(PreSend $context): void;

/**
* @param string $topic
* @param Message $message
*
* @return
*/
public function onPostSend($topic, Message $message);
public function onPreSendCommand(PreSend $context): void;

public function onPreDriverSend(PreDriverSend $context): void;

public function onPostSend(PostSend $context): void;

// /**
// * @deprecated
// */
// public function onPreSend($topic, Message $message);
//
// /**
// * @deprecated
// */
// public function onPostSend($topic, Message $message);
}
49 changes: 49 additions & 0 deletions pkg/enqueue/Client/PostSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Enqueue\Client;

class PostSend
{
private $message;

private $producer;

private $driver;

public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
{
$this->message = $message;
$this->producer = $producer;
$this->driver = $driver;
}

public function getMessage(): Message
{
return $this->message;
}

public function getProducer(): ProducerInterface
{
return $this->producer;
}

public function getDriver(): DriverInterface
{
return $this->driver;
}

public function isEvent(): bool
{
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}

public function getCommand(): string
{
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
}

public function getTopic(): string
{
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}
}
49 changes: 49 additions & 0 deletions pkg/enqueue/Client/PreDriverSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Enqueue\Client;

class PreDriverSend
{
private $message;

private $producer;

private $driver;

public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
{
$this->message = $message;
$this->producer = $producer;
$this->driver = $driver;
}

public function getMessage(): Message
{
return $this->message;
}

public function getProducer(): ProducerInterface
{
return $this->producer;
}

public function getDriver(): DriverInterface
{
return $this->driver;
}

public function isEvent(): bool
{
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}

public function getCommand(): string
{
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
}

public function getTopic(): string
{
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}
}
69 changes: 69 additions & 0 deletions pkg/enqueue/Client/PreSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Enqueue\Client;

class PreSend
{
private $message;

private $originalMessage;

private $commandOrTopic;

private $producer;

private $driver;

public function __construct(
string $commandOrTopic,
Message $message,
ProducerInterface $producer,
DriverInterface $driver
) {
$this->message = $message;
$this->commandOrTopic = $commandOrTopic;
$this->producer = $producer;
$this->driver = $driver;

$this->originalMessage = clone $message;
}

public function getCommandOrTopic(): string
{
return $this->commandOrTopic;
}

public function changeCommandOrTopic(string $commandOrTopic): void
{
$this->commandOrTopic = $commandOrTopic;
}

public function changeBody($body, string $contentType = null): void
{
$this->message->setBody($body);

if (null !== $contentType) {
$this->message->setContentType($contentType);
}
}

public function getMessage(): Message
{
return $this->message;
}

public function getOriginalMessage(): Message
{
return $this->originalMessage;
}

public function getProducer(): ProducerInterface
{
return $this->producer;
}

public function getDriver(): DriverInterface
{
return $this->driver;
}
}
Loading

0 comments on commit 6c0e31d

Please sign in to comment.