Skip to content

Commit

Permalink
Merge pull request #119 from php-enqueue/simple-client-allow-processo…
Browse files Browse the repository at this point in the history
…rs-bind

[simple-client] Allow processor instance bind.
  • Loading branch information
makasim authored Jun 19, 2017
2 parents 4edabc7 + f99955f commit 6d297fe
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions pkg/simple-client/SimpleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Enqueue\Dbal\Symfony\DbalTransportFactory;
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\Symfony\SqsTransportFactory;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
Expand Down Expand Up @@ -80,17 +81,25 @@ public function __construct($config)
}

/**
* @param string $topic
* @param string $processorName
* @param callback $processor
* @param string $topic
* @param string $processorName
* @param callable|PsrProcessor $processor
*/
public function bind($topic, $processorName, callable $processor)
public function bind($topic, $processorName, $processor)
{
if (is_callable($processor)) {
$processor = new CallbackProcessor($processor);
}

if (false == $processor instanceof PsrProcessor) {
throw new \LogicException('The processor must be either callable or instance of PsrProcessor');
}

$queueName = $this->getConfig()->getDefaultProcessorQueueName();

$this->getTopicMetaRegistry()->addProcessor($topic, $processorName);
$this->getQueueMetaRegistry()->addProcessor($queueName, $processorName);
$this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor));
$this->getProcessorRegistry()->add($processorName, $processor);
$this->getRouterProcessor()->add($topic, $queueName, $processorName);
}

Expand Down

0 comments on commit 6d297fe

Please sign in to comment.