Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header support for kafka #955

Merged
merged 6 commits into from
Sep 25, 2019

Conversation

TiMESPLiNTER
Copy link
Contributor

This bumps the dependency on rdkafka extension from version 3.0.3 to 3.1.0.

@makasim
Copy link
Member

makasim commented Sep 25, 2019

looks good to me. @Steveb-p wdyt?

@TiMESPLiNTER
Copy link
Contributor Author

I get the following (I guess) non-related error running the functional tests:

1) Enqueue\Dbal\Tests\Spec\Mysql\DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest::test

Failed asserting that two arrays are equal.

--- Expected

+++ Actual

@@ @@

 Array (

     0 => 'fooBody'

-    1 => 'barBody'

 )

/mqdev/vendor/queue-interop/queue-spec/src/SubscriptionConsumerConsumeFromAllSubscribedQueuesSpec.php:63

@Steveb-p
Copy link
Contributor

I was sure I did open a PR earlier with it: #880. Closed it since a lot has changed.

However, I'd suggest getting the part with backward compatibility ported here?

// Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's.
// Note: Requires phprdkafka >= 3.1.0
if (isset($kafkaMessage->headers)) {
    $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers));
}

This way messages produced by earlier versions of php-enqueue would still be handled.

I'll give it a look after work, but it should be fine :)

@TiMESPLiNTER
Copy link
Contributor Author

TiMESPLiNTER commented Sep 25, 2019

So should I just copy over the code from your closed PR into this PR? Do we still need the segfault check?

        // Note: Topic::producev method exists in phprdkafka > 3.1.0
        // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
        if (method_exists($topic, 'producev')) {
            // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault
            if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
                && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
                trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING);
            }
            $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
        } else {
            $topic->produce($partition, 0 /* must be 0 */, $payload, $key);
        }

@Steveb-p
Copy link
Contributor

So should I just copy over the code from your closed PR into this PR?

        // Note: Topic::producev method exists in phprdkafka > 3.1.0
        // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
        if (method_exists($topic, 'producev')) {
            // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault
            if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
                && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
                trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING);
            }
            $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
        } else {
            $topic->produce($partition, 0 /* must be 0 */, $payload, $key);
        }

I'll be ok with enforcing higher level of phprdkafka/librdkafka instead and keeping your code that changes this to call producev by default.

Yes actually. That would result in still allowing to work with lower phprdkafka versions, while using the new producev method when available. There is a lot of recent contributions to phprdkafka (Thanks to Nick), but there are also reports about some segfaults happening sometimes, so I'm not keen to forcing usage of those versions yet (but that's only my opinion).

My original code should probably be changed to throw an Exception or trigger a full error instead, because phprdkafka 3.1.0 will simply not producev message properly with librdkafka 1.0.0 (segfault was guaranteed to happen). Or it should drop to produce call instead. WDYT @TiMESPLiNTER ?

@TiMESPLiNTER
Copy link
Contributor Author

@Steveb-p how about dropping to produce and trigger a PHP warning in the 3.1.0/1.0.0 case?

@Steveb-p
Copy link
Contributor

@TiMESPLiNTER 👍

@TiMESPLiNTER
Copy link
Contributor Author

TiMESPLiNTER commented Sep 25, 2019

@Steveb-p @makasim I added the functionality as discussed. Please have a look over the code again if you find some time. 😃

I think we could include this in the next bugfix release for 0.9.x? What would I need to do to include it there? Also make a PR against the 0.9 branch?

@Steveb-p
Copy link
Contributor

I think we could include this in the next bugfix release for 0.9.x? What would I need to do to include it there? Also make a PR against the 0.9 branch?

0.9 branch afair is actually a "tag" for initial 0.9 release ;)

master branch is the base for new releases, so it's fine as it is.

@Steveb-p
Copy link
Contributor

@TiMESPLiNTER would you mind adding

https://github.com/php-enqueue/enqueue-dev/pull/880/files#diff-30b559bffa4a02ebf46fbef2767f7a01R175-R180

as well?

It would provide cross-compatibility with earlier versions (which might actually be important since Kafka can store all historical messages, right? 😄 )

And the inclusion of that code for produce fallback means that version from ^3.0.3 to ^3.1.0 is no longer required.

pkg/rdkafka/composer.json Outdated Show resolved Hide resolved
@TiMESPLiNTER
Copy link
Contributor Author

@TiMESPLiNTER would you mind adding

https://github.com/php-enqueue/enqueue-dev/pull/880/files#diff-30b559bffa4a02ebf46fbef2767f7a01R175-R180

as well?

This code is currently already in master from what I see. Just the producing functionality is currently missing header support.

@Steveb-p
Copy link
Contributor

@makasim IMO it's ready. As usual I'm not merging it since you're the owner 😈

@makasim makasim merged commit b11fd8a into php-enqueue:master Sep 25, 2019
@TiMESPLiNTER TiMESPLiNTER deleted the feat/kafka-header-support branch September 25, 2019 13:39
@TiMESPLiNTER
Copy link
Contributor Author

When is the next released planned (including this "bug fix")?

@Steveb-p
Copy link
Contributor

Steveb-p commented Sep 26, 2019

When is the next released planned (including this "bug fix")?

@TiMESPLiNTER until @makasim creates a new version (which should be soon™) you can use this functionality of composer to force it to use your fork:
https://getcomposer.org/doc/05-repositories.md#loading-a-package-from-a-vcs-repository

There are a few use cases for this. The most common one is maintaining your own fork of a third party library. If you are using a certain library for your project and you decide to change something in the library, you will want your project to use the patched version. If the library is on GitHub (this is the case most of the time), you can simply fork it there and push your changes to your fork. After that you update the project's composer.json. All you have to do is add your fork as a repository and update the version constraint to point to your custom branch. In composer.json, you should prefix your custom branch name with "dev-". For version constraint naming conventions see Libraries for more information.

So, in your case it might be:

{
    "repositories": [
        {
            "type": "vcs",
            "url": "https://github.com/TiMESPLIiNTER/enqueue/rdkafka"
        }
    ],
    "require": {
        "enqueue/rdkafka": "^0.9.13 | dev->>BRANCH-NAME<<"
    }
}

Adding ^0.9.13 will allow you to update to a newer version once it's available (by using composer update enqueue/rdkafka), if you have prefer-stable:true in composer.json config (https://getcomposer.org/doc/04-schema.md#prefer-stable)

@makasim
Copy link
Member

makasim commented Sep 26, 2019

Ths fix already in enqueue/rdkafka, you require dev version for now.

I'll do release when I have time

@TiMESPLiNTER
Copy link
Contributor Author

Okay, just was wondering if something is planned. All good I'll use dev-master in the meantime.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants