ConsumerMessage timestamp is inconsistent with clients in other languages when producer using compression #885
Comments
That is really weird and off the top of my head I have no idea, sorry. What is filebeat?
Hmm, googled around a bit. Filebeat will set the timestamp to something specific in some cases: https://github.com/elastic/beats/blob/3865f220cbb5ed3d03ee632aa00f5413dead882e/libbeat/outputs/kafka/client.go#L158-L163. I don't know if that is being hit in your case. Sarama picks the timestamp of the first inner message, if relevant, when producing: https://github.com/Shopify/sarama/blob/c01858abb625b73a3af51d0798e4ad42c8147093/produce_set.go#L88-L107
Filebeat sets the timestamp when configured to use Kafka v0.10+ (a CreateTime timestamp). This part is fine. Kafka is configured to ignore the producer timestamp and generate its own (LogAppendTime). When consuming such nested messages, sarama's behavior is different from other clients'. That part is not fine :) All other clients return the outer message timestamp. It probably has something to do with Kafka itself; I can check 0.10.1 and 0.10.2. Maybe Kafka should recursively update all nested timestamps when LogAppendTime is configured.
How about if I rephrase the issue like this: can we include the outer timestamp as an additional field? The semantics will stay backward compatible for existing sarama users, and the new field can be used to provide parity with other clients.
rdkafka overrides the inner timestamps at https://github.com/edenhill/librdkafka/blame/6a735dd3b86a93b78f61855a7691439e89864368/src/rdkafka_broker.c#L3779-L3781 because of confluentinc/librdkafka@6aa20e1, but I'm not entirely convinced that logic is correct in all cases. @edenhill, I don't think your intention was to completely blow away the inner message timestamps when both are provided? Or is the fact that both are provided at all a bug?
That will work, but I'd like to get to the bottom of this issue; something seems fishy.
Ok, figured it out: when the producer uses compression, sarama will pick up the inner message timestamp (which is not updated by Kafka; to be verified with the latest versions). Other clients respect the outer compressed message timestamp.
@eapache, tested the latest v0.10.2.1: same broker behavior. After asking on the Kafka users list, it seems to be expected; check it out: http://search-hadoop.com/m/Kafka/uyzND1IGfjBm6AxA?subj=Re+Kafka+LogAppendTime+compressed+messages So, do you want a pull request, or would you prefer to fix it yourself?
OK, I guess it's intentional. I still find it kind of weird though :) I'm happy to accept a PR for this. |
@eapache, here is the pull request. Sorry for the delay. I wasn't able to figure out how to write a test for this; it looks like there are no tests for timestamps at all.
#885: added BlockTimestamp to ConsumerMessage
Versions
Sarama Version: 1.12.0
Kafka Version: 0.10.0.1
Go Version: go1.8.1 darwin/amd64
Configuration
Kafka configured to use broker timestamps:
log.message.timestamp.type=LogAppendTime
The Kafka producer is configured to use compression (other than none).
Problem Description
Probably more like a question, but I noticed that sarama returns the inner message timestamps instead of the timestamp of the outer compression block when using the ConsumePartition() function. The relevant code that populates the ConsumerMessage struct from the FetchResponse is here: https://github.com/Shopify/sarama/blob/master/consumer.go#L528. This is for some reason different from the results of other clients (Java, librdkafka). It only reproduces when the Kafka broker is configured to timestamp incoming messages itself (LogAppendTime).
Below is a test Go snippet which shows the difference:
With my test data it produces:
Probably there is something filebeat-specific (filebeat uses sarama as well); I wasn't able to reproduce it with a sarama producer myself, but perhaps somebody can shed light on it?