[RdKafka][Symfony] Kafka always consumes messages from the beginning #570

Closed
Steveb-p opened this issue Oct 19, 2018 · 5 comments

@Steveb-p
Contributor

I have started using the Symfony Messenger component for inter-application communication and stumbled upon a minor issue with the Kafka queue implementation. I'm not sure whether the core issue lies in the Symfony integration or in Enqueue, but I've looked over the source code and it all points to wrong offset selection in enqueue/rdkafka, so I wanted to ask for advice and/or contribute.

For my example, I've created a simple dispatch:

$bus->dispatch(new DisplayNotification('test'));

(obviously DisplayNotification is my custom message)

I have created a configuration containing:

enqueue:
    async_events:
        enabled: true
        # if you'd like to send messages onTerminate, use spool_producer (it makes response time even shorter):
        # spool_producer: true
    transport:
#        default: '%env(ENQUEUE_DSN)%'
        default: rdkafka
        rdkafka:
            global:
                group.id: app_name
                offset.store.method: file
                auto.commit.interval.ms: 10
                metadata.broker.list: '192.168.56.236:9092'
            topic:
                offset.store.method: file
                offset.store.sync.interval.ms: 60
    client: ~

After dispatching the message a couple of times, I started consuming them:
image

However, upon restarting, I noticed that messages that should already have been handled were popping up again. So I tried another implementation, enqueue/fs, and sure enough the messages were handled and removed afterwards.

Then I tried calling

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic messages --from-beginning --group app_name

as the Kafka quickstart suggests (I just added --group app_name). This showed me that the offsets are stored in Kafka; the PHP process just keeps ignoring them.

After some digging, I noticed that RdKafkaConsumer, and in particular the RdKafkaConsumer::receive method, is responsible for managing the offset value. The __construct method always sets $this->offset to null, and the only other place where the offset is actually set is in the tests.

Then I looked into the receive method itself. An excerpt of the current code is below:

public function receive($timeout = 0)
{
    if (false == $this->subscribed) {
        $this->consumer->assign([new TopicPartition(
            $this->getQueue()->getQueueName(),
            $this->getQueue()->getPartition(),
            $this->offset
        )]);

        $this->subscribed = true;
    }

    $message = null;
    if ($timeout > 0) {
        $message = $this->doReceive($timeout);
    } else {
        while (true) {
            if ($message = $this->doReceive(500)) {
                break;
            }
        }
    }

    return $message;
}

I've replaced the $this->consumer->assign() call with:

$this->consumer->subscribe([$this->getQueue()->getQueueName()]);

which, according to https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-kafkaconsumer.subscribe.html, should allow the library to automatically select a partition for this consumer (am I right?).

Am I correct in thinking that this is a bug and not some sort of misconfiguration on my part? I'm not really adept at Kafka yet.

Is this solution acceptable, or will it cause issues I'm not able to see yet?
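
For reference, here is a minimal sketch of a subscribe-based consumer written against php-rdkafka directly, outside of Enqueue. It assumes the rdkafka extension is installed and reuses the group, broker, and topic names from this report; the helper names `consumerSettings` and `consumeOne` are hypothetical, and disabling auto-commit in favour of an explicit commit is an illustrative choice, not what enqueue/rdkafka does.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: librdkafka settings for a consumer that joins a group.
function consumerSettings(string $groupId, string $brokers): array
{
    return [
        'group.id' => $groupId,
        'metadata.broker.list' => $brokers,
        // Assumption for this sketch: commit explicitly after handling a message.
        'enable.auto.commit' => 'false',
    ];
}

// Hypothetical helper: subscribe to a topic, read at most one message, commit it.
// Returns the payload, or null on timeout or any other non-success result.
function consumeOne(array $settings, string $topic, int $timeoutMs): ?string
{
    $conf = new RdKafka\Conf();
    foreach ($settings as $name => $value) {
        $conf->set($name, $value);
    }

    $consumer = new RdKafka\KafkaConsumer($conf);
    // subscribe() leaves partition assignment to the consumer group,
    // unlike assign(), which pins the consumer to explicit partitions.
    $consumer->subscribe([$topic]);

    $message = $consumer->consume($timeoutMs);
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
        return null;
    }

    // Store the offset for this group so a restart does not re-read the message.
    $consumer->commit($message);

    return $message->payload;
}

$settings = consumerSettings('app_name', '192.168.56.236:9092');

if (extension_loaded('rdkafka')) {
    var_dump(consumeOne($settings, 'messages', 10000));
} else {
    echo "rdkafka extension not loaded; skipping the live call\n";
}
```

The first `consume()` call after `subscribe()` may time out while the group is still rebalancing, so a real consumer would call it in a loop rather than once.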

@makasim
Member

makasim commented Oct 19, 2018

@Steveb-p As far as I know, this was fixed in #508.

I suggest back-porting the code to the 0.8 branch. (The fix went into version 0.9, which has not been released yet and is not supported by Symfony Messenger.)

Steveb-p added a commit to Steveb-p/enqueue-dev that referenced this issue Oct 22, 2018
Allow either to use assign or subscribe

With `assign` it is not possible to use Kafka's rebalancing. So if no offset is set we use `subscribe`, which makes rebalancing possible; otherwise, if an offset is set, we use `assign`.

@see commit d2fa178
@see php-enqueue#570
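
The rule stated in the commit message above can be sketched as follows. This is an illustration only, not the code from the commit: `RecordingConsumer` is a hypothetical stand-in for the real consumer that merely records which method was called, and `attachConsumer` is a hypothetical helper, so the example runs without the rdkafka extension or a broker.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the real consumer: it only records calls.
final class RecordingConsumer
{
    /** @var array<int, array{0: string, 1: array}> */
    public array $calls = [];

    public function subscribe(array $topics): void
    {
        $this->calls[] = ['subscribe', $topics];
    }

    public function assign(array $topicPartitions): void
    {
        $this->calls[] = ['assign', $topicPartitions];
    }
}

// Hypothetical helper mirroring the rule from the commit message:
//   no explicit offset -> subscribe (rebalancing stays possible)
//   explicit offset    -> assign a fixed partition at that offset
function attachConsumer(RecordingConsumer $consumer, string $topic, int $partition, ?int $offset): void
{
    if (null === $offset) {
        $consumer->subscribe([$topic]);

        return;
    }

    // With the real extension this entry would be an RdKafka\TopicPartition object.
    $consumer->assign([['topic' => $topic, 'partition' => $partition, 'offset' => $offset]]);
}

$withoutOffset = new RecordingConsumer();
attachConsumer($withoutOffset, 'messages', 0, null);

$withOffset = new RecordingConsumer();
attachConsumer($withOffset, 'messages', 0, 42);

echo $withoutOffset->calls[0][0], PHP_EOL; // prints "subscribe"
echo $withOffset->calls[0][0], PHP_EOL;    // prints "assign"
```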
@Steveb-p
Contributor Author

@makasim Indeed it is; I just noticed it's on master. Sorry to bother you :)

I've created a branch with the code "backported" (more like copied :) ) to the 0.8 branch for your convenience, in case you're going to keep releasing 0.8.x versions for a while (since the Symfony Messenger component can't work with 0.9 yet, as you've mentioned).

@Steveb-p
Contributor Author

@makasim Sorry, I had difficulty creating the PR due to GitHub failing ;)

@makasim
Member

makasim commented Oct 22, 2018

@Steveb-p Feel free to open a PR; the release process is completely automated, so I can do it at any moment.

makasim pushed a commit to php-enqueue/rdkafka that referenced this issue Oct 22, 2018
@see commit d2fa17881fcc2c523b8277f3ba30d1d9caeff667
@see php-enqueue/enqueue-dev#570
@makasim
Member

makasim commented Oct 22, 2018

@Steveb-p https://github.com/php-enqueue/rdkafka/releases/tag/0.8.40

ASKozienko pushed a commit that referenced this issue Nov 2, 2018
@see commit d2fa178
@see #570