Skip to content

Commit

Permalink
Merge pull request #129 from php-enqueue/bundle-do-not-use-container-…
Browse files Browse the repository at this point in the history
…aware-event-dispatcher

[bundle] Extend EventDispatcher instead of container aware one.
  • Loading branch information
makasim authored Jul 6, 2017
2 parents 77124b6 + 9e2054c commit 40033ce
Show file tree
Hide file tree
Showing 55 changed files with 1,296 additions and 372 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ install:

script:
# misssing pkg/amqp-ext pkg/job-queue pkg/redis
- if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/enqueue pkg/psr-queue pkg/fs pkg/simple-client; fi
- if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/stomp pkg/dbal pkg/enqueue-bundle pkg/null pkg/sqs pkg/test; fi
- if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/psr-queue pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test; fi
- if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi
- if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
Expand Down
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"enqueue/job-queue": "*@dev",
"enqueue/simple-client": "*@dev",
"enqueue/test": "*@dev",
"enqueue/async-event-dispatcher": "*@dev",

"phpunit/phpunit": "^5",
"doctrine/doctrine-bundle": "~1.2",
Expand Down Expand Up @@ -98,6 +99,10 @@
{
"type": "path",
"url": "pkg/simple-client"
},
{
"type": "path",
"url": "pkg/async-event-dispatcher"
}
]
}
112 changes: 112 additions & 0 deletions docs/async_event_dispatcher/quick_tour.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Async event dispatcher (Symfony)

The doc shows how you can setup async event dispatching in plain PHP.
If you are looking for the ways to use it in Symfony application [read this post instead](../bundle/async_events.md)

* [Installation](#installation)
* [Configuration](#configuration)
* [Dispatch event](#dispatch-event)
* [Process async events](#process-async-events)

## Installation

You need the async dispatcher library and a one of [the supported transports](../transport)

```bash
$ composer require enqueue/async-event-dispatcher enqueue/fs
```

## Configuration

```php
<?php

// config.php

use Enqueue\AsyncEventDispatcher\AsyncListener;
use Enqueue\AsyncEventDispatcher\AsyncProcessor;
use Enqueue\AsyncEventDispatcher\PhpSerializerEventTransformer;
use Enqueue\AsyncEventDispatcher\AsyncEventDispatcher;
use Enqueue\AsyncEventDispatcher\SimpleRegistry;
use Enqueue\Fs\FsConnectionFactory;
use Symfony\Component\EventDispatcher\EventDispatcher;

require_once __DIR__.'/vendor/autoload.php';

// it could be any other enqueue/psr-queue compatible context.
$context = (new FsConnectionFactory('file://'.__DIR__.'/queues'))->createContext();
$eventQueue = $context->createQueue('symfony_events');

$registry = new SimpleRegistry(
['the_event' => 'default'],
['default' => new PhpSerializerEventTransformer($context, true)]
);

$asyncListener = new AsyncListener($context, $registry, $eventQueue);

$dispatcher = new EventDispatcher();

// the listener sends even as a message through MQ
$dispatcher->addListener('the_event', $asyncListener);

$asyncDispatcher = new AsyncEventDispatcher($dispatcher, $asyncListener);

// the listener is executed on consumer side.
$asyncDispatcher->addListener('the_event', function() {
});

$asyncProcessor = new AsyncProcessor($registry, $asyncDispatcher);
```

## Dispatch event

```php
<?php

// send.php

use Symfony\Component\EventDispatcher\GenericEvent;

require_once __DIR__.'/vendor/autoload.php';

include __DIR__.'/config.php';

$dispatcher->dispatch('the_event', new GenericEvent('theSubject'));
```

## Process async events

```php
<?php

// consume.php

use Enqueue\Psr\PsrProcessor;

require_once __DIR__.'/vendor/autoload.php';
include __DIR__.'/config.php';

$consumer = $context->createConsumer($eventQueue);

while (true) {
if ($message = $consumer->receive(5000)) {
$result = $asyncProcessor->process($message, $context);

switch ((string) $result) {
case PsrProcessor::ACK:
$consumer->acknowledge($message);
break;
case PsrProcessor::REJECT:
$consumer->reject($message);
break;
case PsrProcessor::REQUEUE:
$consumer->reject($message, true);
break;
default:
throw new \LogicException('Result is not supported');
}
}
}
```

[back to index](../index.md)
57 changes: 4 additions & 53 deletions docs/bundle/async_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,67 +66,18 @@ You can also add an async listener directly and register a custom message proces
services:
acme.async_foo_listener:
class: 'Enqueue\Bundle\Events\AsyncListener'
class: 'Enqueue\AsyncEventDispatcher\AsyncListener'
public: false
arguments: ['@enqueue.client.producer', '@enqueue.events.registry']
arguments: ['@enqueue.transport.default.context', '@enqueue.events.registry', 'a_queue_name']
tags:
- { name: 'kernel.event_listener', event: 'foo', method: 'onEvent' }
```

The message processor must subscribe to `event.foo` topic. The message queue topics names for event follow this patter `event.{eventName}`.

```php
<?php
use Enqueue\Bundle\Events\Registry;
use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
class FooEventProcessor implements PsrProcessor, TopicSubscriberInterface
{
/**
* @var Registry
*/
private $registry;
/**
* @param Registry $registry
*/
public function __construct(Registry $registry)
{
$this->registry = $registry;
}
public function process(PsrMessage $message, PsrContext $context)
{
if (false == $eventName = $message->getProperty('event_name')) {
return self::REJECT;
}
if (false == $transformerName = $message->getProperty('transformer_name')) {
return self::REJECT;
}
// do what you want with the event.
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
return self::ACK;
}
public static function getSubscribedTopics()
{
return ['event.foo'];
}
}
```


## Event transformer

The bundle uses [php serializer](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php) transformer by default to pass events through MQ.
You could create a transformer for the given event type. The transformer must implement `Enqueue\Bundle\Events\EventTransformer` interface.
You could create a transformer for the given event type. The transformer must implement `Enqueue\AsyncEventDispatcher\EventTransformer` interface.
Consider the next example. It shows how to send an event that contains Doctrine entity as a subject

```php
Expand All @@ -140,7 +91,7 @@ use Enqueue\Consumption\Result;
use Enqueue\Psr\PsrMessage;
use Enqueue\Util\JSON;
use Symfony\Component\EventDispatcher\Event;
use Enqueue\Bundle\Events\EventTransformer;
use Enqueue\AsyncEventDispatcher\EventTransformer;
use Doctrine\Bundle\DoctrineBundle\Registry;
use Symfony\Component\EventDispatcher\GenericEvent;
Expand Down
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
- [Production settings](bundle/production_settings.md)
- [Debuging](bundle/debuging.md)
- [Functional testing](bundle/functional_testing.md)
* Async event dispatcher (Symfony)
- [Quick tour](async_event_dispatcher/quick_tour.md)
* Magento
- [Quick tour](magento/quick_tour.md)
- [Cli commands](magento/cli_commands.md)
Expand Down
9 changes: 8 additions & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
parameters:
excludes_analyse:
- pkg/enqueue/Util/UUID.php
- pkg/enqueue/Util/UUID.php
- pkg/enqueue-bundle/Tests/Functional/App
- pkg/job-queue/Test/JobRunner.php
- pkg/job-queue/Tests/Functional/app/AppKernel.php
- pkg/redis/PhpRedis.php
- pkg/redis/RedisConnectionFactory.php
- pkg/gearman
- pkg/amqp-ext/AmqpConsumer.php
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
<testsuite name="simple-client">
<directory>pkg/simple-client/Tests</directory>
</testsuite>

<testsuite name="async-event-dispatcher">
<directory>pkg/async-event-dispatcher/Tests</directory>
</testsuite>
</testsuites>

<filter>
Expand Down
7 changes: 7 additions & 0 deletions pkg/async-event-dispatcher/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
Tests/Functional/queues
21 changes: 21 additions & 0 deletions pkg/async-event-dispatcher/.travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
sudo: false

git:
depth: 1

language: php

php:
- '5.6'
- '7.0'

cache:
directories:
- $HOME/.composer/cache

install:
- composer self-update
- composer install --prefer-source --ignore-platform-reqs

script:
- vendor/bin/phpunit --exclude-group=functional
57 changes: 57 additions & 0 deletions pkg/async-event-dispatcher/AsyncEventDispatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

namespace Enqueue\AsyncEventDispatcher;

use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class AsyncEventDispatcher extends EventDispatcher
{
/**
* @var EventDispatcherInterface
*/
private $trueEventDispatcher;

/**
* @var AsyncListener
*/
private $asyncListener;

/**
* @param EventDispatcherInterface $trueEventDispatcher
* @param AsyncListener $asyncListener
*/
public function __construct(EventDispatcherInterface $trueEventDispatcher, AsyncListener $asyncListener)
{
$this->trueEventDispatcher = $trueEventDispatcher;
$this->asyncListener = $asyncListener;
}

/**
* This method dispatches only those listeners that were marked as async.
*
* @param string $eventName
* @param Event|null $event
*/
public function dispatchAsyncListenersOnly($eventName, Event $event = null)
{
try {
$this->asyncListener->syncMode($eventName);

parent::dispatch($eventName, $event);
} finally {
$this->asyncListener->resetSyncMode();
}
}

/**
* {@inheritdoc}
*/
public function dispatch($eventName, Event $event = null)
{
parent::dispatch($eventName, $event);

$this->trueEventDispatcher->dispatch($eventName, $event);
}
}
Loading

0 comments on commit 40033ce

Please sign in to comment.