
KafkaEx.stream timing out trying to fetch more messages even with no_wait_at_logend #357

yogodoshi opened this issue Aug 9, 2019 · 1 comment


@yogodoshi

Issue description

After banging my head trying to understand why I was getting timeouts from Kafka, I noticed that the timeout was on a fetch call whose offset param was exactly the last offset of the given partition.

Error:

15:38:51.553 [error] GenServer #PID<0.306.0> terminating
** (stop) exited in: GenServer.call(:kafka_ex, {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 1, offset: 748734, partition: 2, topic: "sms_4pt_3kk_1kk", wait_time: 60000}}, 60000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:1010: GenServer.call/3
    (kafka_ex) lib/kafka_ex/stream.ex:149: Enumerable.KafkaEx.Stream.fetch_response/2
    (kafka_ex) lib/kafka_ex/stream.ex:28: anonymous fn/3 in Enumerable.KafkaEx.Stream.reduce/3
    (elixir) lib/stream.ex:1407: Stream.do_resource/5
    (gen_stage) lib/gen_stage/streamer.ex:24: GenStage.Streamer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2099: GenStage.noreply_callback/3
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
Last message: {:"$gen_producer", {#PID<0.307.0>, #Reference<0.1745854737.2488270849.18939>}, {:ask, 500}}
State: #Function<20.35756501/1 in Stream.do_list_resource/6>

Checking the latest offset for that partition confirms it:

iex(3)> KafkaEx.latest_offset("sms_4pt_3kk_1kk", 2)  
[
  %KafkaEx.Protocol.Offset.Response{
    partition_offsets: [
      %{error_code: :no_error, offset: [748734], partition: 2}
    ],
    topic: "sms_4pt_3kk_1kk"
  }
]

Debugging

So I went to check how KafkaEx.stream works to see if I was doing something wrong, and here is what I found out:

In the Stream implementation, there is the next_fun which (roughly sketched after this list):

  1. fetches more messages from Kafka
  2. commits them or not
  3. calls stream_control which decides if it should continue the Stream process or not
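
A rough paraphrase of that order of operations (only fetch_response/2 and stream_control appear in the stack trace and prose above; the other names here are placeholders for illustration, not the exact internals of lib/kafka_ex/stream.ex):

# sketch only — placeholder names, not the real internal API
next_fun = fn offset ->
  response = fetch_response(stream_data, offset)   # 1. fetch more messages from Kafka
  maybe_commit_offsets(stream_data, response)      # 2. commit them or not
  stream_control(stream_data, response, offset)    # 3. decide whether to continue or halt
end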

And the issue seems to be in the order these steps are done, because at some point the stream reaches the latest offset, and this is what looks like it is happening:

  1. it tries to fetch messages from the latest offset (748734 in my case)
  2. Kafka doesn't answer, as it doesn't have any new message to send
  3. the GenServer call times out

Maybe there should be a check, before doing the fetch call, for whether the given offset is already the latest one, and simply skip the fetch if so? 🤔 Something like the rough sketch below.
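
An idea sketch only, not the actual KafkaEx internals: it reuses the public KafkaEx.latest_offset/2 call shown above, and fetch_and_continue/1 is just a placeholder for the normal fetch path.

# hypothetical guard: with no_wait_at_logend set, halt when the requested
# offset is already the partition's latest offset instead of fetching
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{offset: [latest]}]}] =
  KafkaEx.latest_offset(topic, partition)

if no_wait_at_logend and offset >= latest do
  {:halt, offset}              # stop the stream without issuing the fetch
else
  fetch_and_continue(offset)   # placeholder for the existing fetch path
end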

I don't know if this is an edge case because of a combination of timeout settings and Kafka consumer settings, if it is something related to Flow, or if this is all wrong 😆

Code and dependency versions

I'm using:

  • kafka_ex from commit 08e91caaaf2e79b676b822efedbcd90c3bbe518e (because I wanted something that was merged after the last release);
  • Kafka 0.11.0
  • Flow 0.14

My code at a high level (let me know if you want details of any part):

topic_name
|> retrieve_partitions_ids
|> create_streams(topic_name)
|> Flow.from_enumerables()
|> Flow.map(&SmsEtl.Flow.GenConsumer.prepare_for_ets/1)
|> Flow.partition(key: {:elem, 0})
|> Flow.reduce(fn -> ets_table_ref end, &save_kafka_message_to_ets/2)
|> Flow.run()
|> create_csvs_and_manifest(table_name)
|> SmsEtl.S3.upload_manifest_and_its_csvs()
|> persist_on_redshift(insert_method)

defp create_streams(partitions_ids, topic_name) do
  Enum.map(partitions_ids, fn partition_id ->
    KafkaEx.stream(topic_name, partition_id,
      no_wait_at_logend: true,
      auto_commit: false,
      wait_time: 30_000,
      offset: 0
    )
  end)
end
@joshuawscott
Member

In the latest release (0.11) we have fixed the stream API to support the global sync_timeout setting. Perhaps you need the wait_time to be less than the sync_timeout?
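
For example, assuming the global sync_timeout config key mentioned above is set in config (the exact values here are only illustrative):

# config/config.exs — illustrative values only
config :kafka_ex, sync_timeout: 40_000

# and keep the per-stream wait_time below it
KafkaEx.stream(topic_name, partition_id,
  no_wait_at_logend: true,
  auto_commit: false,
  wait_time: 30_000,
  offset: 0
)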
