Skip to content

Commit

Permalink
UPDATE: move the message triaging out of terminate_send because it's …
Browse files Browse the repository at this point in the history
…a recursive function
  • Loading branch information
seungjinstord committed Sep 18, 2024
1 parent a56a651 commit 36c6415
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions lib/kafee/producer/async_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ defmodule Kafee.Producer.AsyncWorker do
# developers to handle.
@doc false
def terminate(_reason, %{send_task: nil} = state) do
# We only focus on triaging the queue in state. If there are messages too big, we log and don't send.
# Update state with queue just with messages that are acceptable
state = %{state | queue: state_queue_without_large_messages(state)}

terminate_send(state)
end

Expand All @@ -274,10 +278,6 @@ defmodule Kafee.Producer.AsyncWorker do

@spec terminate_send(State.t()) :: :ok
defp terminate_send(state) do
# We only focus on triaging the queue in state. If there are messages too big, we log and don't send.
# Update state with queue just with messages that are acceptable
state = %{state | queue: state_queue_without_large_messages(state)}

case send_messages(state) do
{:ok, 0, offset} ->
Logger.info("Successfully sent all remaining messages to Kafka before termination")
Expand Down

0 comments on commit 36c6415

Please sign in to comment.