Skip to content

Commit

Permalink
Merge pull request #314 from php-enqueue/kafka-set-offset
Browse files Browse the repository at this point in the history
[kafka] add ability to set offset.
  • Loading branch information
makasim authored Jan 5, 2018
2 parents ac8f74e + 3c1fd49 commit 9c83fa0
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 7 deletions.
21 changes: 21 additions & 0 deletions docs/transport/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)
* [Serialize message](#serialize-message)
* [Chnage offset](#change-offset)

## Installation

Expand Down Expand Up @@ -84,6 +85,9 @@ $fooQueue = $psrContext->createQueue('foo');

$consumer = $psrContext->createConsumer($fooQueue);

// Enable async commit to gain better performance.
//$consumer->setCommitAsync(true);

$message = $consumer->receive();

// process a message
Expand Down Expand Up @@ -115,4 +119,21 @@ class FooSerializer implements Serializer
$psrContext->setSerializer(new FooSerializer());
```

## Change offset

By default consumers starts from the beginning of the topic and updates the offset while you are processing messages.
There is an ability to change the current offset.

```php
<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');

$consumer = $psrContext->createConsumer($fooQueue);
$consumer->setOffset(123);

$message = $consumer->receive(2000);
```

[back to index](index.md)
22 changes: 21 additions & 1 deletion pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrMessage;
use RdKafka\KafkaConsumer;
use RdKafka\TopicPartition;

class RdKafkaConsumer implements PsrConsumer
{
Expand Down Expand Up @@ -36,6 +37,11 @@ class RdKafkaConsumer implements PsrConsumer
*/
private $commitAsync;

/**
* @var int|null
*/
private $offset;

/**
* @param KafkaConsumer $consumer
* @param RdKafkaContext $context
Expand All @@ -49,6 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
$this->topic = $topic;
$this->subscribed = false;
$this->commitAsync = false;
$this->offset = null;

$this->setSerializer($serializer);
}
Expand All @@ -69,6 +76,15 @@ public function setCommitAsync($async)
$this->commitAsync = (bool) $async;
}

public function setOffset($offset)
{
if ($this->subscribed) {
throw new \LogicException('The consumer has already subscribed.');
}

$this->offset = $offset;
}

/**
* {@inheritdoc}
*/
Expand All @@ -83,7 +99,11 @@ public function getQueue()
public function receive($timeout = 0)
{
if (false == $this->subscribed) {
$this->consumer->subscribe([$this->topic->getTopicName()]);
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);

$this->subscribed = true;
}
Expand Down
71 changes: 65 additions & 6 deletions pkg/rdkafka/Tests/RdKafkaConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
->expects($this->once())
->method('subscribe')
->with(['dest'])
->method('assign')
;
$kafkaConsumer
->expects($this->once())
Expand All @@ -70,6 +69,36 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
$this->assertNull($consumer->receive(1000));
}

public function testShouldPassProperlyConfiguredTopicPartitionOnAssign()
{
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
->expects($this->once())
->method('assign')
;
$kafkaConsumer
->expects($this->any())
->method('consume')
->willReturn($kafkaMessage)
;

$consumer = new RdKafkaConsumer(
$kafkaConsumer,
$this->createContextMock(),
$destination,
$this->createSerializerMock()
);

$consumer->receive(1000);
$consumer->receive(1000);
$consumer->receive(1000);
}

public function testShouldSubscribeOnFirstReceiveOnly()
{
$destination = new RdKafkaTopic('dest');
Expand All @@ -80,8 +109,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
->expects($this->once())
->method('subscribe')
->with(['dest'])
->method('assign')
;
$kafkaConsumer
->expects($this->any())
Expand All @@ -101,6 +129,38 @@ public function testShouldSubscribeOnFirstReceiveOnly()
$consumer->receive(1000);
}

public function testThrowOnOffsetChangeAfterSubscribing()
{
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
->expects($this->once())
->method('assign')
;
$kafkaConsumer
->expects($this->any())
->method('consume')
->willReturn($kafkaMessage)
;

$consumer = new RdKafkaConsumer(
$kafkaConsumer,
$this->createContextMock(),
$destination,
$this->createSerializerMock()
);

$consumer->receive(1000);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The consumer has already subscribed.');
$consumer->setOffset(123);
}

public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
{
$destination = new RdKafkaTopic('dest');
Expand All @@ -114,8 +174,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
->expects($this->once())
->method('subscribe')
->with(['dest'])
->method('assign')
;
$kafkaConsumer
->expects($this->once())
Expand Down

0 comments on commit 9c83fa0

Please sign in to comment.