Skip to content

Commit

Permalink
Merge pull request #198 from php-enqueue/dbal-add-priority-support
Browse files Browse the repository at this point in the history
[dbal] add priority support on transport level.
  • Loading branch information
makasim authored Sep 13, 2017
2 parents 392e4ae + 5eae7c6 commit f7e4321
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 99 deletions.
2 changes: 1 addition & 1 deletion pkg/dbal/DbalConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected function receiveMessage()
->where('queue = :queue')
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
->orderBy('priority', 'desc')
->addOrderBy('id', 'asc')
->addOrderBy('id', 'desc')
->setMaxResults(1)
;

Expand Down
22 changes: 15 additions & 7 deletions pkg/dbal/DbalProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

class DbalProducer implements PsrProducer
{
/**
* @var int|null
*/
private $priority;

/**
* @var DbalContext
*/
Expand All @@ -29,14 +34,19 @@ public function __construct(DbalContext $context)
/**
* {@inheritdoc}
*
* @param PsrDestination $destination
* @param PsrMessage $message
* @param DbalDestination $destination
* @param DbalMessage $message
*
* @throws Exception
*/
public function send(PsrDestination $destination, PsrMessage $message)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, DbalDestination::class);
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);

if (null !== $this->priority && null === $message->getPriority()) {
$message->setPriority($this->priority);
}

$body = $message->getBody();
if (is_scalar($body) || null === $body) {
Expand Down Expand Up @@ -111,19 +121,17 @@ public function getDeliveryDelay()
*/
public function setPriority($priority)
{
if (null === $priority) {
return;
}
$this->priority = $priority;

throw new \LogicException('Not implemented');
return $this;
}

/**
* {@inheritdoc}
*/
public function getPriority()
{
return null;
return $this->priority;
}

/**
Expand Down
91 changes: 0 additions & 91 deletions pkg/dbal/Tests/DbalSendPriorityMessagesTest.php

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace Enqueue\Dbal\Tests\Spec;

use Enqueue\Dbal\DbalConnectionFactory;
use Enqueue\Dbal\DbalDestination;
use Enqueue\Dbal\DbalMessage;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;

/**
* @group functional
*/
class DbalSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec
{
/**
* @return PsrContext
*/
protected function createContext()
{
$factory = new DbalConnectionFactory([
'lazy' => true,
'connection' => [
'dbname' => getenv('SYMFONY__DB__NAME'),
'user' => getenv('SYMFONY__DB__USER'),
'password' => getenv('SYMFONY__DB__PASSWORD'),
'host' => getenv('SYMFONY__DB__HOST'),
'port' => getenv('SYMFONY__DB__PORT'),
'driver' => getenv('SYMFONY__DB__DRIVER'),
],
]);

$context = $factory->createContext();
$context->createDataBaseTable();

return $context;
}

/**
* {@inheritdoc}
*
* @return DbalMessage
*/
protected function createMessage(PsrContext $context, $priority)
{
/** @var DbalMessage $message */
$message = $context->createMessage('priority'.$priority);
$message->setPriority($priority);

return $message;
}

/**
* {@inheritdoc}
*
* @return DbalDestination
*/
protected function createQueue(PsrContext $context, $queueName)
{
return parent::createQueue($context, $queueName.time());
}
}

0 comments on commit f7e4321

Please sign in to comment.