From d2fa17881fcc2c523b8277f3ba30d1d9caeff667 Mon Sep 17 00:00:00 2001 From: Alexander Miehe Date: Wed, 15 Aug 2018 15:49:35 +0200 Subject: [PATCH] Allow either to use assign or subscribe With `assign` it is not possible to use the rebalancing from Kafka. So if no offset is set we use `subscribe` and rebalancing is possible and otherwise if a offset is set we are using assign. --- pkg/rdkafka/RdKafkaConsumer.php | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 9d30be2a7..991ba210c 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -55,8 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; - $this->offset = null; - + $this->setSerializer($serializer); } @@ -99,12 +98,15 @@ public function getQueue() public function receive($timeout = 0) { if (false == $this->subscribed) { - $this->consumer->assign([new TopicPartition( - $this->getQueue()->getQueueName(), - $this->getQueue()->getPartition(), - $this->offset - )]); - + if ($this->offset === null) { + $this->consumer->subscribe([$this->getQueue()->getQueueName()]); + } else { + $this->consumer->assign([new TopicPartition( + $this->getQueue()->getQueueName(), + $this->getQueue()->getPartition(), + $this->offset + ), ]); + } $this->subscribed = true; }