feat: SIGNAL-7060 preemptively drop large messages from queue #101
Conversation
This reverts commit 6c64425.
FYI the multi-partition test is flaky due to the termination race condition. I haven't fully caught it yet - will probably deal with it in a separate ticket. UPDATE: actually the fix was easy - so it's included here, in 44f397b
```elixir
@doc false
def handle_cast({:queue, messages}, state) do
  new_queue = :queue.join(state.queue, :queue.from_list(messages))
  new_messages_queue = messages |> :queue.from_list() |> queue_without_large_messages(state.max_request_bytes)
```
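The helper piped into above isn't shown in this excerpt. A minimal sketch of what `queue_without_large_messages/2` could look like, assuming each queued message is a map with binary `key` and `value` fields (the real implementation may also account for headers and protocol overhead):

```elixir
defmodule QueueFilter do
  require Logger

  # Hypothetical sketch: drop any message whose key + value size exceeds the
  # limit. Field names and size accounting are assumptions, not the PR's code.
  def queue_without_large_messages(queue, max_request_bytes) do
    :queue.filter(
      fn %{key: key, value: value} ->
        size = byte_size(key) + byte_size(value)

        if size <= max_request_bytes do
          true
        else
          # Dropped messages are logged rather than failing silently.
          Logger.error("Dropping message before enqueue: #{size} bytes > #{max_request_bytes}")
          false
        end
      end,
      queue
    )
  end
end
```

`:queue.filter/2` walks the queue once and preserves ordering, so filtering before `:queue.join/2` keeps the existing enqueue semantics.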
Just to make sure I understand: before this PR, we would still attempt to publish messages that are too large unless the async worker process was terminating, in which case we would filter them out. Whereas with this change we will preemptively drop the large messages and never try to publish them. Is that correct?

I think that makes sense from a functional perspective, but I want to double-check my understanding before approving!
Yup, that's 100% correct. Originally we would ignore `state.max_request_bytes` on the first send attempt, just push first, and then handle the errors bubbling up from `brod`.

Adding to the context: due to increased traffic and message size, we are observing `brod`'s related processes failing, and the AsyncWorker layer isn't getting the "raw" error messages with the actual message payload (it seems `brod` is eating it before bubbling up). Therefore, going the other way: since we already know and have set `state.max_request_bytes`, we should be safe filtering the large messages out ahead of time, so the queue won't even have them.
I'm not sure how urgent this is, but perhaps it's worth waiting to get a review from @btkostner as well 🧠
Yup, hopefully with the compression tweak at
Code looks good, and it looks like it includes logging for when something is dropped from the queue, which is my biggest concern. I might add a bit more docs on silent failures when adding to the queue, just so it does not surprise devs.

Added a comment to hopefully clean up the test and make it a bit easier to understand / test what we want.
Changes were done; see comments.
An automated release has been created for you.

---

## [3.2.0](v3.1.2...v3.2.0) (2024-10-01)

### Features

* SIGNAL-7060 preemptively drop large messages from queue ([#101](#101)) ([0baeae8](0baeae8))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Related Ticket(s)
SIGNAL-7060
Checklist
Problem
We added logic to drop large messages during termination, but we still observed raw `:message_too_large` errors bubbling up from `brod` code. Due to the sheer traffic in the topic `wms-service--firehose`, these errors are observed once or twice every other day.

Details
Preemptively drop large messages from even being attempted. Prior to this PR, `AsyncWorker.build_message_batch()` would attempt to push the message out to Kafka without checking its size.

This PR adds the check at the `AsyncWorker.queue()` lifecycle, so when a request comes into this GenServer to put a new message into the queue, it will check and drop the message if it goes over `state.max_request_bytes`.

The dropped messages follow the same logging code, which should push the message out to DataDog.
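The queue-time lifecycle described above could be sketched as follows. This mirrors the `handle_cast({:queue, messages}, state)` shape from the diff, but the module name, message structure, and filter body are illustrative assumptions, not the actual AsyncWorker code:

```elixir
defmodule AsyncWorkerSketch do
  require Logger

  # Sketch of the changed :queue cast: filter before joining the state queue,
  # so oversized messages never enter it.
  def handle_cast({:queue, messages}, state) do
    new_messages_queue =
      messages
      |> :queue.from_list()
      |> queue_without_large_messages(state.max_request_bytes)

    {:noreply, %{state | queue: :queue.join(state.queue, new_messages_queue)}}
  end

  # Assumed message shape: a map with binary :key and :value.
  defp queue_without_large_messages(queue, max_request_bytes) do
    :queue.filter(
      fn %{key: key, value: value} ->
        keep? = byte_size(key) + byte_size(value) <= max_request_bytes
        if not keep?, do: Logger.error("Dropping oversized message before enqueue")
        keep?
      end,
      queue
    )
  end
end
```

Because the filtering happens before `:queue.join/2`, the rest of the send path (batch building, publish, error handling) is untouched; it simply never sees a message over the limit.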