diff --git a/docs/transport/amqp.md b/docs/transport/amqp.md index 7c073d407..80845e71a 100644 --- a/docs/transport/amqp.md +++ b/docs/transport/amqp.md @@ -235,7 +235,7 @@ use Interop\Queue\PsrConsumer; $fooConsumer = $psrContext->createConsumer($fooQueue); $barConsumer = $psrContext->createConsumer($barQueue); -$subscriptionConsumer =$psrContext->createSubscriptionConsumer(); +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { // process message diff --git a/docs/transport/amqp_bunny.md b/docs/transport/amqp_bunny.md index 241cfe836..c0f68534b 100644 --- a/docs/transport/amqp_bunny.md +++ b/docs/transport/amqp_bunny.md @@ -227,7 +227,7 @@ use Interop\Queue\PsrConsumer; $fooConsumer = $psrContext->createConsumer($fooQueue); $barConsumer = $psrContext->createConsumer($barQueue); -$subscriptionConsumer =$psrContext->createSubscriptionConsumer(); +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { // process message diff --git a/docs/transport/amqp_lib.md b/docs/transport/amqp_lib.md index 6f749233b..b3e7e055a 100644 --- a/docs/transport/amqp_lib.md +++ b/docs/transport/amqp_lib.md @@ -235,7 +235,7 @@ use Interop\Queue\PsrConsumer; $fooConsumer = $psrContext->createConsumer($fooQueue); $barConsumer = $psrContext->createConsumer($barQueue); -$subscriptionConsumer =$psrContext->createSubscriptionConsumer(); +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { // process message diff --git a/docs/transport/dbal.md b/docs/transport/dbal.md index 9fcaab04a..6e72d7953 100644 --- a/docs/transport/dbal.md +++ b/docs/transport/dbal.md @@ -11,6 +11,7 @@ It creates a table there. Pushes and pops messages to\from that table. * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) ## Installation @@ -103,4 +104,37 @@ $message = $consumer->receive(); // process a message ``` +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $psrContext->createConsumer($barQueue); + +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + [back to index](../index.md) \ No newline at end of file diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 2f0976625..c8e1b02d4 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -142,7 +142,7 @@ protected function receiveMessage(): ?DbalMessage $this->dbal->commit(); if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) { - return $this->convertMessage($dbalMessage); + return $this->context->convertMessage($dbalMessage); } return null; @@ -153,27 +153,6 @@ protected function receiveMessage(): ?DbalMessage } } - protected function convertMessage(array $dbalMessage): DbalMessage - { - /** @var DbalMessage $message */ - $message = $this->context->createMessage(); - - $message->setBody($dbalMessage['body']); - $message->setPriority((int) $dbalMessage['priority']); - $message->setRedelivered((bool) $dbalMessage['redelivered']); - $message->setPublishedAt((int) $dbalMessage['published_at']); - - if ($dbalMessage['headers']) { - $message->setHeaders(JSON::decode($dbalMessage['headers'])); - } - - if ($dbalMessage['properties']) { - $message->setProperties(JSON::decode($dbalMessage['properties'])); - } - - return $message; - } - private function fetchPrioritizedMessage(int $now): ?array { $query = $this->dbal->createQueryBuilder(); diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 91fa667ff..5a715b288 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -11,7 +11,6 @@ use Interop\Queue\Context; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; -use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; @@ -126,7 +125,31 @@ public function close(): void public function createSubscriptionConsumer(): SubscriptionConsumer { - throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); + return new DbalSubscriptionConsumer($this); + } + + /** + * @internal It must be used here and in the consumer only + */ + public function convertMessage(array $dbalMessage): DbalMessage + { + $dbalMessageObj = $this->createMessage( + $dbalMessage['body'], + $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], + $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] + ); + + if (isset($dbalMessage['redelivered'])) { + $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); + } + if (isset($dbalMessage['priority'])) { + $dbalMessageObj->setPriority((int) $dbalMessage['priority']); + } + if (isset($dbalMessage['published_at'])) { + $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); + } + + return $dbalMessageObj; } /** diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php new file mode 100644 index 000000000..a822cee04 --- /dev/null +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -0,0 +1,198 @@ +context = $context; + $this->dbal = $this->context->getDbalConnection(); + $this->subscribers = []; + } + + public function consume(int $timeout = 0): void + { + if (empty($this->subscribers)) { + throw new \LogicException('No subscribers'); + } + + $timeout = (int) ceil($timeout / 1000); + $endAt = time() + $timeout; + + $queueNames = []; + foreach (array_keys($this->subscribers) as $queueName) { + $queueNames[$queueName] = $queueName; + } + + $currentQueueNames = []; + while (true) { + if (empty($currentQueueNames)) { + $currentQueueNames = $queueNames; + } + + $message = $this->fetchPrioritizedMessage($currentQueueNames) ?: $this->fetchMessage($currentQueueNames); + + if ($message) { + $this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]); + + $dbalMessage = $this->context->convertMessage($message); + + /** + * @var DbalConsumer + * @var callable $callback + */ + list($consumer, $callback) = $this->subscribers[$message['queue']]; + + if (false === call_user_func($callback, $dbalMessage, $consumer)) { + return; + } + + unset($currentQueueNames[$message['queue']]); + } else { + $currentQueueNames = []; + + usleep(200000); // 200ms + } + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + } + + /** + * @param DbalConsumer $consumer + */ + public function subscribe(Consumer $consumer, callable $callback): void + { + if (false == $consumer instanceof DbalConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * @param DbalConsumer $consumer + */ + public function unsubscribe(Consumer $consumer): void + { + if (false == $consumer instanceof DbalConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + + unset($this->subscribers[$queueName]); + } + + public function unsubscribeAll(): void + { + $this->subscribers = []; + } + + private function fetchMessage(array $queues): ?array + { + $query = $this->dbal->createQueryBuilder(); + $query + ->select('*') + ->from($this->context->getTableName()) + ->andWhere('queue IN (:queues)') + ->andWhere('priority IS NULL') + ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') + ->addOrderBy('published_at', 'asc') + ->setMaxResults(1) + ; + + $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); + + $result = $this->dbal->executeQuery( + $sql, + [ + 'queues' => array_keys($queues), + 'delayedUntil' => time(), + ], + [ + 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, + 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, + ] + )->fetch(); + + return $result ?: null; + } + + private function fetchPrioritizedMessage(array $queues): ?array + { + $query = $this->dbal->createQueryBuilder(); + $query + ->select('*') + ->from($this->context->getTableName()) + ->andWhere('queue IN (:queues)') + ->andWhere('priority IS NOT NULL') + ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') + ->addOrderBy('published_at', 'asc') + ->addOrderBy('priority', 'desc') + ->setMaxResults(1) + ; + + $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); + + $result = $this->dbal->executeQuery( + $sql, + [ + 'queues' => array_keys($queues), + 'delayedUntil' => time(), + ], + [ + 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, + 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, + ] + )->fetch(); + + return $result ?: null; + } +} diff --git a/pkg/dbal/Tests/DbalContextTest.php b/pkg/dbal/Tests/DbalContextTest.php index 0cddfdf71..b505b0dde 100644 --- a/pkg/dbal/Tests/DbalContextTest.php +++ b/pkg/dbal/Tests/DbalContextTest.php @@ -64,6 +64,21 @@ public function testShouldCreateMessage() $this->assertFalse($message->isRedelivered()); } + public function testShouldConvertArrayToDbalMessage() + { + $arrayData = [ + 'body' => 'theBody', + 'properties' => json_encode(['barProp' => 'barPropVal']), + 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), + ]; + $context = new DbalContext($this->createConnectionMock()); + $message = $context->convertMessage($arrayData); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties()); + $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); + } + public function testShouldCreateTopic() { $context = new DbalContext($this->createConnectionMock()); diff --git a/pkg/dbal/Tests/DbalMessageTest.php b/pkg/dbal/Tests/DbalMessageTest.php index 05f4c89fb..19f234582 100644 --- a/pkg/dbal/Tests/DbalMessageTest.php +++ b/pkg/dbal/Tests/DbalMessageTest.php @@ -1,5 +1,7 @@ assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithDbalContextAsFirstArgument() + { + new DbalSubscriptionConsumer($this->createDbalContextMock()); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + + $subscriptionConsumer->consume(); + } + + /** + * @return DbalContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createDbalContextMock() + { + return $this->createMock(DbalContext::class); + } + + /** + * @param null|mixed $queueName + * + * @return Consumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(Queue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(DbalConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock); + + return $consumerMock; + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php new file mode 100644 index 000000000..4f31958d3 --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -0,0 +1,41 @@ +createDbalContext(); + } + + /** + * @param DbalContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php new file mode 100644 index 000000000..23590144e --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -0,0 +1,41 @@ +createDbalContext(); + } + + /** + * @param DbalContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php new file mode 100644 index 000000000..c303b0c29 --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php @@ -0,0 +1,41 @@ +createDbalContext(); + } + + /** + * @param DbalContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +}