Skip to content

Commit

Permalink
Merge pull request #2 from php-enqueue/introduce-psr-message-processo…
Browse files Browse the repository at this point in the history
…r-interface

[psr] Introduce MessageProcessor interface (moved from consumption).
  • Loading branch information
makasim authored Jan 5, 2017
2 parents 192637a + 2bb31ae commit c6f3d06
Show file tree
Hide file tree
Showing 70 changed files with 671 additions and 659 deletions.
24 changes: 10 additions & 14 deletions docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ Guaranty that there is only single job running with such name.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;
use Enqueue\Psr\Context;
use Enqueue\Util\JSON;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;

class MessageProcessor implements MessageProcessorInterface
class ReindexProcessor implements Processor
{
/**
* @var JobRunner
Expand All @@ -43,7 +42,7 @@ class MessageProcessor implements MessageProcessorInterface
}
);

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}
```
Expand All @@ -54,16 +53,15 @@ Run several sub jobs in parallel.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;
use Enqueue\Client\MessageProducerInterface;
use Enqueue\Util\JSON;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;

class Step1MessageProcessor implements MessageProcessorInterface
class Step1Processor implements Processor
{
/**
* @var JobRunner
Expand Down Expand Up @@ -102,11 +100,11 @@ class Step1MessageProcessor implements MessageProcessorInterface
}
);

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}

class Step2MessageProcessor implements MessageProcessorInterface
class Step2Processor implements Processor
{
/**
* @var JobRunner
Expand Down Expand Up @@ -138,17 +136,15 @@ just after all steps are finished.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;
use Enqueue\JobQueue\DependentJobService;
use Enqueue\Client\MessageProducerInterface;
use Enqueue\Util\JSON;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;

class MessageProcessor implements MessageProcessorInterface
class ReindexProcessor implements Processor
{
/**
* @var JobRunner
Expand Down Expand Up @@ -182,7 +178,7 @@ class MessageProcessor implements MessageProcessorInterface
}
);

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}
```
Expand Down
15 changes: 7 additions & 8 deletions docs/bundle/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ To consume messages you have to first create a message processor:
<?php
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Processor;
use Enqueue\Client\TopicSubscriberInterface;

class FooMessageProcessor implements MessageProcessorInterface, TopicSubscriberInterface
class FooProcessor implements Processor, TopicSubscriberInterface
{
public function process(Message $message, Context $session)
{
echo $message->getBody();

return Result::ACK;
// return Result::REJECT; // when the message is broken
// return Result::REQUEUE; // the message is fine but you want to postpone processing
return self::ACK;
// return self::REJECT; // when the message is broken
// return self::REQUEUE; // the message is fine but you want to postpone processing
}

public static function getSubscribedTopics()
Expand All @@ -72,9 +71,9 @@ Register it as a container service and subscribe to the topic:

```yaml
foo_message_processor:
class: 'FooMessageProcessor'
class: 'FooProcessor'
tags:
- { name: 'enqueue.client.message_processor' }
- { name: 'enqueue.client.processor' }
```
Now you can start consuming messages:
Expand Down
9 changes: 4 additions & 5 deletions docs/job_queue/run_sub_job.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ They will be executed in parallel.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Client\MessageProducerInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;
use Enqueue\Util\JSON;

class RootJobMessageProcessor implements MessageProcessorInterface
class RootJobProcessor implements Processor
{
/** @var JobRunner */
private $jobRunner;
Expand All @@ -36,11 +35,11 @@ class RootJobMessageProcessor implements MessageProcessorInterface
return true;
});

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}

class SubJobMessageProcessor implements MessageProcessorInterface
class SubJobProcessor implements Processor
{
/** @var JobRunner */
private $jobRunner;
Expand Down
7 changes: 3 additions & 4 deletions docs/job_queue/run_unique_job.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ It shows how you can run unique job using job queue (The configuration is descri

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;
use Enqueue\JobQueue\JobRunner;

class MessageProcessor implements MessageProcessorInterface
class UniqueJobProcessor implements Processor
{
/** @var JobRunner */
private $jobRunner;
Expand All @@ -31,7 +30,7 @@ class MessageProcessor implements MessageProcessorInterface
return true; // if you want to ACK message or false to REJECT
});

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}
```
Expand Down
14 changes: 7 additions & 7 deletions docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ The `consume` method starts the consumption process which last as long as it is
```php
<?php
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Result;

/** @var \Enqueue\Psr\Context $psrContext */

Expand All @@ -75,12 +75,12 @@ $queueConsumer = new QueueConsumer($psrContext);
$queueConsumer->bind('foo_queue', function(Message $message) {
// process messsage

return Result::ACK;
return Processor::ACK;
});
$queueConsumer->bind('bar_queue', function(Message $message) {
// process messsage

return Result::ACK;
return Processor::ACK;
});

$queueConsumer->consume();
Expand Down Expand Up @@ -167,16 +167,16 @@ Here's an example of how you can send and consume messages.
```php
<?php
use Enqueue\Client\SimpleClient;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;

/** @var \Enqueue\Psr\Context $psrClient */
/** @var \Enqueue\Psr\Context $psrContext */

$client = new SimpleClient($psrClient);
$client = new SimpleClient($psrContext);
$client->bind('foo_topic', function (Message $message) {
// process message

return Result::ACK;
return Processor::ACK;
});

$client->send('foo_topic', 'Hello there!');
Expand Down
14 changes: 7 additions & 7 deletions pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
namespace Enqueue\AmqpExt\Tests\Functional;

use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\Context;
use Enqueue\Psr\Message;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Context;
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;
use Enqueue\Test\RabbitmqAmqpExtension;
use Enqueue\Test\RabbitmqManagmentExtensionTrait;

Expand Down Expand Up @@ -52,7 +52,7 @@ public function testConsumeOneMessageAndExit()
new LimitConsumptionTimeExtension(new \DateTime('+3sec')),
]));

$processor = new StubMessageProcessor();
$processor = new StubProcessor();
$queueConsumer->bind($queue, $processor);

$queueConsumer->consume();
Expand Down Expand Up @@ -81,10 +81,10 @@ public function testConsumeOneMessageAndSendReplyExit()

$replyMessage = $this->amqpContext->createMessage(__METHOD__.'.reply');

$processor = new StubMessageProcessor();
$processor = new StubProcessor();
$processor->result = Result::reply($replyMessage);

$replyProcessor = new StubMessageProcessor();
$replyProcessor = new StubProcessor();

$queueConsumer->bind($queue, $processor);
$queueConsumer->bind($replyQueue, $replyProcessor);
Expand All @@ -98,7 +98,7 @@ public function testConsumeOneMessageAndSendReplyExit()
}
}

class StubMessageProcessor implements MessageProcessorInterface
class StubProcessor implements Processor
{
public $result = Result::ACK;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class BuildClientRoutingPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorTagName = 'enqueue.client.processor';
$routerId = 'enqueue.client.router_processor';

if (false == $container->hasDefinition($routerId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

class BuildMessageProcessorRegistryPass implements CompilerPassInterface
class BuildProcessorRegistryPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorRegistryId = 'enqueue.client.message_processor_registry';
$processorTagName = 'enqueue.client.processor';
$processorRegistryId = 'enqueue.client.processor_registry';

if (false == $container->hasDefinition($processorRegistryId)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class BuildQueueMetaRegistryPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorTagName = 'enqueue.client.processor';
$queueMetaRegistryId = 'enqueue.client.meta.queue_meta_registry';
if (false == $container->hasDefinition($queueMetaRegistryId)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class BuildTopicMetaSubscribersPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorTagName = 'enqueue.client.processor';

$topicsSubscribers = [];
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException;

trait ExtractMessageProcessorTagSubscriptionsTrait
trait ExtractProcessorTagSubscriptionsTrait
{
/**
* @param ContainerBuilder $container
Expand Down
8 changes: 4 additions & 4 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory;
use Enqueue\Symfony\DefaultTransportFactory;
use Enqueue\Symfony\NullTransportFactory;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildMessageProcessorRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
use Enqueue\Stomp\StompContext;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
use Enqueue\Stomp\Symfony\StompTransportFactory;
use Enqueue\Symfony\DefaultTransportFactory;
use Enqueue\Symfony\NullTransportFactory;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;

Expand All @@ -27,7 +27,7 @@ public function build(ContainerBuilder $container)
{
$container->addCompilerPass(new BuildExtensionsPass());
$container->addCompilerPass(new BuildClientRoutingPass());
$container->addCompilerPass(new BuildMessageProcessorRegistryPass());
$container->addCompilerPass(new BuildProcessorRegistryPass());
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
$container->addCompilerPass(new BuildQueueMetaRegistryPass());

Expand Down
Loading

0 comments on commit c6f3d06

Please sign in to comment.