Skip to content

Commit

Permalink
Merge pull request #114 from php-enqueue/rpc-cleanup
Browse files Browse the repository at this point in the history
RPC Deletes Reply Queue After Receive Message
  • Loading branch information
makasim authored Jun 14, 2017
2 parents cea5a50 + 863166b commit 7c3a5ff
Show file tree
Hide file tree
Showing 10 changed files with 754 additions and 185 deletions.
2 changes: 1 addition & 1 deletion docs/client/rpc_call.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);

$replyMessages = [];
foreach ($promises as $promise) {
$replyMessages[] = $promise->getMessage();
$replyMessages[] = $promise->receive();
}
```

Expand Down
2 changes: 1 addition & 1 deletion docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ $message = $psrContext->createMessage('Hi there!');
$rpcClient = new RpcClient($psrContext);

$promise = $rpcClient->callAsync($queue, $message, 1);
$replyMessage = $promise->getMessage();
$replyMessage = $promise->receive();
```

There is also extensions for the consumption component.
Expand Down
59 changes: 53 additions & 6 deletions pkg/enqueue/Client/RpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Rpc\Promise;
use Enqueue\Rpc\TimeoutException;
use Enqueue\Util\UUID;

class RpcClient
Expand Down Expand Up @@ -38,7 +39,7 @@ public function __construct(ProducerInterface $producer, PsrContext $context)
*/
public function call($topic, $message, $timeout)
{
return $this->callAsync($topic, $message, $timeout)->getMessage();
return $this->callAsync($topic, $message, $timeout)->receive();
}

/**
Expand All @@ -62,9 +63,11 @@ public function callAsync($topic, $message, $timeout)

if ($message->getReplyTo()) {
$replyQueue = $this->context->createQueue($message->getReplyTo());
$deleteReplyQueue = false;
} else {
$replyQueue = $this->context->createTemporaryQueue();
$message->setReplyTo($replyQueue->getQueueName());
$deleteReplyQueue = true;
}

if (false == $message->getCorrelationId()) {
Expand All @@ -73,10 +76,54 @@ public function callAsync($topic, $message, $timeout)

$this->producer->send($topic, $message);

return new Promise(
$this->context->createConsumer($replyQueue),
$message->getCorrelationId(),
$timeout
);
$correlationId = $message->getCorrelationId();

$receive = function () use ($replyQueue, $timeout, $correlationId) {
$endTime = time() + ((int) ($timeout / 1000));
$consumer = $this->context->createConsumer($replyQueue);

do {
if ($message = $consumer->receive($timeout)) {
if ($message->getCorrelationId() === $correlationId) {
$consumer->acknowledge($message);

return $message;
}

$consumer->reject($message, true);
}
} while (time() < $endTime);

throw TimeoutException::create($timeout, $correlationId);
};

$receiveNoWait = function () use ($replyQueue, $correlationId) {
static $consumer;

if (null === $consumer) {
$consumer = $this->context->createConsumer($replyQueue);
}

if ($message = $consumer->receiveNoWait()) {
if ($message->getCorrelationId() === $correlationId) {
$consumer->acknowledge($message);

return $message;
}

$consumer->reject($message, true);
}
};

$finally = function (Promise $promise) use ($replyQueue) {
if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) {
$this->context->deleteQueue($replyQueue);
}
};

$promise = new Promise($receive, $receiveNoWait, $finally);
$promise->setDeleteReplyQueue($deleteReplyQueue);

return $promise;
}
}
118 changes: 89 additions & 29 deletions pkg/enqueue/Rpc/Promise.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,135 @@

namespace Enqueue\Rpc;

use Enqueue\Psr\PsrConsumer;
use Enqueue\Psr\PsrMessage;

class Promise
{
/**
* @var PsrConsumer
* @var \Closure
*/
private $consumer;
private $receiveCallback;

/**
* @var int
* @var \Closure
*/
private $timeout;
private $receiveNoWaitCallback;

/**
* @var string
* @var \Closure
*/
private $correlationId;
private $finallyCallback;

/**
* @param PsrConsumer $consumer
* @param string $correlationId
* @param int $timeout
* @var bool
*/
public function __construct(PsrConsumer $consumer, $correlationId, $timeout)
private $deleteReplyQueue;

/**
* @var PsrMessage
*/
private $message;

/**
* @param \Closure $receiveCallback
* @param \Closure $receiveNoWaitCallback
* @param \Closure $finallyCallback
*/
public function __construct(\Closure $receiveCallback, \Closure $receiveNoWaitCallback, \Closure $finallyCallback)
{
$this->consumer = $consumer;
$this->timeout = $timeout;
$this->correlationId = $correlationId;
$this->receiveCallback = $receiveCallback;
$this->receiveNoWaitCallback = $receiveNoWaitCallback;
$this->finallyCallback = $finallyCallback;

$this->deleteReplyQueue = true;
}

/**
* Blocks until message received or timeout expired.
*
* @deprecated use "receive" instead
*
* @throws TimeoutException if the wait timeout is reached
*
* @return PsrMessage
*/
public function getMessage()
{
$endTime = time() + $this->timeout;

while (time() < $endTime) {
if ($message = $this->consumer->receive($this->timeout)) {
if ($message->getCorrelationId() === $this->correlationId) {
$this->consumer->acknowledge($message);
return $this->receive();
}

return $message;
/**
* Blocks until message received or timeout expired.
*
* @throws TimeoutException if the wait timeout is reached
*
* @return PsrMessage
*/
public function receive()
{
if (null == $this->message) {
try {
if ($message = $this->doReceive($this->receiveCallback)) {
$this->message = $message;
}
} finally {
call_user_func($this->finallyCallback, $this);
}
}

return $this->message;
}

/**
* Non blocking function. Returns message or null.
*
* @return PsrMessage|null
*/
public function receiveNoWait()
{
if (null == $this->message) {
if ($message = $this->doReceive($this->receiveNoWaitCallback)) {
$this->message = $message;

$this->consumer->reject($message, true);
call_user_func($this->finallyCallback, $this);
}
}

throw TimeoutException::create($this->timeout, $this->correlationId);
return $this->message;
}

/**
* @param int $timeout
* On TRUE deletes reply queue after getMessage call.
*
* @param bool $delete
*/
public function setTimeout($timeout)
public function setDeleteReplyQueue($delete)
{
$this->timeout = $timeout;
$this->deleteReplyQueue = (bool) $delete;
}

/**
* @return int
* @return bool
*/
public function getTimeout()
public function isDeleteReplyQueue()
{
return $this->timeout;
return $this->deleteReplyQueue;
}

/**
* @param \Closure $cb
*
* @return PsrMessage
*/
private function doReceive(\Closure $cb)
{
$message = call_user_func($cb, $this);

if (null !== $message && false == $message instanceof PsrMessage) {
throw new \RuntimeException(sprintf(
'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message)));
}

return $message;
}
}
58 changes: 52 additions & 6 deletions pkg/enqueue/Rpc/RpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(PsrContext $context)
*/
public function call(PsrDestination $destination, PsrMessage $message, $timeout)
{
return $this->callAsync($destination, $message, $timeout)->getMessage();
return $this->callAsync($destination, $message, $timeout)->receive();
}

/**
Expand All @@ -51,9 +51,11 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim

if ($message->getReplyTo()) {
$replyQueue = $this->context->createQueue($message->getReplyTo());
$deleteReplyQueue = false;
} else {
$replyQueue = $this->context->createTemporaryQueue();
$message->setReplyTo($replyQueue->getQueueName());
$deleteReplyQueue = true;
}

if (false == $message->getCorrelationId()) {
Expand All @@ -62,10 +64,54 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim

$this->context->createProducer()->send($destination, $message);

return new Promise(
$this->context->createConsumer($replyQueue),
$message->getCorrelationId(),
$timeout
);
$correlationId = $message->getCorrelationId();

$receive = function () use ($replyQueue, $timeout, $correlationId) {
$endTime = time() + ((int) ($timeout / 1000));
$consumer = $this->context->createConsumer($replyQueue);

do {
if ($message = $consumer->receive($timeout)) {
if ($message->getCorrelationId() === $correlationId) {
$consumer->acknowledge($message);

return $message;
}

$consumer->reject($message, true);
}
} while (time() < $endTime);

throw TimeoutException::create($timeout, $correlationId);
};

$receiveNoWait = function () use ($replyQueue, $correlationId) {
static $consumer;

if (null === $consumer) {
$consumer = $this->context->createConsumer($replyQueue);
}

if ($message = $consumer->receiveNoWait()) {
if ($message->getCorrelationId() === $correlationId) {
$consumer->acknowledge($message);

return $message;
}

$consumer->reject($message, true);
}
};

$finally = function (Promise $promise) use ($replyQueue) {
if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) {
$this->context->deleteQueue($replyQueue);
}
};

$promise = new Promise($receive, $receiveNoWait, $finally);
$promise->setDeleteReplyQueue($deleteReplyQueue);

return $promise;
}
}
Loading

0 comments on commit 7c3a5ff

Please sign in to comment.