Consumer timeouts #425

Open
ulutomaz opened this issue Nov 17, 2020 · 4 comments

Comments

@ulutomaz

Versions:

OS: Ubuntu 16.04 LTS
Erlang/OTP: 22.2.2
Elixir: 1.9.4
Kafka_ex: 0.11.0
Kafka cluster (AWS MSK): 2.2.1
Kafka topic (usual setup): 1 topic with 9 partitions

Overview:

We run our setup on AWS (managed Kafka, MSK), and our codebase lives on Linux-based EC2 instances. The setup is distributed: each of 3 nodes takes 3 partitions. We have been running kafka_ex for a few years, but lately we have been experiencing a strange problem where we lose messages. We did some debugging with the AWS support team, and nothing seems to be wrong on the managed Kafka side (brokers, interconnection, etc.). No connection timeouts or anything similar can be found in the logs.

On our side, though, the logs occasionally show errors like the one below. Off the top of my head, the last error like this was about two weeks ago.

We have already put some effort into debugging, and everything points to this error, because the missing messages correlate with the times at which it occurs.

Our codebase can handle this "interrupt" and keeps running, but as noted we experience message loss.

Error:

2020-11-17 10:06:27.594 [error] Receiving data from broker "<HERE IS OUR KAFKA ADDRESS>":9092 failed with :timeout
2020-11-17 10:06:28.810 [error] GenServer #PID<0.4413.0> terminating
** (MatchError) no match of right hand side value: {:error, :timeout}
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:847: KafkaEx.GenConsumer.commit/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:643: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :timeout
State: %KafkaEx.GenConsumer.State{acked_offset: 197475917, api_versions: %{fetch: 0, offset_commit: 0, offset_fetch: 0}, auto_offset_reset: :earliest, commit_interval: 1000, commit_threshold: 100, committed_offset: 197475917, consumer_module: OurKafka.Consumer.Consumer, consumer_state: %{partition: 1, topic: "aws-connect"}, current_offset: 197475917, fetch_options: [auto_commit: false, worker_name: #PID<0.4414.0>], generation_id: 1, group: "aws-connect_consumer_group", last_commit: -576066016834, member_id: "kafka_ex-e2e8bd1c-25a3-4c7a-be69-6cbc12a6223d", partition: 1, topic: "aws-connect", worker_name: #PID<0.4414.0>}

Other

  • Our supervision tree is built so that, under the application supervisor, a consumer supervisor and a producer supervisor sit side by side; those two handle all the consumers and producers.
  • We use Tasks for handling consumers within the code.

@BlueCollarChris

@ulutomaz We are also experiencing the same issue when using MSK with SSL enabled.

@joshuawscott
Member

We've fixed a number of issues around how the consumers and their supervision works, so it wouldn't surprise me to learn there are more issues.

That said, Kafka should not allow you to lose messages as long as you do not commit an offset until you have fully processed it. I would check that this is what actually happens: a common cause is using something like Tasks or GenServer.cast, which lets the handle_message_set function return before all the messages are processed.
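
As a rough illustration of that point, here is a minimal sketch of a handle_message_set that blocks until every message has been processed before it asks for a commit. The module name `SketchConsumer`, the `process/1` helper, and the 30-second task timeout are hypothetical and chosen only for illustration. A real consumer would also `use KafkaEx.GenConsumer`, which is left out here so the sketch compiles without the kafka_ex dependency.

```elixir
defmodule SketchConsumer do
  # Hypothetical per-message handler; replace with the real processing logic.
  # Matching on a map with a :value key also matches message structs that
  # carry a :value field.
  def process(%{value: value}) do
    String.upcase(value)
  end

  # Processes the messages concurrently, but does not return until every
  # task has finished. Contrast this with Task.start/1 or GenServer.cast/2,
  # which return immediately and would let the offset be committed while
  # work is still in flight.
  def handle_message_set(message_set, state) do
    message_set
    |> Task.async_stream(&process/1, timeout: 30_000)
    |> Stream.run()

    # Only now is the consumer told that the offsets may be committed.
    {:sync_commit, state}
  end
end
```

With this shape, a crash or timeout inside the processing step happens before the function returns, so the commit for that message set is not requested.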

I've also seen a number of issues where KafkaEx detects timeouts that the Kafka server doesn't log. I'm frankly not sure why this happens, but increasing the sync_timeout setting to something much higher than the default (say 10-15 seconds) avoids this.
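
For anyone looking for where that goes, a minimal sketch of raising the setting in the application config follows. It assumes the usual `config/config.exs` layout and Elixir 1.9 or later for `import Config`. The 15_000 ms value is just one point in the 10-15 second range mentioned above, not a recommended number.

```elixir
# config/config.exs (assumed location)
import Config

config :kafka_ex,
  # Timeout, in milliseconds, for synchronous operations such as network
  # calls to the broker. 15_000 is an illustrative value.
  sync_timeout: 15_000
```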

@TC-aLi
Contributor

TC-aLi commented Mar 9, 2022

@ulutomaz If you still have the issue, you can try an OTP upgrade #389 (comment)
BTW, we're also on AWS MSK (2.3.1).

@erapert-simplifi

erapert-simplifi commented Mar 29, 2023

Encountering this issue

01:09:37.043 [error] Receiving data from broker "REDACTED":9092 failed with :timeout
01:09:37.077 [error] GenServer #PID<0.2312.0> terminating
** (MatchError) no match of right hand side value: {:error, :timeout}
    (kafka_ex 0.13.0) lib/kafka_ex/gen_consumer.ex:877: KafkaEx.GenConsumer.commit/1
    (kafka_ex 0.13.0) lib/kafka_ex/gen_consumer.ex:668: KafkaEx.GenConsumer.handle_info/2
    (stdlib 3.17.2) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17.2) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :timeout

This also appears to be the reason sockets are not cleaned up: the file descriptor count climbs until it reaches the ulimit, which prevents more sockets from being opened, and the program crashes.

This is happening on relatively recent versions of Elixir and Erlang, with version 0.13 of KafkaEx:

Erlang/OTP 24 [erts-12.3.2.1] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1]
Interactive Elixir (1.14.2) - press Ctrl+C to exit (type h() ENTER for help)

Update: using sync_timeout: 15000 reduces the number of timeouts but does not eliminate them in my case.
