Skip to content

Commit

Permalink
feat: SIGNAL-7190 Task.await_many for batch operations (#110)
Browse files Browse the repository at this point in the history
## Related Ticket(s)
SIGNAL-7190
<!--
Enter the Jira issue below in the following format: PROJECT-##
-->

## Checklist

<!--
For each bullet, ensure your pr meets the criteria and write a note
explaining how this PR relates. Mark them as complete as they are done.
All top-level checkboxes should be checked regardless of their relevance
to the pr with a note explaining whether they are relevant or not.
-->

- [x] Code conforms to the [Elixir
Styleguide](https://github.com/christopheradams/elixir_style_guide)

## Problem

Task.async operations to finish before finishing up the batch because
it’s “going too fast” and DB can’t keep up (afraid same thing might
happen in prod).

## Details

We’ll use Task.await_many as the back pressure, as that delays batching
function to finish, and increasing the time to finish is the back
pressure onto receiving more messages.
  • Loading branch information
seungjinstord authored Oct 18, 2024
1 parent e302bf0 commit 79c9f42
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion lib/kafee/consumer/broadway_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ defmodule Kafee.Consumer.BroadwayAdapter do
if batch_config[:async_run] do
# No need for Task.Supervisor as it is not running under a GenServer,
# and Kafee.Consumer.Adapter.push_message does already have error handling.
Enum.each(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end))
tasks = Enum.map(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end))
Task.await_many(tasks)
else
Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end)
end
Expand Down

0 comments on commit 79c9f42

Please sign in to comment.