-
Notifications
You must be signed in to change notification settings - Fork 138
[FEATURE] Support kafka LogValidator validate inner records and compression codec when handle producer request with entryFormat=kafka #791
[FEATURE] Support kafka LogValidator validate inner records and compression codec when handle producer request with entryFormat=kafka #791
Conversation
…dec when handle producer request
If we can use this pr. Then we can use direct memory to rebuild kafka records like #673. |
@BewareMyPower PTAL if you have free time. Thanks. :) |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Outdated
Show resolved
Hide resolved
Please rebase to master since it has conflicts with your previous PR. |
Thanks for the reminder, I happen to be doing this job and the conflict has been resolved. :) |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Outdated
Show resolved
Hide resolved
…afkaServiceConfiguration.java Co-authored-by: Yunze Xu <[email protected]>
The failed test does not seem to be related to this pr. |
Here's my test result for encoding before and after this PR. Before (Kafka) represents
We can see the encoding time of I think we cannot accept this performance lost just for compatibility with some special cases for Kafka 0.10 client. If you insisted on this change, please add a new |
Okay, if we add a new |
…ference count refCnt
@BewareMyPower |
This is a known flaky test #809 |
There's a |
Good. Thanks. :) |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeRequest.java
Outdated
Show resolved
Hide resolved
...impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/KafkaMixedEntryFormatTest.java
Outdated
Show resolved
Hide resolved
...impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/KafkaMixedEntryFormatTest.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopLogValidator.java
Outdated
Show resolved
Hide resolved
...-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaMixedEntryFormatter.java
Show resolved
Hide resolved
Left some comments. In addition, I cannot see any different tests between We already have Kafka 0.10 client for tests. Does it mean only some older version Golang sarama client can make a difference between Or could you do some simple tests just for |
If the newer sarama client has fixed the problem. Could you give the version? |
here sarama-1002 v1.16 @BewareMyPower |
All comments done. PTAL |
I will review it soon. |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeRequest.java
Outdated
Show resolved
Hide resolved
...-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaMixedEntryFormatter.java
Show resolved
Hide resolved
tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Show resolved
Hide resolved
Overall LGTM. Please check the comments. |
…ession codec when handle producer request with entryFormat=kafka (#791) fixes #687 ### Motivation Previously, in order to support low version kafka clients, such as 0.10 version clients, we verified the data when processing fetch requests. Because the lower message format would maintain the internal message set, according to the production situation I encountered, in a certain sometimes there is an error in the offset of the internal message set, such as the bug in the low version of the sarama client. The problem is that for the low version message format, no matter whether there is a problem with the internal message set, we will regenerate the message records, which is unnecessary in some cases. ### Modifications Support message set verification in production like kafka, and only need to verify when entryFormat=kafka. Because when entryFormat=pulsar, we still cannot avoid doing message conversion during consumption. Co-authored-by: Yunze Xu <[email protected]> Co-authored-by: Kai Wang <[email protected]> Co-authored-by: Huanli Meng <[email protected]>
fixes #687
Motivation
Previously, in order to support low version kafka clients, such as 0.10 version clients, we verified the data when processing fetch requests. Because the lower message format would maintain the internal message set, according to the production situation I encountered, in a certain sometimes there is an error in the offset of the internal message set, such as the bug in the low version of the sarama client.
The problem is that for the low version message format, no matter whether there is a problem with the internal message set, we will regenerate the message records, which is unnecessary in some cases.
Modifications
Support message set verification in production like kafka, and only need to verify when entryFormat=kafka.
Because when entryFormat=pulsar, we still cannot avoid doing message conversion during consumption.