Skip to content

Commit

Permalink
Allow either to use assign or subscribe
Browse files Browse the repository at this point in the history
With `assign` it is not possible to use the rebalancing from Kafka. So if no offset is set we use `subscribe` and rebalancing is possible and otherwise if a offset is set we are using assign.
  • Loading branch information
Engerim authored Aug 15, 2018
1 parent c52ba7c commit d2fa178
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
$this->topic = $topic;
$this->subscribed = false;
$this->commitAsync = false;
$this->offset = null;


$this->setSerializer($serializer);
}

Expand Down Expand Up @@ -99,12 +98,15 @@ public function getQueue()
public function receive($timeout = 0)
{
if (false == $this->subscribed) {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);

if ($this->offset === null) {
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
} else {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
), ]);
}
$this->subscribed = true;
}

Expand Down

0 comments on commit d2fa178

Please sign in to comment.