From f99955f1d367d3b43b316d65a662e573fd2aeac3 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 19 Jun 2017 11:35:12 +0300 Subject: [PATCH] [simple-client] Allow processor instance bind. --- pkg/simple-client/SimpleClient.php | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index ecc31aee2..82f093c53 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -19,6 +19,7 @@ use Enqueue\Dbal\Symfony\DbalTransportFactory; use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Psr\PsrContext; +use Enqueue\Psr\PsrProcessor; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\Symfony\SqsTransportFactory; use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; @@ -80,17 +81,25 @@ public function __construct($config) } /** - * @param string $topic - * @param string $processorName - * @param callback $processor + * @param string $topic + * @param string $processorName + * @param callable|PsrProcessor $processor */ - public function bind($topic, $processorName, callable $processor) + public function bind($topic, $processorName, $processor) { + if (is_callable($processor)) { + $processor = new CallbackProcessor($processor); + } + + if (false == $processor instanceof PsrProcessor) { + throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); + } + $queueName = $this->getConfig()->getDefaultProcessorQueueName(); $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); - $this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor)); + $this->getProcessorRegistry()->add($processorName, $processor); $this->getRouterProcessor()->add($topic, $queueName, $processorName); }