High Rate Producer Corruption With Large Messages #962

Closed
wingedpig opened this issue Oct 5, 2017 · 22 comments

@wingedpig

wingedpig commented Oct 5, 2017

Versions

Sarama Version: 1.13.0
Kafka Version: kafka_2.11-0.11.0.1
Go Version: 1.9

Configuration

What configuration values are you using for Sarama and Kafka?

Default installation for Kafka, except that delete.topic.enable=true. One instance.

Problem Description

Run the test producer at https://gist.github.com/wingedpig/c635e77fcd42d95cfa7923a541724f6b. It creates a test partition and fills it with 15 messages. Then run the test consumer, at https://gist.github.com/wingedpig/6d4999591fc305b683b9fd7cbeaed240, which tries to read the partition. It fails with the following errors:

[sarama] 2017/10/05 09:45:53 Initializing new client
[sarama] 2017/10/05 09:45:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2017/10/05 09:45:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2017/10/05 09:45:53 client/metadata fetching metadata for all topics from broker 127.0.0.1:9092
[sarama] 2017/10/05 09:45:53 Connected to broker at 127.0.0.1:9092 (unregistered)
[sarama] 2017/10/05 09:45:53 client/brokers registered new broker #0 at localhost:9092
[sarama] 2017/10/05 09:45:53 Successfully initialized new client
[sarama] 2017/10/05 09:45:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2017/10/05 09:45:53 Connected to broker at localhost:9092 (registered as #0)
[sarama] 2017/10/05 09:45:53 consumer/broker/0 added subscription to test/0
[sarama] 2017/10/05 09:45:53 consumer/broker/0 disconnecting due to error processing FetchRequest: kafka: error decoding packet: unknown magic byte (2)
[sarama] 2017/10/05 09:45:53 Closed connection to broker localhost:9092
[sarama] 2017/10/05 09:45:53 kafka: error while consuming test/0: kafka: error decoding packet: unknown magic byte (2)

If you delete the test partition, uncomment the Sleep line in the test producer, and re-run the test, the consumer is able to read the messages without issue.

If you delete the test partition, change the size of the message in the producer (change the 3000 to 2000 in line 50), and re-run the test, the consumer is able to read the messages without issue.

I am new to Kafka, so I wouldn't be surprised if I'm doing something wrong. But I haven't been able to figure out what.
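
For readers who don't want to open the gists, the producer does roughly what the sketch below shows. The gists remain the authoritative reproduction; apart from the broker address, the "test" topic, the 15 messages, the 3000-byte payload, and the commented-out Sleep, everything here (producer type, names, encoder choice) is an assumption.

package main

import (
	"bytes"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close() // Close flushes any buffered messages before returning

	// Log (rather than drop) any per-message errors the broker reports.
	go func() {
		for perr := range producer.Errors() {
			log.Println("produce error:", perr)
		}
	}()

	payload := bytes.Repeat([]byte("x"), 3000) // 3000 bytes reproduces the problem; 2000 does not
	for i := 0; i < 15; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "test",
			Value: sarama.ByteEncoder(payload),
		}
		// time.Sleep(...) here (the gist's commented-out Sleep) also avoids the problem
	}
}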

@eapache
Contributor

eapache commented Oct 5, 2017

This is weird. The error message isn't the greatest, but it looks like Kafka is sending us back a newer message format than we can handle. I'm not sure if this is a Kafka bug or if we're somehow indicating that we support that version accidentally, or...

What's even weirder is that this version switch seems to be triggered by message size. Is there any difference in the broker-side logs between the two cases?

@wingedpig
Author

I'm not an expert Kafka log reader, but I think the logs are the same. I've attached them below.

What's also weird is that adding the Sleep is another way of fixing the problem.

Sending messages of size 3000:

Oct  5 13:42:20 localhost zookeeper-server-start.sh: [2017-10-05 13:42:20,484] INFO Got user-level KeeperException when processing sessionid:0x15eca81f7c00005 type:create cxid:0x5a0 zxid:0x2fa txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,486] INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,488] INFO [KafkaApi-0] Auto creation of topic test with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
Oct  5 13:42:20 localhost zookeeper-server-start.sh: [2017-10-05 13:42:20,495] INFO Got user-level KeeperException when processing sessionid:0x15eca81f7c00005 type:create cxid:0x5a8 zxid:0x2fd txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
Oct  5 13:42:20 localhost zookeeper-server-start.sh: [2017-10-05 13:42:20,496] INFO Got user-level KeeperException when processing sessionid:0x15eca81f7c00005 type:create cxid:0x5a9 zxid:0x2fe txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,504] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,506] INFO Loading producer state from offset 0 for partition test-0 with message format version 2 (kafka.log.Log)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,507] INFO Completed load of log test-0 with 1 log segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,507] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,507] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test-0 (kafka.cluster.Partition)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,507] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,508] INFO Partition [test,0] on broker 0: test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
Oct  5 13:42:20 localhost kafka-server-start.sh: [2017-10-05 13:42:20,749] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: test-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

Sending messages of size 2000:

Oct  5 13:43:21 localhost zookeeper-server-start.sh: [2017-10-05 13:43:21,733] INFO Got user-level KeeperException when processing sessionid:0x15eca81f7c00005 type:create cxid:0x5c6 zxid:0x30d txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,736] INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,740] INFO [KafkaApi-0] Auto creation of topic test with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
Oct  5 13:43:21 localhost zookeeper-server-start.sh: [2017-10-05 13:43:21,745] INFO Got user-level KeeperException when processing sessionid:0x15eca81f7c00005 type:create cxid:0x5ce zxid:0x310 txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
Oct  5 13:43:21 localhost zookeeper-server-start.sh: [2017-10-05 13:43:21,748] INFO Got user-level KeeperException when processing sessionid:0x15eca81f7c00005 type:create cxid:0x5cf zxid:0x311 txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,758] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,761] INFO Loading producer state from offset 0 for partition test-0 with message format version 2 (kafka.log.Log)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,761] INFO Completed load of log test-0 with 1 log segments, log start offset 0 and log end offset 0 in 0 ms (kafka.log.Log)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,762] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,762] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test-0 (kafka.cluster.Partition)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,762] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
Oct  5 13:43:21 localhost kafka-server-start.sh: [2017-10-05 13:43:21,762] INFO Partition [test,0] on broker 0: test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
Oct  5 13:43:22 localhost kafka-server-start.sh: [2017-10-05 13:43:22,000] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: test-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

@eapache
Contributor

eapache commented Oct 6, 2017

The logs look the same to me too.

The fact that it can be fixed by reducing the size or the frequency makes me think it's some sort of volume-of-bytes trigger. If Kafka accumulates enough data in a short enough period of time, maybe it falls back to a different internal format, and that's leaking out through the API somehow? I'm kind of guessing. To figure out why Kafka is changing the format, I think you'll have to ask upstream.

The obvious solution to this right now is for Sarama to implement the new message format (documented at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets) so that if Kafka sends us that we can just read it. I know @wladh mentioned he was working on it (#901 (comment)) but I don't know how far he got.

@wingedpig
Author

Thanks for your response. I did some more digging (I didn't know about the new message format introduced in 0.11.0) and found a workaround. Add the following line to server.properties:

log.message.format.version=0.10.2

And it all seems to work.

@eapache
Contributor

eapache commented Oct 6, 2017

Good to know. The other ticket is tracking the implementation of the message format in Sarama, so I don't think there's anything else to do here.

@eapache eapache closed this as completed Oct 6, 2017
@aaronlehmann

Just wanted to note that the Kafka 0.11 support in #901 doesn't seem to fix this. I was getting the same error reported here, first with sarama c765017, and again with sarama 1.14. For some reason, v1.11.0-44-g3efb95d (which is what I've been using up to now) doesn't seem to be affected.

2017-11-30T15:56:02-08:00 ERROR Error consuming message log_stream/postgres_log_consumer: kafka: error while consuming log_stream_shared/0: kafka: error decoding packet: unknown magic byte (2)

I noticed #990, which looked related, so I tried the master version of sarama. With master, it's still not working, but the error message has changed:

2017-11-30T16:08:06-08:00 ERROR Error consuming message log_stream/postgres_log_consumer: kafka: error while consuming log_stream_shared/0: kafka: insufficient data to decode packet, more bytes expected

Please let me know if I should file a new issue to debug this. The Kafka version is 0.11.0.1. I don't have direct access to the broker's configuration, but I can dig it up if necessary.

cc @wladh

@wladh
Contributor

wladh commented Dec 1, 2017

This sounds like the message is getting corrupted somewhere.
I don't think Kafka would send messages in a different format just because the producer sends them at a higher rate. Also, the initial report predates the FetchRequest/Response v4 implementation, so Kafka shouldn't have sent us a v2 record anyway.
Is this happening frequently for you right now? Unfortunately we don't have much instrumentation for debugging these kinds of problems. If you could log the packet that failed to decode in sendAndReceive, that would probably be a helpful starting point.
I don't think a new issue is needed, since it looks like the same problem.

@aaronlehmann

I managed to capture the packet causing this. I can't share the full packet, but I can share the header.

This is the base64 representation:

AAAAAQARbG9nX3N0cmVhbV9zaGFyZWQAAAABAAAAAAAAAAAAAAIpEOgAAIAAAAAAAAHfjvUAASTU
AAAACQK37iHoAAAAAAAA////////////////////////////////////////AAAAAcCSCQAAAEg2
NzBmMTUxZi0wNmVmLTRmMzEtOWUyOC1kZjM1NjY0NWQ5ZDHokQk=

It's followed by 32661 bytes of payload. In case it's significant, it looks like the payload is not a full message (it's in JSON format and the JSON object isn't complete).

@eapache
Contributor

eapache commented Dec 4, 2017

I find the hex-dump version easier to read. Hopefully this converted from base64 properly:

00 00 00 01 00 11 6c 6f
67 5f 73 74 72 65 61 6d
5f 73 68 61 72 65 64 00
00 00 01 00 00 00 00 00
00 00 00 00 00 02 29 10
e8 00 00 80 00 00 00 00
00 01 df 8e f5 00 01 24
d4 00 00 00 09 02 b7 ee
21 e8 00 00 00 00 00 00
ff ff ff ff ff ff ff ff
ff ff ff ff ff ff ff ff
ff ff ff ff ff ff ff ff
ff ff ff ff ff ff 00 00
00 01 c0 92 09 00 00 00
48 36 37 30 66 31 35 31
66 2d 30 36 65 66 2d 34
66 33 31 2d 39 65 32 38
2d 64 66 33 35 36 36 34
35 64 39 64 31 e8 91 09
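
For anyone following along at home, a standard-library-only Go snippet like this produces an equivalent dump from the base64 text posted above (hex.Dump prints 16 bytes per row with an ASCII gutter, so the layout differs slightly from the 8-byte rows here):

package main

import (
	"encoding/base64"
	"encoding/hex"
	"fmt"
	"log"
)

func main() {
	// The captured header from the previous comment, re-joined into one base64 string.
	const b64 = "AAAAAQARbG9nX3N0cmVhbV9zaGFyZWQAAAABAAAAAAAAAAAAAAIpEOgAAIAAAAAAAAHfjvUAASTU" +
		"AAAACQK37iHoAAAAAAAA////////////////////////////////////////AAAAAcCSCQAAAEg2" +
		"NzBmMTUxZi0wNmVmLTRmMzEtOWUyOC1kZjM1NjY0NWQ5ZDHokQk="

	raw, err := base64.StdEncoding.DecodeString(b64)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(hex.Dump(raw)) // offsets, hex bytes, and ASCII columns
}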

@eapache
Contributor

eapache commented Dec 4, 2017

(FWIW, Wireshark does understand the Kafka protocol. If you have the original pcap and could share a screenshot of the decoded header, that might save us the headache of manually lining up the bytes.)

@wladh
Contributor

wladh commented Dec 4, 2017

Assuming these are FetchResponse headers and that the topic is "log_stream_shared", the fetch response looks to be in a version older than 4 (it looks like version 0, unless you chopped off an int32 from the beginning), so there shouldn't be any v2 records in it (as I understand it). The whole header would help, as it would indicate the version.
Assuming a RecordBatch, the encoding looks "plausible", although I haven't checked all the details. Furthermore, it seems that the response size is capped at 32768 bytes (which is the default limit), but the indicated record size is 74964. First, it would be good if you could check whether you're setting a max limit (Fetch.Max) in your client config.
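
For reference, those limits live on sarama's consumer configuration; a sketch of where they would be set (the values shown are believed to be the defaults of that era and are illustrative only):

package main

import "github.com/Shopify/sarama"

func newConsumerConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Consumer.Fetch.Min = 1         // fewest bytes the broker should wait for before answering
	cfg.Consumer.Fetch.Default = 32768 // bytes requested per partition per fetch; the 32768-byte cap seen above
	cfg.Consumer.Fetch.Max = 0         // 0 leaves the per-message ceiling unset
	return cfg
}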

@aaronlehmann

I am not explicitly setting a max fetch limit anywhere, so it should be using the defaults.

@wladh
Contributor

wladh commented Dec 4, 2017

What Kafka version are you using? I've tried producing a large (80KB) message on Kafka 0.11.0.1, and the consumer doesn't even need to retry: despite the 32KB per-partition limit, the broker sends the entire message.

@aaronlehmann

I am also using Kafka 0.11.0.1.

Does it look like the root cause of the problem is the broker truncating the message to fit within the 32KB limit?

Would it be useful to try setting the Fetch.Max parameter to something large enough, and see if that makes a difference?

It's curious that this problem doesn't occur with v1.11.0-44-g3efb95d. When I switch back to that version and try to consume the same topic, it seems to work fine.

@wladh
Contributor

wladh commented Dec 4, 2017

This is what I could infer from the packet fragment, and it seems to match the fact that the JSON object you saw was truncated. You can try increasing Fetch.Default to see if it makes any difference.
What are you setting the config.Version field to?

One change that looks relevant between v1.11.0-44-g3efb95d and HEAD is PR #905, but that always sets MaxBytes to MaxResponseSize, which is 100MB, so I can't see how it could cause issues.

@aaronlehmann

Setting Fetch.Default to a large value does seem to work around this. Perhaps sarama isn't correctly recognizing messages that don't fit within the limit?

I am setting Version to V0_9_0_0.

@aaronlehmann

Setting Version to V0_10_2_0 instead also appears to work around the problem.
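
Taken together, the two workarounds observed here would look roughly like this sketch on the client side (the 1 MB value is arbitrary, just large enough to hold the biggest expected record batch):

package main

import "github.com/Shopify/sarama"

func workaroundConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	// Workaround 1: request enough bytes per partition that a whole record
	// batch fits, so the broker never returns a truncated v2 batch.
	cfg.Consumer.Fetch.Default = 1024 * 1024

	// Workaround 2: advertise a newer protocol version so the broker
	// down-converts (or, on sarama >= 1.14, V0_11_0_0 reads v2 natively).
	cfg.Version = sarama.V0_10_2_0

	return cfg
}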

@eapache
Contributor

eapache commented Dec 4, 2017

Given that you are actually using Kafka 0.11, why not just set Version to V0_11_0_0?

@aaronlehmann

I'm using sarama-cluster, which sets it to 0.9 by default: https://github.com/bsm/sarama-cluster/blob/5668d10e60245e88009d5cc914f93f73a2b2e9c2/config.go#L96

Until now, I didn't know there was any value in changing this default.

The reason I used 0.10.2.0 in my workaround is that sarama-cluster hasn't updated its dependencies for sarama 1.14. I can manually copy in sarama 1.14 for testing, but dep is very unhappy if I try to vendor both sarama 1.14 and sarama-cluster at once.
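
For what it's worth, overriding that default from application code should look roughly like the sketch below, assuming sarama-cluster's Config still embeds sarama.Config as it does at the linked revision:

package main

import (
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func newClusterConfig() *cluster.Config {
	cfg := cluster.NewConfig()
	// The embedded sarama.Config is reachable through its Config field;
	// this raises the protocol version above the library's 0.9 default.
	cfg.Config.Version = sarama.V0_10_2_0
	return cfg
}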

@wladh
Contributor

wladh commented Dec 5, 2017

But it seems there's something fishy about having version V0_9_0_0 and v2 records. I will have a look into it.

@tcrayford
Contributor

@wladh Even if your client sets an earlier version, Kafka can send back v2 records in protocol versions before 0.10.1.0, unless your fetch size is set larger than the record batch. This is noted in http://kafka.apache.org/documentation.html#upgrade_11_message_format

@wladh
Contributor

wladh commented Dec 5, 2017

There's a change in behaviour between different API versions in Kafka. For v0 of Fetch, it will send a partial message, truncated at the specified limit. Additionally, Kafka will not downgrade this message, which is why you're seeing a v2 record inside a v0 FetchResponse. This seems to be a known limitation: http://kafka.apache.org/documentation.html#upgrade_11_message_format

The sarama version you said doesn't have the issue does not check that the message version is less than 2, so it tries to decode the record batch as a message set. It likely ends up interpreting the last byte of the last offset delta (which is always 0) and the first 3 bytes of the first timestamp (which in your config is -1) as the key length, then tries to read 0x00FFFFFF bytes of data as the key. That fails with insufficient data, the retry mechanism kicks in, and eventually the request limit grows large enough that the whole message goes through, is downgraded correctly, and is consumed correctly by sarama. PR #940 introduced the check that magic is < 2 (before support for records was introduced).
In later versions of Fetch, if the limit is lower than the message size, the message is sent in its entirety anyway (and the downgrade happens correctly).

In RecordBatch we aren't checking for the insufficient-data condition when we try to get the encoded records, which is what causes the insufficient-data error in your case (and why it stops retrying). Although this shouldn't happen during normal operation, I'll send a fix for this case.

(edit: I just saw that @tcrayford mentioned the limitation as well)
