diff --git a/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php b/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php index cdd6fb13a..15e2f273b 100644 --- a/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php +++ b/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php @@ -25,13 +25,13 @@ public function __construct() $this->subscribers = []; } - public function consume(int $timeout = 0): void + public function consume(int $timeoutMs = 0): void { - if (empty($this->subscribers)) { + if (!$subscriberCount = \count($this->subscribers)) { throw new \LogicException('No subscribers'); } - $timeout /= 1000; + $timeout = $timeoutMs / 1000; $endAt = microtime(true) + $timeout; while (true) { @@ -41,13 +41,13 @@ public function consume(int $timeout = 0): void * @var callable $processor */ foreach ($this->subscribers as $queueName => list($consumer, $callback)) { - $message = $consumer->receiveNoWait(); + $message = 1 === $subscriberCount ? $consumer->receive($timeoutMs) : $consumer->receiveNoWait(); if ($message) { if (false === call_user_func($callback, $message, $consumer)) { return; } - } else { + } elseif (1 !== $subscriberCount) { if ($timeout && microtime(true) >= $endAt) { return; }