diff --git a/src/Driver/PhpAmqpDriver.php b/src/Driver/PhpAmqpDriver.php index 02211cf8..8ad7095e 100644 --- a/src/Driver/PhpAmqpDriver.php +++ b/src/Driver/PhpAmqpDriver.php @@ -87,24 +87,29 @@ public function pushMessage($queueName, $message) /** * Remove the next message in line. And if no message is available - * wait $interval seconds. + * wait $duration seconds. * * @param string $queueName - * @param int $interval + * @param int $duration * * @return array An array like array($message, $receipt); */ - public function popMessage($queueName, $interval = 10000) + public function popMessage($queueName, $duration = 5) { - $message = $this->getChannel()->basic_get($queueName); - if (!$message) { - // sleep for 10 ms to prevent hammering CPU - usleep($interval); + $runtime = microtime(true) + $duration; + + while (microtime(true) < $runtime) { + $message = $this->getChannel()->basic_get($queueName); - return [null, null]; + if ($message) { + return [$message->body, $message->get('delivery_tag')]; + } + + // sleep for 10 ms to prevent hammering CPU + usleep(10000); } - return [$message->body, $message->get('delivery_tag')]; + return [null, null]; } /** diff --git a/tests/Driver/PhpAmqpDriverTest.php b/tests/Driver/PhpAmqpDriverTest.php index 4ac7aff1..a04cb51b 100644 --- a/tests/Driver/PhpAmqpDriverTest.php +++ b/tests/Driver/PhpAmqpDriverTest.php @@ -137,13 +137,19 @@ public function testItPopsMessages() public function testItPopsArrayWithNullsWhenThereAreNoMessages() { + $startTime = microtime(true); + $this->phpAmqpChannel - ->expects($this->once()) + ->expects($this->any()) ->method('basic_get') ->with($this->equalTo('foo-queue')) ->willReturn(null); - $this->assertEquals([null, null], $this->driver->popMessage('foo-queue')); + $result = $this->driver->popMessage('foo-queue', 0.1); + $duration = microtime(true) - $startTime; + + $this->assertEquals([null, null], $result); + $this->assertGreaterThan(0.1, $duration); } public function testItAcknowledgesMessage()