Skip to content

Commit

Permalink
Merge pull request #575 from Steveb-p/Steveb-p-patch-1
Browse files Browse the repository at this point in the history
[rdkafka] Backport changes to topic subscription
  • Loading branch information
makasim authored Oct 22, 2018
2 parents 79f1821 + 345b6ac commit 0b091aa
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,15 @@ public function getQueue()
public function receive($timeout = 0)
{
if (false == $this->subscribed) {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);
if (null === $this->offset) {
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
} else {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);
}

$this->subscribed = true;
}
Expand Down

0 comments on commit 0b091aa

Please sign in to comment.