feat: SIGNAL-7060 filter out large messages during termination instead of sending to Kafka #95
Conversation
… log them instead of pushing to Kafka
# `:erlang.external_size` won't match what we actually
# send, but it should be under the limit that would cause Kafka errors
defp kafka_message_size_bytes(message_from_queue) do
Small refactoring as this logic is used in two functions
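For context, a minimal sketch of what such a shared helper might look like, assuming it simply wraps `:erlang.external_size/1` as the inline comment suggests (the function body is not shown in the quoted diff, and the module name here is hypothetical):

```elixir
defmodule SizeHelperSketch do
  # Sketch only: the real helper in Kafee.Producer.AsyncWorker is private and
  # its body is not shown in the diff above. :erlang.external_size/1 estimates
  # the byte size of a term's external representation; it won't exactly match
  # what brod puts on the wire, but it is a cheap proxy to compare against the
  # configured Kafka max request size.
  def kafka_message_size_bytes(message_from_queue) do
    :erlang.external_size(message_from_queue)
  end
end
```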
Nice work finding, fixing, and writing tests to confirm. This should be fine to merge as is but I did add two small optimization notes.
@tag capture_log: true
test "any leftover messages that are large during shutdown gets logged and will not publish", %{
Nice work on the tests 👍 I hope it wasn't too hard to write an accurate representation here.
It was fine, other examples were really helpful :)
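As an aside for anyone reusing the pattern, here is a stripped-down, hypothetical example of the `capture_log` technique this kind of test relies on; the real test drives the async worker through shutdown rather than calling `Logger` directly:

```elixir
defmodule ShutdownLoggingSketchTest do
  use ExUnit.Case, async: true

  import ExUnit.CaptureLog
  require Logger

  # Hypothetical, simplified illustration only. capture_log/1 runs the given
  # function and returns whatever was logged so the test can assert on it.
  @tag capture_log: true
  test "oversized leftovers are logged instead of published" do
    log =
      capture_log(fn ->
        Logger.error("Message in queue is too large to send to Kafka; logging it instead")
      end)

    assert log =~ "too large to send to Kafka"
  end
end
```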
lib/kafee/producer/async_worker.ex
Outdated
{messages_within_max_bytes, messages_beyond_max_bytes} =
  state.queue
  |> :queue.to_list()
  |> split_messages_by_max_bytes(state.max_request_bytes)
Micro optimization here if you want. Instead of moving to a list then to two lists, do it in one operation with a reduce. Shouldn't be a huge issue though.
Ooh, I think that's a pretty significant optimization given this library is gonna be called many times, haha. I will work on using that approach.
Updated to use `:queue.fold` in a7bc084.
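For illustration, a single-pass split over an Erlang queue with `:queue.fold/3`, in the spirit of the suggestion above, might look like the following (hypothetical names; the merged commit's exact code is not reproduced here):

```elixir
defmodule QueueSplitSketch do
  # Hypothetical single-pass split: fold over the queue once, separating
  # messages that fit under the byte limit from those that do not. The
  # accumulators are built in reverse order and flipped at the end.
  def split_by_max_bytes(queue, max_request_bytes) do
    {within, beyond} =
      :queue.fold(
        fn message, {within, beyond} ->
          if :erlang.external_size(message) <= max_request_bytes do
            {[message | within], beyond}
          else
            {within, [message | beyond]}
          end
        end,
        {[], []},
        queue
      )

    {Enum.reverse(within), Enum.reverse(beyond)}
  end
end
```

Compared to `:queue.to_list/1` followed by a split, this walks the queue only once and avoids building the intermediate list.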
lib/kafee/producer/async_worker.ex
Outdated
@@ -276,6 +274,10 @@ defmodule Kafee.Producer.AsyncWorker do

  @spec terminate_send(State.t()) :: :ok
  defp terminate_send(state) do
    # We only focus on triaging the queue in state. If there are messages too big, we log and don't send.
    # Update state with queue just with messages that are acceptable
    state = %{state | queue: state_queue_without_large_messages(state)}
Heads up, this will be called recursively from line 293. Shouldn't be an issue but might take more time.
Ah, then we can maybe take the message triaging out of `terminate_send`. Totally missed this recursion step, thanks!
Moved out in 36c6415
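To make the restructuring concrete, here is a simplified, hypothetical sketch (not the actual commit) of filtering the queue once before entering a recursive send loop, so the triage is not repeated on every recursive call:

```elixir
defmodule TerminationSketch do
  @max_request_bytes 1_000_000

  # Hypothetical flow: triage once, then recurse over an already-filtered queue.
  def drain(queue) do
    queue
    |> drop_large_messages()
    |> send_remaining()
  end

  defp drop_large_messages(queue) do
    :queue.filter(fn msg -> :erlang.external_size(msg) <= @max_request_bytes end, queue)
  end

  # Recursive send loop: pops one message at a time until the queue is empty.
  defp send_remaining(queue) do
    case :queue.out(queue) do
      {:empty, _empty_queue} ->
        :ok

      {{:value, message}, rest} ->
        # The real worker batches messages and calls brod here; printing keeps
        # the sketch self-contained.
        IO.inspect(message, label: "would send")
        send_remaining(rest)
    end
  end
end
```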
…a recursive function
…doing queue -> list -> queue
🚀🚀🚀
I don't have much to contribute here 😅 — thanks for tackling this @seungjinstord and for the quick reviews @btkostner !
Co-authored-by: Sam Hunter <[email protected]>
LGTM
An automated release has been created for you.

---

## [3.1.0](v3.0.3...v3.1.0) (2024-09-19)

### Features

* SIGNAL-7060 filter out large messages during termination instead of sending to Kafka ([#95](#95)) ([3bf5f81](3bf5f81))

### Miscellaneous

* Sync files with stordco/common-config-elixir ([#85](#85)) ([c377a8e](c377a8e))
* Sync files with stordco/common-config-elixir ([#87](#87)) ([5dcc51d](5dcc51d))
* Sync files with stordco/common-config-elixir ([#88](#88)) ([51f6add](51f6add))
* Sync files with stordco/common-config-elixir ([#89](#89)) ([f6b8668](f6b8668))
* Sync files with stordco/common-config-elixir ([#90](#90)) ([ba97bf3](ba97bf3))
* Sync files with stordco/common-config-elixir ([#91](#91)) ([2c013fe](2c013fe))
* Sync files with stordco/common-config-elixir ([#92](#92)) ([5da1a94](5da1a94))
* Sync files with stordco/common-config-elixir ([#94](#94)) ([33ed4d8](33ed4d8))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Related Ticket(s)
SIGNAL-7060
Checklist
Problem
Messages larger than what Kafka allows create an error that is not captured by the existing `case` clause, mainly because it happens during the process termination flow. The error tuple is as follows:
{:not_retriable, {:produce_response_error, "magnam-totam-nobis-est", 0, -1, :message_too_large}}
The GenServer crashing with that error appears to be the Brod producer or the supervisor.
It only happens when `terminate_send` has to deal with message(s) in the queue that are larger than allowed and has pushed them to `:brod.produce()`.
Details
I tried to catch it as another pattern match in the place where we do this check in AsyncWorker, but to no avail.
While the processes are live and working, the existing `case` clause handles large messages and emits them to Datadog logs. The only place this error appears is when `terminate_send` works with a large message (at least, that's how I was able to reproduce the issue in tests). Therefore, during termination the large messages are filtered out of the queue, because they already exceed what Kafka allows, and the rest is passed on to Kafka.
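For illustration, a clause that matches the error shape quoted above and logs instead of retrying might look roughly like this (hypothetical; it is not Kafee's actual code):

```elixir
defmodule ProduceErrorSketch do
  require Logger

  # Hypothetical illustration of matching the error-tuple shape quoted in the
  # problem description; Kafee's real handling in AsyncWorker may differ.
  def classify({:not_retriable, {:produce_response_error, topic, partition, _offset, :message_too_large}}) do
    Logger.error("Kafka rejected a message on #{topic}/#{partition}: message too large, logging instead of sending")
    :drop_and_log
  end

  def classify(other), do: {:retry, other}
end
```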
We can move this logic down to `send_message`, but I wanted to be conservative in our changes and limit the blast radius. The expected end result is that we get the "message too large" Datadog log (we weren't getting it in this edge-case scenario) with the actual message.
Then, the next step is to try to trim those messages down when they are created.