Skip to content

Commit

Permalink
Backport changes to topic subscription
Browse files Browse the repository at this point in the history
Allow either to use assign or subscribe

With `assign` it is not possible to use the rebalancing from Kafka. So if no offset is set we use `subscribe` and rebalancing is possible and otherwise if a offset is set we are using assign.

@see commit d2fa17881fcc2c523b8277f3ba30d1d9caeff667
@see php-enqueue/enqueue-dev#570
  • Loading branch information
Steveb-p authored Oct 22, 2018
1 parent e17743a commit 3debee6
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,15 @@ public function getQueue()
public function receive($timeout = 0)
{
if (false == $this->subscribed) {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);
if (null === $this->offset) {
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
} else {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);
}

$this->subscribed = true;
}
Expand Down

0 comments on commit 3debee6

Please sign in to comment.