badmatch in NetworkClient #449

Open
TC-aLi opened this issue Jan 25, 2022 · 2 comments
@TC-aLi
Contributor

TC-aLi commented Jan 25, 2022

We're using kafka_ex as the driver for a 6-broker Kafka cluster. It's integrated into an Erlang app, and we also use worker_pool.
When reviewing logs, I found this:

=ERROR REPORT====
** Generic server 'wpool_pool-kafka_ex_pool-9' terminating
** Last message in was update_consumer_metadata
** When Server state == #{'__struct__' => 'Elixir.KafkaEx.Server.State', ...,...},...],...},...]},...}
registered_name: 'wpool_pool-kafka_ex_pool-9'
ancestors: ['wpool_pool-kafka_ex_pool-process-sup',kafka_ex_pool,<0.2763.0>]
Supervisor: {local,'wpool_pool-kafka_ex_pool-process-sup'}
Offender: [{pid,<0.3007.0>},{id,'wpool_pool-kafka_ex_pool-9'},{mfargs,{wpool_process,start_link,['wpool_pool-kafka_ex_pool-9','Elixir.KafkaEx.Server0P10AndLater',[[{uris,[{url,9094}]},{consumer_group,<<"kafka_ex">>},{use_ssl,true},{ssl_options,[{certfile,"cert.pem"},{keyfile,"key.pem"}]}]]}},{overrun_warning,1000},{overrun_handler,{'Elixir.KafkaEx.WorkerPool',producer_overrun_handler}},{overrun_warning,infinity},{max_overrun_warnings,infinity},{workers,N},{worker_opt,[]},{queue_type,fifo}]]}},{restart_type,permanent},{shutdown,5000},{child_type,worker}]

Supervisor 'wpool_pool-kafka_ex_pool-process-sup' had child 'wpool_pool-kafka_ex_pool-9' started with wpool_process:start_link('wpool_pool-kafka_ex_pool-9', 'Elixir.KafkaEx.Server0P10AndLater', [...], [{queue_manager,'wpool_pool-kafka_ex_pool-queue-manager'},{time_checker,'wpool_pool-kafka_ex_pool-time-checker'},...]) at <0.2832.0> exit with reason no case clause matching {badmatch,{error,closed}} in wpool_process:handle_info/2 line 126 in context child_terminated

Shutting down worker #PID<0.2832.0>, reason: {{:case_clause, {:badmatch, {:error, :closed}}}, [{:wpool_process, :handle_info, 2, [file: './lib/worker_pool/src/wpool_process.erl', line: 126]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}

The error messages point to the following code in worker_pool, because it wraps the callback in a try/catch:
https://github.com/inaka/worker_pool/blob/4.0.1/src/wpool_process.erl#L126
https://github.com/inaka/worker_pool/blob/4.0.1/src/wpool_utils.erl#L58

After reviewing the kafka_ex code, I think the only place that could cause {:badmatch, {:error, :closed}} is the following code:
https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L65-L73
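
For readers less familiar with how a badmatch arises, here is a minimal sketch. It does not reproduce the kafka_ex code; `BadmatchSketch` and `assert_ok/1` are hypothetical names, and the `{:error, :closed}` tuple simply stands in for whatever a socket call might return once the peer has closed the connection. It only shows that a strict `:ok = ...` match raises a MatchError (reported by Erlang as `{badmatch, Term}`) when the right-hand side is an error tuple.

```elixir
defmodule BadmatchSketch do
  # Hypothetical helper: asserts success with a strict match.
  # If `result` is anything other than :ok, the match raises MatchError,
  # which Erlang reports as {badmatch, Result}.
  def assert_ok(result) do
    :ok = result
  end
end

# Simulate the value a socket call may return after the peer closed the connection.
outcome =
  try do
    BadmatchSketch.assert_ok({:error, :closed})
  rescue
    e in MatchError -> {:badmatch, e.term}
  end

IO.inspect(outcome)
```

Inside a GenServer callback nothing rescues the error, so the process terminates with that reason, which would be consistent with the log above.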

@dantswain
Collaborator

Hi @TC-aLi ! Hmm I think the code you linked would handle the {:error, :closed} pattern? Either here https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L77 or here https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L90

If you're getting this value then it seems like the broker closed the connection and it may be prudent for the network client to crash and reconnect? Is there any way you can get a full stack trace?

@TC-aLi
Contributor Author

TC-aLi commented Apr 2, 2022

Hi Dan, thank you for your reply. I made a fix on L65, and the crash disappeared. https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L65

I think you're right that the broker closed the connection. I was seeing this issue relatively frequently because of our setup:

  1. KAFKA_CONNECTIONS_MAX_IDLE_MS is set to 10 mins.
  2. https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/server.ex#L991 uses Enum.find_value/2. If more brokers are in a cluster, it's more likely that one of the connections will be closed on the broker side.
  3. we're using a worker pool.
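
To illustrate point 2, here is a rough sketch of how `Enum.find_value/2` behaves when one element's callback raises. It is not the code in server.ex; `FirstResponseSketch`, `first_response/2`, and the broker atoms are made up for the example. The idea is that a falsy result lets the search move on to the next broker, while a raised error aborts the whole call before any later broker is tried.

```elixir
defmodule FirstResponseSketch do
  # Hypothetical wrapper: returns the first truthy result of request_fun
  # over the brokers, or nil if every broker yields a falsy value.
  def first_response(brokers, request_fun) do
    Enum.find_value(brokers, fn broker -> request_fun.(broker) end)
  end
end

brokers = [:broker_1, :broker_2, :broker_3]

# Case 1: the first broker fails softly (nil), so the next one is tried.
soft = fn
  :broker_1 -> nil
  broker -> {:ok, broker}
end

soft_result = FirstResponseSketch.first_response(brokers, soft)

# Case 2: the first broker's connection is closed and the request raises,
# so the remaining brokers are never consulted.
strict = fn
  :broker_1 -> raise MatchError, term: {:error, :closed}
  broker -> {:ok, broker}
end

strict_result =
  try do
    FirstResponseSketch.first_response(brokers, strict)
  rescue
    e in MatchError -> {:crashed, e.term}
  end

IO.inspect({soft_result, strict_result})
```

With more brokers in the list, there are more connections that may have gone idle, so the second case becomes more likely in a setup like the one described.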

But still, I think L65 should be handled just like L77 and L90 if the connection is closed remotely.
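
A sketch of the kind of handling I mean, under stated assumptions: this is not the actual fix and not the kafka_ex API. `SyncRequestSketch` is a hypothetical module, and the socket operations are passed in as anonymous functions (`setopts`, `send`, `recv`) so the example runs without a real connection. It shows every step returning `{:error, reason}` to the caller instead of crashing on a strict match.

```elixir
defmodule SyncRequestSketch do
  # `io` is a map of injected functions standing in for socket calls
  # (an assumption made so the sketch is self-contained).
  def send_sync_request(io, data) do
    with :ok <- io.setopts.([active: false]),
         :ok <- io.send.(data),
         {:ok, reply} <- io.recv.() do
      # Best-effort restore of active mode; its result is ignored here.
      _ = io.setopts.([active: true])
      {:ok, reply}
    else
      # Any failing step, including setopts on a closed socket,
      # is returned to the caller rather than raising a MatchError.
      {:error, _reason} = error -> error
    end
  end
end

healthy = %{
  setopts: fn _opts -> :ok end,
  send: fn _data -> :ok end,
  recv: fn -> {:ok, "pong"} end
}

# Simulates a connection the broker has already closed.
closed = %{healthy | setopts: fn _opts -> {:error, :closed} end}

healthy_result = SyncRequestSketch.send_sync_request(healthy, "ping")
closed_result = SyncRequestSketch.send_sync_request(closed, "ping")

IO.inspect({healthy_result, closed_result})
```

The caller can then decide whether to reconnect or try another broker, the same way it would for an error from send or recv.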
