-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: SIGNAL-7060 filter out large messages during termination instead of sending to Kafka #95
Changes from 5 commits
ad705d2
74f0582
afd284e
74ec06f
a56a651
36c6415
a7bc084
2b96d34
3fb13cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -250,8 +250,6 @@ defmodule Kafee.Producer.AsyncWorker do | |
# developers to handle. | ||
@doc false | ||
def terminate(_reason, %{send_task: nil} = state) do | ||
count = :queue.len(state.queue) | ||
Logger.info("Attempting to send #{count} messages to Kafka before terminate") | ||
terminate_send(state) | ||
end | ||
|
||
|
@@ -276,6 +274,10 @@ defmodule Kafee.Producer.AsyncWorker do | |
|
||
@spec terminate_send(State.t()) :: :ok | ||
defp terminate_send(state) do | ||
# We only focus on triaging the queue in state. If there are messages that are too big, we log them and don't send. | ||
# Update state with queue just with messages that are acceptable | ||
state = %{state | queue: state_queue_without_large_messages(state)} | ||
|
||
case send_messages(state) do | ||
{:ok, 0, offset} -> | ||
Logger.info("Successfully sent all remaining messages to Kafka before termination") | ||
|
@@ -314,6 +316,32 @@ defmodule Kafee.Producer.AsyncWorker do | |
:ok | ||
end | ||
|
||
defp state_queue_without_large_messages(state) do | ||
# messages_beyond_max_bytes are going to be logged and not processed, | ||
# as they are individually already over max_request_bytes in size. | ||
|
||
{messages_within_max_bytes, messages_beyond_max_bytes} = | ||
state.queue | ||
|> :queue.to_list() | ||
|> split_messages_by_max_bytes(state.max_request_bytes) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Micro optimization here if you want. Instead of moving to a list then to two lists, do it in one operation with a reduce. Shouldn't be a huge issue though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ooh I think that's a pretty significant optimization given this library is gonna be called many times haha I will work on using that approach. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to use |
||
|
||
Enum.each(messages_beyond_max_bytes, fn message -> | ||
Logger.error("Message in queue is too large, will not push to Kafka", data: message) | ||
end) | ||
|
||
count = length(messages_within_max_bytes) | ||
|
||
if count > 0 do | ||
Logger.info("Attempting to send #{count} messages to Kafka before terminate") | ||
end | ||
|
||
:queue.from_list(messages_within_max_bytes) | ||
end | ||
|
||
defp split_messages_by_max_bytes(messages, max_request_bytes) do | ||
Enum.split_with(messages, &(max_request_bytes > kafka_message_size_bytes(&1))) | ||
end | ||
|
||
@spec emit_queue_telemetry(State.t(), non_neg_integer()) :: :ok | ||
defp emit_queue_telemetry(state, count) do | ||
:telemetry.execute([:kafee, :queue], %{count: count}, %{ | ||
|
@@ -379,11 +407,7 @@ defmodule Kafee.Producer.AsyncWorker do | |
queue | ||
|> :queue.to_list() | ||
|> Enum.reduce_while({0, []}, fn message, {bytes, batch} -> | ||
# I know that `:erlang.external_size` won't match what we actually | ||
# send, but it should be under the limit that would cause Kafka errors | ||
kafka_message = Map.take(message, [:key, :value, :headers]) | ||
|
||
case bytes + :erlang.external_size(kafka_message) do | ||
case bytes + kafka_message_size_bytes(message) do | ||
total_bytes when batch == [] -> | ||
{:cont, {total_bytes, [message]}} | ||
|
||
|
@@ -401,4 +425,12 @@ defmodule Kafee.Producer.AsyncWorker do | |
|
||
batch_messages | ||
end | ||
|
||
# `:erlang.external_size` won't match what we actually | ||
# send, but it should be under the limit that would cause Kafka errors | ||
defp kafka_message_size_bytes(message_from_queue) do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small refactoring as this logic is used in two functions |
||
message_from_queue | ||
|> Map.take([:key, :value, :headers]) | ||
|> :erlang.external_size() | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -351,6 +351,76 @@ defmodule Kafee.Producer.AsyncWorkerTest do | |
assert logs =~ "exception was raised trying to send the remaining messages to Kafka" | ||
assert 11 = logs |> String.split("Unsent Kafka message") |> length() | ||
end | ||
|
||
@tag capture_log: true | ||
test "any leftover messages that are large during shutdown get logged and will not be published", %{ | ||
Comment on lines
+355
to
+356
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice work on the tests 👍 I hope it wasn't too hard to write an accurate representation here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was fine, other examples were really helpful :) |
||
pid: pid, | ||
topic: topic, | ||
state: state | ||
} do | ||
[small_message_1, small_message_2] = BrodApi.generate_producer_message_list(topic, 2) | ||
message_fixture = File.read!("test/support/example/large_message.json") | ||
large_message_fixture = String.duplicate(message_fixture, 10) | ||
|
||
# This message will skip being sent to Kafka, and only be logged | ||
large_message_1 = | ||
topic | ||
|> BrodApi.generate_producer_message() | ||
|> Map.put(:value, large_message_fixture) | ||
|> Map.put(:key, "large_msg_1") | ||
|
||
remaining_messages = [small_message_1, large_message_1, small_message_2] | ||
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity} | ||
|
||
log = | ||
capture_log(fn -> | ||
assert :ok = AsyncWorker.terminate(:normal, state) | ||
Process.sleep(@wait_timeout) | ||
end) | ||
|
||
remaining_brod_messages = BrodApi.to_kafka_message([small_message_1, small_message_2]) | ||
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages)) | ||
|
||
async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state() | ||
assert 0 == :queue.len(async_worker_state.queue) | ||
|
||
assert log =~ "Message in queue is too large, will not push to Kafka" | ||
assert log =~ "send 2 messages to Kafka before terminate" | ||
refute log =~ "exception was raised trying to send the remaining messages to Kafka" | ||
refute log =~ "Unsent Kafka message" | ||
end | ||
|
||
@tag capture_log: true | ||
test "should handle leftover messages which are each small in size but whose total size exceeds max_request_bytes", | ||
%{ | ||
pid: pid, | ||
topic: topic, | ||
state: %{max_request_bytes: max_request_bytes} = state | ||
} do | ||
[small_message] = BrodApi.generate_producer_message_list(topic, 1) | ||
small_message_unit_size = kafka_message_size_bytes(small_message) | ||
|
||
small_message_total = Kernel.ceil(max_request_bytes / small_message_unit_size) * 2 | ||
remaining_messages = BrodApi.generate_producer_message_list(topic, small_message_total) | ||
|
||
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity} | ||
|
||
log = | ||
capture_log(fn -> | ||
assert :ok = AsyncWorker.terminate(:normal, state) | ||
Process.sleep(@wait_timeout) | ||
end) | ||
|
||
# just assert if called; lower level :brod code might split up the messages into more than one call | ||
seungjinstord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assert_called(:brod.produce(_client_id, ^topic, 0, _key, _messages)) | ||
|
||
async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state() | ||
assert 0 == :queue.len(async_worker_state.queue) | ||
|
||
assert log =~ "send #{small_message_total} messages to Kafka before terminate" | ||
refute log =~ "exception was raised trying to send the remaining messages to Kafka" | ||
refute log =~ "Unsent Kafka message" | ||
end | ||
end | ||
|
||
describe "build_message_batch/1" do | ||
|
@@ -425,4 +495,10 @@ defmodule Kafee.Producer.AsyncWorkerTest do | |
} | ||
end | ||
end | ||
|
||
defp kafka_message_size_bytes(message) do | ||
message | ||
|> Map.take([:key, :value, :headers]) | ||
|> :erlang.external_size() | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heads up, this will be called recursively from line 293. Shouldn't be an issue but might take more time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah then we can maybe take the message triaging out of
terminate_send
. Totally missed this recursion step, thanks!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved out in 36c6415