Skip to content

Commit

Permalink
fix: handle not retriable error cases (#48)
Browse files Browse the repository at this point in the history
This handles errors returning `:not_retriable`

- If it's `:message_too_large` and we only sent one message, we log the
message and drop it from the queue
- If it's `:message_too_large` and we sent multiple messages, we lower
the max request size and retry. This results in smaller batches until we
only send one message, which is handled above. The max request size is
never lowered below a hard floor of 500,000 bytes.
- If it's any other `:not_retriable` error, we log the batch we tried to
send, drop the head of the queue, and retry.

This also adds code to ensure we always send at least one message if the
queue has messages. This prevents a message larger than the batch size
from sitting at the head of the queue and blocking all sending.
  • Loading branch information
btkostner authored May 8, 2023
1 parent 7ae4738 commit 84f8807
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
44 changes: 37 additions & 7 deletions lib/kafee/producer/async_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,41 @@ defmodule Kafee.Producer.AsyncWorker do
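# We ran into an error sending messages to Kafka. For retriable errors we
# leave the queue untouched and try again. For `:not_retriable` errors we
# either drop the head of the queue or lower `max_request_size` before retrying.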
@doc false
def handle_info({task_ref, error}, %{send_task: %{ref: task_ref}} = state) do
case error do
{:error, :timeout} ->
Logger.error("Hit timeout when sending messages to Kafka")
def handle_info({task_ref, error}, %{queue: queue, send_task: %{ref: task_ref}} = state) do
sent_messages = build_message_batch(queue, state.max_request_size)

anything_else ->
Logger.error("Error when sending messages to Kafka", error: inspect(anything_else))
end
state =
case error do
{:error, {:producer_down, {:not_retriable, {_, _, _, _, :message_too_large}}}}
when length(sent_messages) == 1 ->
Logger.error("Message in queue is too large", data: sent_messages)
%{state | queue: :queue.drop(queue)}

{:error, {:producer_down, {:not_retriable, {_, _, _, _, :message_too_large}}}} ->
new_max_request_size = max(state.max_request_size - 1024, 500_000)

Logger.error(
"The configured `max_request_size` is larger than the Kafka cluster allows. Adjusting to #{new_max_request_size} bytes"
)

%{state | max_request_size: new_max_request_size}

{:error, {:producer_down, {:not_retriable, _}}} ->
Logger.error(
"Last sent batch is not retriable. Dropping the head of the queue and retrying",
data: sent_messages
)

%{state | queue: :queue.drop(queue)}

{:error, :timeout} ->
Logger.error("Hit timeout when sending messages to Kafka")
state

anything_else ->
Logger.error("Error when sending messages to Kafka", error: inspect(anything_else))
state
end

Process.send_after(self(), :send, state.send_throttle_time)
{:noreply, state}
Expand Down Expand Up @@ -385,6 +412,9 @@ defmodule Kafee.Producer.AsyncWorker do
# I know that `:erlang.external_size` won't match what we actually
# send, but it should be under the limit that would cause Kafka errors
case bytes + :erlang.external_size(message) do
total_bytes when batch == [] ->
{:cont, {total_bytes, [message]}}

total_bytes when total_bytes <= max_request_size ->
{:cont, {total_bytes, [message | batch]}}

Expand Down
27 changes: 27 additions & 0 deletions test/kafee/producer/async_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ defmodule Kafee.Producer.AsyncWorkerTest do
assert_receive :send
end

# Queues one very large message, waits for the worker, then checks that
# `:brod.produce` was called exactly once with that message and that the
# "Message in queue is too large" error was logged.
@tag capture_log: true
test "any single message too large gets logged and dropped from queue", %{pid: pid, topic: topic} do
# The value is 10,000,000 "Z" characters.
# NOTE(review): presumably this exceeds the size limit enforced by the test's
# brod setup, producing a `:message_too_large` error — confirm against setup.
message = %Kafee.Producer.Message{
key: "key",
value: String.duplicate("Z", 10_000_000),
topic: topic,
partition: 0
}

# Capture everything logged while the message is queued and while we sleep
# for `@wait_timeout` (presumably long enough for the asynchronous send to run).
log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, [message])
Process.sleep(@wait_timeout)
end)

# Exactly one produce call for this topic on partition 0 whose batch is only
# this message; the client id and key are not checked.
# NOTE(review): the queue contents are not inspected directly here; a single
# call only suggests the message was not retried.
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, [^message]))
assert log =~ "Message in queue is too large"
end

test "any other tuple logs error and retries sending messages", %{state: state} do
task = make_fake_task()
state = %{state | send_task: task}
Expand Down Expand Up @@ -330,6 +349,14 @@ defmodule Kafee.Producer.AsyncWorkerTest do

assert 10_000 = length(batch) + length(remaining_batch)
end

test "always returns one message if the queue exists" do
  # Build a queue holding ten generated producer messages.
  queued =
    "topic"
    |> BrodApi.generate_producer_message_list(10)
    |> :queue.from_list()

  # Make the private batching function callable from this test.
  expose(AsyncWorker, build_message_batch: 2)

  # A 1-byte limit is far below the size of any message, yet the batch must
  # still contain exactly one message.
  result = private(AsyncWorker.build_message_batch(queued, 1))
  assert 1 = length(result)
end
end

if Version.match?(System.version(), ">= 1.14.0") do
Expand Down

0 comments on commit 84f8807

Please sign in to comment.