From 345b6ac57edfdff3fd53df460a9444b1a4e79d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= <3183926+Steveb-p@users.noreply.github.com> Date: Mon, 22 Oct 2018 08:35:47 +0200 Subject: [PATCH] Backport changes to topic subscription Allow either to use assign or subscribe With `assign` it is not possible to use the rebalancing from Kafka. So if no offset is set we use `subscribe` and rebalancing is possible and otherwise if a offset is set we are using assign. @see commit d2fa17881fcc2c523b8277f3ba30d1d9caeff667 @see https://github.com/php-enqueue/enqueue-dev/issues/570 --- pkg/rdkafka/RdKafkaConsumer.php | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 9d30be2a7..37bdb8d97 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -99,11 +99,15 @@ public function getQueue() public function receive($timeout = 0) { if (false == $this->subscribed) { - $this->consumer->assign([new TopicPartition( - $this->getQueue()->getQueueName(), - $this->getQueue()->getPartition(), - $this->offset - )]); + if (null === $this->offset) { + $this->consumer->subscribe([$this->getQueue()->getQueueName()]); + } else { + $this->consumer->assign([new TopicPartition( + $this->getQueue()->getQueueName(), + $this->getQueue()->getPartition(), + $this->offset + )]); + } $this->subscribed = true; }