Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SIGNAL-7060 preemtively drop large messages from queue #101

Merged
merged 16 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions lib/kafee/producer/async_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,13 @@ defmodule Kafee.Producer.AsyncWorker do
@doc false
def handle_info(_, state), do: {:noreply, state}

# A simple request to add more messages to the queue. Nothing fancy here.
# A simple request to add more messages to the queue.
# Note: will drop large messages and not add it to queue.
@doc false
def handle_cast({:queue, messages}, state) do
new_queue = :queue.join(state.queue, :queue.from_list(messages))
new_messages_queue = messages |> :queue.from_list() |> queue_without_large_messages(state.max_request_bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure I understand, the primary change here is that before this pr, we would still attempt to publish messages that are too large unless the async worker process was terminating in which case we would filter them out.

Whereas, with this change we will pre-emptively drop the large messages and never try to publish them. Is that correct?

I think that makes sense from a functional perspective, but I want to double check my understanding before approving!

Copy link
Contributor Author

@seungjinstord seungjinstord Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup that's 100% correct. Originally we would ignore the state.max_request_bytes on the first send attempt, and just push first, and then handle the errors bubbling up from brod.

Adding to the context: due to increased traffic and size - we are observing brod's related processes failing and the AsyncWorker layer isn't getting the "raw" error messages with the actualy message payload (it seems brod is eating it ahead of bubbling up).

Therefore the other way would be if we already know and have set the state.max_request_byte, we should be safe in filtering the large messges out ahead of time, so the queue won't even have them.

new_queue = :queue.join(state.queue, new_messages_queue)

emit_queue_telemetry(state, :queue.len(new_queue))

Process.send_after(self(), :send, state.throttle_ms)
Expand All @@ -252,7 +255,13 @@ defmodule Kafee.Producer.AsyncWorker do
def terminate(_reason, %{send_task: nil} = state) do
# We only focus on triaging the queue in state. If there are messages too big, we log and don't send.
# Update state with queue just with messages that are acceptable
state = %{state | queue: state_queue_without_large_messages(state)}
state = %{state | queue: queue_without_large_messages(state.queue, state.max_request_bytes)}

count = :queue.len(state.queue)

if count > 0 do
Logger.info("Attempting to send #{count} messages to Kafka before terminate")
end

terminate_send(state)
end
Expand Down Expand Up @@ -316,21 +325,21 @@ defmodule Kafee.Producer.AsyncWorker do
:ok
end

defp state_queue_without_large_messages(state) do
defp queue_without_large_messages(queue, max_request_bytes) do
# messages_beyond_max_bytes are going to be logged and not processed,
# as they are individually already over max_request_bytes in size.

{messages_within_max_bytes_queue, messages_beyond_max_bytes_reversed} =
:queue.fold(
fn message, {acc_queue_messages_within_limit, acc_messages_beyond_limit} ->
if message_within_max_bytes?(message, state.max_request_bytes) do
if message_within_max_bytes?(message, max_request_bytes) do
{:queue.in(message, acc_queue_messages_within_limit), acc_messages_beyond_limit}
else
{acc_queue_messages_within_limit, [message | acc_messages_beyond_limit]}
end
end,
{:queue.new(), []},
state.queue
queue
)

messages_beyond_max_bytes = Enum.reverse(messages_beyond_max_bytes_reversed)
Expand All @@ -339,12 +348,6 @@ defmodule Kafee.Producer.AsyncWorker do
Logger.error("Message in queue is too large, will not push to Kafka", data: message)
end)

count = :queue.len(messages_within_max_bytes_queue)

if count > 0 do
Logger.info("Attempting to send #{count} messages to Kafka before terminate")
end

messages_within_max_bytes_queue
end

Expand Down
115 changes: 86 additions & 29 deletions test/kafee/producer/async_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ defmodule Kafee.Producer.AsyncWorkerTest do
test ":ok removes sent messages from the queue", %{state: state, topic: topic} do
task = make_fake_task()
send_messages = BrodApi.generate_producer_message_list(topic, 4)
remaining_messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ remaining_messages)}
messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ messages)}

assert {:noreply, new_state} = AsyncWorker.handle_info({task.ref, {:ok, 4, 0}}, state)
assert ^remaining_messages = :queue.to_list(new_state.queue)
assert ^messages = :queue.to_list(new_state.queue)
end

test ":ok emits telemetry of remaining messages", %{state: state, topic: topic} do
task = make_fake_task()
send_messages = BrodApi.generate_producer_message_list(topic, 4)
remaining_messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ remaining_messages)}
messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ messages)}

assert {:noreply, _new_state} = AsyncWorker.handle_info({task.ref, {:ok, 4, 0}}, state)
assert_receive {:telemetry_event, [:kafee, :queue], %{count: 3}, %{partition: 0, topic: ^topic}}
Expand All @@ -133,7 +133,7 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

@tag capture_log: true
test "any single message too large gets logged and dropped from queue", %{pid: pid, topic: topic} do
test "any single message too large gets logged and not added to queue", %{pid: pid, topic: topic} do
message_fixture = File.read!("test/support/example/large_message.json")
large_message = String.duplicate(message_fixture, 10)

Expand All @@ -149,8 +149,65 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end)

brod_message = BrodApi.to_kafka_message(message)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert log =~ "Message in queue is too large, will not push to Kafka"

async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)
end

@tag capture_log: true
test "any messages too large gets logged and dropped from queue", %{pid: pid, topic: topic} do
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved
[small_message_1] = BrodApi.generate_producer_message_list(topic, 1)
message_fixture = File.read!("test/support/example/large_message.json")
large_message_fixture = String.duplicate(message_fixture, 10)

# This message will skip being sent to Kafka, and only be logged
large_message_1 =
topic
|> BrodApi.generate_producer_message()
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_1")

large_message_2 =
topic
|> BrodApi.generate_producer_message()
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_2")

# scenario: small message first in queue
messages = [small_message_1, large_message_1, large_message_2]

log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, messages)
Process.sleep(@wait_timeout)
end)

expected_large_message_error_log = "Message in queue is too large, will not push to Kafka"
brod_message = BrodApi.to_kafka_message(small_message_1)
assert_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert log =~ "Message in queue is too large"

assert 2 == (log |> String.split(expected_large_message_error_log) |> length()) - 1
async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()

# all of the messages in queue are processed or dropped
assert 0 == :queue.len(async_worker_state.queue)

# scenario: large message first in queue
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved
messages = [large_message_1, small_message_1, large_message_2]

log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, messages)
Process.sleep(@wait_timeout)
end)

brod_message = BrodApi.to_kafka_message(large_message_1)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
brod_message = BrodApi.to_kafka_message(large_message_2)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert 2 == (log |> String.split(expected_large_message_error_log) |> length()) - 1

async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)
Expand Down Expand Up @@ -271,21 +328,21 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

test "waits for in flight tasks to complete", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

state = %{state | queue: :queue.from_list(remaining_messages)}
state = %{state | queue: :queue.from_list(messages)}

assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "waits for in flight send and sends remaining messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: :infinity}

Process.send_after(self(), {task.ref, {:ok, 0, 0}}, 10)
assert :ok = AsyncWorker.terminate(:normal, state)
Expand All @@ -294,31 +351,31 @@ defmodule Kafee.Producer.AsyncWorkerTest do

@tag capture_log: true
test "waits for in flight error and retries sending messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: :infinity}

Process.send_after(self(), {task.ref, {:error, :internal_error}}, 10)
assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "waits for timeout and retries sending messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: 10}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: 10}

assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "any brod errors are logged before terminate", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

patch(:brod, :sync_produce_request_offset, fn _ref, _timeout ->
{:error, :timeout}
Expand All @@ -335,8 +392,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

test "any raised errors are logged before terminate", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

patch(:brod, :sync_produce_request_offset, fn _ref, _timeout ->
raise RuntimeError, message: "test"
Expand Down Expand Up @@ -369,8 +426,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_1")

remaining_messages = [small_message_1, large_message_1, small_message_2]
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = [small_message_1, large_message_1, small_message_2]
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

log =
capture_log(fn ->
Expand Down Expand Up @@ -401,9 +458,9 @@ defmodule Kafee.Producer.AsyncWorkerTest do
small_message_unit_size = kafka_message_size_bytes(small_message)

small_message_total = Kernel.ceil(max_request_bytes / small_message_unit_size) * 2
remaining_messages = BrodApi.generate_producer_message_list(topic, small_message_total)
messages = BrodApi.generate_producer_message_list(topic, small_message_total)

state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

log =
capture_log(fn ->
Expand Down Expand Up @@ -440,8 +497,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
batch = private(AsyncWorker.build_message_batch(messages, 1_040_384))
assert length(batch) in 10_000..15_000

{_batched_messages, remaining_messages} = batch |> length() |> :queue.split(messages)
remaining_batch = private(AsyncWorker.build_message_batch(remaining_messages, 1_040_384))
{_batched_messages, messages} = batch |> length() |> :queue.split(messages)
remaining_batch = private(AsyncWorker.build_message_batch(messages, 1_040_384))
assert length(remaining_batch) in 5_000..10_000

assert 20_000 = length(batch) + length(remaining_batch)
Expand Down
1 change: 0 additions & 1 deletion test/kafee/producer_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ defmodule Kafee.ProducerIntegrationTest do
end)

refute log =~ "Message in queue is too large"
assert log =~ "brod producer process is currently down"
assert log =~ "Successfully sent messages to Kafka"
end
end
Expand Down