Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SIGNAL-7060 filter out large messages during termination instead of sending to Kafka #95

Merged
56 changes: 49 additions & 7 deletions lib/kafee/producer/async_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,10 @@ defmodule Kafee.Producer.AsyncWorker do
# developers to handle.
@doc false
def terminate(_reason, %{send_task: nil} = state) do
  # Triage the queue before the final flush: any message that is individually
  # larger than `max_request_bytes` is logged and dropped by the helper,
  # because Kafka would reject it anyway and one oversized message must not
  # block the remaining messages from being sent.
  state = %{state | queue: state_queue_without_large_messages(state)}

  terminate_send(state)
end

Expand Down Expand Up @@ -314,6 +316,42 @@ defmodule Kafee.Producer.AsyncWorker do
:ok
end

# Returns the state's queue with every oversized message removed.
# A message is oversized when its serialized key/value/headers exceed
# `state.max_request_bytes`; such messages are logged via `Logger.error/2`
# and dropped, since they could never be accepted by Kafka. Logs an info
# line with the count of messages that remain to be flushed, if any.
defp state_queue_without_large_messages(state) do
  {kept_queue, dropped_reversed} =
    :queue.fold(
      fn message, {kept, dropped} ->
        if message_within_max_bytes?(message, state.max_request_bytes) do
          {:queue.in(message, kept), dropped}
        else
          {kept, [message | dropped]}
        end
      end,
      {:queue.new(), []},
      state.queue
    )

  # Reverse so the dropped messages are logged in their original queue order.
  dropped_reversed
  |> Enum.reverse()
  |> Enum.each(fn message ->
    Logger.error("Message in queue is too large, will not push to Kafka", data: message)
  end)

  kept_count = :queue.len(kept_queue)

  if kept_count > 0 do
    Logger.info("Attempting to send #{kept_count} messages to Kafka before terminate")
  end

  kept_queue
end

# True when the message's serialized size is strictly below
# `max_request_bytes` (strictly-smaller, matching the original comparison).
defp message_within_max_bytes?(message, max_request_bytes) do
  kafka_message_size_bytes(message) < max_request_bytes
end

@spec emit_queue_telemetry(State.t(), non_neg_integer()) :: :ok
defp emit_queue_telemetry(state, count) do
:telemetry.execute([:kafee, :queue], %{count: count}, %{
Expand Down Expand Up @@ -379,11 +417,7 @@ defmodule Kafee.Producer.AsyncWorker do
queue
|> :queue.to_list()
|> Enum.reduce_while({0, []}, fn message, {bytes, batch} ->
# I know that `:erlang.external_size` won't match what we actually
# send, but it should be under the limit that would cause Kafka errors
kafka_message = Map.take(message, [:key, :value, :headers])

case bytes + :erlang.external_size(kafka_message) do
case bytes + kafka_message_size_bytes(message) do
total_bytes when batch == [] ->
{:cont, {total_bytes, [message]}}

Expand All @@ -401,4 +435,12 @@ defmodule Kafee.Producer.AsyncWorker do

batch_messages
end

# Approximate on-wire size of a queued message, in bytes.
#
# `:erlang.external_size/1` won't match exactly what we actually send,
# but it should stay under the limit that would cause Kafka errors.
# Only the fields that are published (`:key`, `:value`, `:headers`) are
# counted. Shared by the batch builder and the terminate-time queue triage.
defp kafka_message_size_bytes(message_from_queue) do
  message_from_queue
  |> Map.take([:key, :value, :headers])
  |> :erlang.external_size()
end
end
76 changes: 76 additions & 0 deletions test/kafee/producer/async_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,76 @@ defmodule Kafee.Producer.AsyncWorkerTest do
assert logs =~ "exception was raised trying to send the remaining messages to Kafka"
assert 11 = logs |> String.split("Unsent Kafka message") |> length()
end

@tag capture_log: true
# Verifies the terminate-time triage: an individually-oversized message in
# the queue is logged and dropped, while the surrounding small messages are
# still flushed to Kafka and the queue ends up empty.
test "any leftover messages that are large during shutdown gets logged and will not publish", %{
  pid: pid,
  topic: topic,
  state: state
} do
  [small_message_1, small_message_2] = BrodApi.generate_producer_message_list(topic, 2)
  message_fixture = File.read!("test/support/example/large_message.json")
  large_message_fixture = String.duplicate(message_fixture, 10)

  # This message will skip being sent to Kafka, and only be logged
  large_message_1 =
    topic
    |> BrodApi.generate_producer_message()
    |> Map.put(:value, large_message_fixture)
    |> Map.put(:key, "large_msg_1")

  remaining_messages = [small_message_1, large_message_1, small_message_2]
  state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}

  log =
    capture_log(fn ->
      assert :ok = AsyncWorker.terminate(:normal, state)
      Process.sleep(@wait_timeout)
    end)

  # Only the two small messages should reach :brod.
  remaining_brod_messages = BrodApi.to_kafka_message([small_message_1, small_message_2])
  assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))

  async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
  assert 0 == :queue.len(async_worker_state.queue)

  assert log =~ "Message in queue is too large, will not push to Kafka"
  assert log =~ "send 2 messages to Kafka before terminate"
  refute log =~ "exception was raised trying to send the remaining messages to Kafka"
  refute log =~ "Unsent Kafka message"
end

@tag capture_log: true
# Builds a queue whose messages are each under the size limit but whose
# combined size is roughly twice max_request_bytes, so terminate/2 must
# flush in multiple batches without dropping anything.
test "should handle leftover messages which are each small sized but have a total size exceeding max_request_bytes",
%{
pid: pid,
topic: topic,
state: %{max_request_bytes: max_request_bytes} = state
} do
[small_message] = BrodApi.generate_producer_message_list(topic, 1)
small_message_unit_size = kafka_message_size_bytes(small_message)

# Enough messages that the total is ~2x the per-request byte limit.
small_message_total = Kernel.ceil(max_request_bytes / small_message_unit_size) * 2
remaining_messages = BrodApi.generate_producer_message_list(topic, small_message_total)

state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}

log =
capture_log(fn ->
assert :ok = AsyncWorker.terminate(:normal, state)
Process.sleep(@wait_timeout)
end)

# just assert if called; lower level :brod code might split up the messages into more than one call
assert_called(:brod.produce(_client_id, ^topic, 0, _key, _messages))

# Every message should have been flushed before shutdown completed.
async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)

assert log =~ "send #{small_message_total} messages to Kafka before terminate"
refute log =~ "exception was raised trying to send the remaining messages to Kafka"
refute log =~ "Unsent Kafka message"
end
end

describe "build_message_batch/1" do
Expand Down Expand Up @@ -425,4 +495,10 @@ defmodule Kafee.Producer.AsyncWorkerTest do
}
end
end

# Mirrors the size calculation in Kafee.Producer.AsyncWorker so the tests
# can size generated messages relative to max_request_bytes.
defp kafka_message_size_bytes(message) do
  :erlang.external_size(Map.take(message, [:key, :value, :headers]))
end
end