Skip to content

Commit

Permalink
wamp
Browse files Browse the repository at this point in the history
  • Loading branch information
ASKozienko committed Oct 23, 2018
1 parent b8f0453 commit 2db7fe8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 2 deletions.
30 changes: 30 additions & 0 deletions docs/transport/wamp.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ It uses internally Thruway PHP library [voryx/thruway](https://github.com/voryx/
* [Start the WAMP router](#start-the-wamp-router)
* [Create context](#create-context)
* [Consume message](#consume-message)
* [Subscription consumer](#subscription-consumer)
* [Send message to topic](#send-message-to-topic)

## Installation
Expand Down Expand Up @@ -58,6 +59,35 @@ while (true) {
}
```

## Subscription consumer

```php
<?php
use Interop\Queue\Message;
use Interop\Queue\Consumer;

/** @var \Enqueue\Wamp\WampContext $context */
/** @var \Enqueue\Wamp\WampDestination $fooQueue */
/** @var \Enqueue\Wamp\WampDestination $barQueue */

$fooConsumer = $context->createConsumer($fooQueue);
$barConsumer = $context->createConsumer($barQueue);

$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
// process message

return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) {
// process message

return true;
});

$subscriptionConsumer->consume(2000); // 2 sec
```

## Send message to topic

```php
Expand Down
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@
<testsuite name="dsn">
<directory>pkg/dsn/Tests</directory>
</testsuite>

<testsuite name="wamp transport">
<directory>pkg/wamp/Tests</directory>
</testsuite>
</testsuites>

<php>
Expand Down
2 changes: 1 addition & 1 deletion pkg/enqueue/Resources.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public static function getKnownConnections(): array
'package' => 'enqueue/mongodb',
];
$map[WampConnectionFactory::class] = [
'schemes' => ['wamp'],
'schemes' => ['wamp', 'ws'],
'supportedSchemeExtensions' => [],
'package' => 'enqueue/wamp',
];
Expand Down
19 changes: 19 additions & 0 deletions pkg/enqueue/Tests/ResourcesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Resources;
use Enqueue\Wamp\WampConnectionFactory;
use Interop\Queue\ConnectionFactory;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -127,4 +128,22 @@ public function testShouldAllowGetPreviouslyRegisteredConnection()
$this->assertArrayHasKey('package', $connectionInfo);
$this->assertSame('foo/bar', $connectionInfo['package']);
}

public function testShouldHaveRegisteredWampConfiguration()
{
$availableConnections = Resources::getKnownConnections();

$this->assertInternalType('array', $availableConnections);
$this->assertArrayHasKey(WampConnectionFactory::class, $availableConnections);

$connectionInfo = $availableConnections[WampConnectionFactory::class];
$this->assertArrayHasKey('schemes', $connectionInfo);
$this->assertSame(['wamp', 'ws'], $connectionInfo['schemes']);

$this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo);
$this->assertSame([], $connectionInfo['supportedSchemeExtensions']);

$this->assertArrayHasKey('package', $connectionInfo);
$this->assertSame('enqueue/wamp', $connectionInfo['package']);
}
}
2 changes: 1 addition & 1 deletion pkg/wamp/WampConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private function parseDsn(string $dsn): array
{
$dsn = new Dsn($dsn);

if ('wamp' !== $dsn->getSchemeProtocol()) {
if (false === in_array($dsn->getSchemeProtocol(), ['wamp', 'ws'], true)) {
throw new \LogicException(sprintf(
'The given scheme protocol "%s" is not supported. It must be "wamp"',
$dsn->getSchemeProtocol()
Expand Down

0 comments on commit 2db7fe8

Please sign in to comment.