Skip to content

Commit

Permalink
chore: refactor some code and rollback produce batch async to task
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 16, 2024
1 parent da38c97 commit ab61197
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 99 deletions.
53 changes: 32 additions & 21 deletions lib/klife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,7 @@ defmodule Klife do

@doc false
def produce_batch([%Record{} | _] = records, client, opts \\ []) do
records =
records
|> Enum.with_index(1)
|> Enum.map(fn {rec, idx} ->
rec
|> Map.replace!(:__estimated_size, Record.estimate_size(rec))
|> Map.replace!(:__batch_index, idx)
|> maybe_add_partition(client, opts)
end)
records = prepare_records(records, client, opts)

if TxnProducerPool.in_txn?(client),
do: TxnProducerPool.produce(records, client, opts),
Expand All @@ -74,22 +66,25 @@ defmodule Klife do

@doc false
def produce_async(%Record{} = record, client, opts \\ []) do
produce_batch_async([record], client, opts)
prepared_rec = prepare_records(record, client, opts)
Producer.produce_async([prepared_rec], client, opts)
end

@doc false
def produce_batch_async([%Record{} | _] = records, client, opts \\ []) do
records =
records
|> Enum.with_index(1)
|> Enum.map(fn {rec, idx} ->
rec
|> Map.replace!(:__estimated_size, Record.estimate_size(rec))
|> Map.replace!(:__batch_index, idx)
|> maybe_add_partition(client, opts)
end)

Producer.produce_async(records, client, opts)
case opts[:callback] do
nil ->
records = prepare_records(records, client, opts)
Producer.produce_async(records, client, opts)

{m, f, args} ->
Task.start(fn -> apply(m, f, [produce_batch(records, client, opts) | args]) end)
:ok

fun when is_function(fun, 1) ->
Task.start(fn -> fun.(produce_batch(records, client, opts)) end)
:ok
end
end

@doc false
Expand Down Expand Up @@ -126,4 +121,20 @@ defmodule Klife do
record
end
end

# Prepares a single record by wrapping it in a one-element list, running the
# list clause below and unwrapping the only result.
defp prepare_records(%Record{} = record, client, opts) do
  [prepared] = prepare_records([record], client, opts)
  prepared
end

# Prepares a list of records for producing. For each record, in order:
#   * stores the value of `Record.estimate_size/1` under `:__estimated_size`
#   * stores its 1-based position in the list under `:__batch_index`
#   * hands the result to `maybe_add_partition/3` together with `client` and `opts`
#
# `Map.replace!/3` is used on purpose: it raises if the key is not already
# present instead of silently adding a new one.
defp prepare_records(records, client, opts) when is_list(records) do
  for {record, position} <- Enum.with_index(records, 1) do
    sized = Map.replace!(record, :__estimated_size, Record.estimate_size(record))
    indexed = Map.replace!(sized, :__batch_index, position)

    maybe_add_partition(indexed, client, opts)
  end
end
end
23 changes: 13 additions & 10 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -286,18 +286,21 @@ defmodule Klife.Client do
@doc """
Produce a batch of records asynchronously.
The same as [`produce_batch/2`](c:produce_batch/2) but do not wait for the response. Accepts a callback
option to execute arbitrary code for each record in the batch.
> #### Beware of callback execution {: .warning}
> The callback is called for each record, so if the batch contains 3 records the callback will be
> executed up to 3 times.
The same as [`produce_batch/2`](c:produce_batch/2) but returns immediately. Accepts a callback
option to execute arbitrary code after the response is obtained.
> #### Semantics and guarantees {: .info}
>
> This functions calls the callback for each record in the batch using `Task.start/1`.
> When a callback is provided, this function is implemented as a `Task.start/1`
> that calls [`produce_batch/2`](c:produce_batch/2) and executes the callback right after.
> Therefore there are no guarantees about record delivery or callback execution.
> #### Beware of process limits {: .warning}
> Because this function spawns a new process for every call that defines a callback,
> it may spawn a high number of processes if it is executed inside loops.
>
> To avoid this, you can send more records per call (a larger batch) or
> raise the Erlang VM's process limit (the `+P` emulator flag).
## Options
#{NimbleOptions.docs(Klife.get_produce_opts() ++ Klife.get_async_opts())}
Expand All @@ -310,7 +313,7 @@ defmodule Klife.Client do
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: fn resp ->
...> {:ok, _some_resp} = resp
...> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end)
Using MFA:
Expand All @@ -319,7 +322,7 @@ defmodule Klife.Client do
...> def exec(resp, my_arg1, my_arg2) do
...> "arg1" = my_arg1
...> "arg2" = my_arg2
...> {:ok, _some_resp} = resp
...> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end
...> end
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
Expand Down
52 changes: 19 additions & 33 deletions lib/klife/producer/batcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,58 +111,44 @@ defmodule Klife.Producer.Batcher do
broker_id,
producer_name,
batcher_id,
mfa_or_fun
callback
) do
client_name
|> get_process_name(broker_id, producer_name, batcher_id)
|> GenServer.cast({:produce_async, records, mfa_or_fun})
|> GenServer.cast({:produce_async, records, callback})
end

def handle_call(
{:produce, [%Record{} | _] = recs, callback_pid},
_from,
%__MODULE__{} = state
) do
# Synchronous produce request. The records are handed to `do_produce/3`
# together with the pid received in the message, and the caller is answered
# with `{:ok, delivery_timeout}`, where the timeout is the second element
# returned by `do_produce/3`.
def handle_call({:produce, [%Record{} | _] = records, caller_pid}, _from, %__MODULE__{} = state) do
  {updated_state, timeout_ms} = do_produce(state, records, caller_pid)

  {:reply, {:ok, timeout_ms}, updated_state}
end

# Asynchronous produce request. Goes through the same `do_produce/3` path as
# the synchronous call, passing the callback where the sync path passes a pid.
# Nobody is waiting for a reply, so the timeout returned by `do_produce/3` is
# discarded.
def handle_cast({:produce_async, [%Record{} | _] = records, callback}, %__MODULE__{} = state) do
  {updated_state, _unused_timeout} = do_produce(state, records, callback)

  {:noreply, updated_state}
end

defp do_produce(%__MODULE__{} = state, recs, cb_or_pid) do
%{
producer_config: %{linger_ms: linger_ms, delivery_timeout_ms: delivery_timeout},
last_batch_sent_at: last_batch_sent_at,
next_send_msg_ref: next_ref
} = state

now = System.monotonic_time(:millisecond)

on_time? = now - last_batch_sent_at >= linger_ms

new_state =
Enum.reduce(recs, state, fn rec, acc_state ->
add_record(acc_state, rec, callback_pid)
add_record(acc_state, rec, cb_or_pid)
end)

if on_time? and next_ref == nil,
do: {:reply, {:ok, delivery_timeout}, schedule_send_if_earlier(new_state, 0)},
else: {:reply, {:ok, delivery_timeout}, new_state}
end

def handle_cast(
{:produce_async, [%Record{} | _] = recs, mfa_or_fun},
%__MODULE__{} = state
) do
%{
producer_config: %{linger_ms: linger_ms},
last_batch_sent_at: last_batch_sent_at,
next_send_msg_ref: next_ref
} = state

now = System.monotonic_time(:millisecond)

on_time? = now - last_batch_sent_at >= linger_ms

new_state =
Enum.reduce(recs, state, fn rec, acc_state -> add_record(acc_state, rec, mfa_or_fun) end)
if on_time? and next_ref == nil,
do: schedule_send_if_earlier(new_state, 0),
else: new_state

if on_time? and next_ref == nil,
do: {:noreply, schedule_send_if_earlier(new_state, 0)},
else: {:noreply, new_state}
{new_state, delivery_timeout}
end

def handle_info(:send_to_broker, %__MODULE__{} = state) do
Expand Down Expand Up @@ -314,7 +300,7 @@ defmodule Klife.Producer.Batcher do
state
|> move_current_data_to_batch_queue()
|> add_record_to_current_data(record, pid, estimated_size)
|> schedule_send_if_earlier(5),
|> schedule_send_if_earlier(1),
else:
state
|> add_record_to_current_data(record, pid, estimated_size)
Expand Down
4 changes: 2 additions & 2 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ defmodule Klife.Producer.Dispatcher do
records_map
|> Map.fetch!({topic, partition})
|> Map.fetch!(batch_offset)
|> Map.put(:offset, base_offset + batch_offset)
|> Map.replace!(:offset, base_offset + batch_offset)

Task.start(m, f, [{:ok, callback_rec} | a])

Expand All @@ -185,7 +185,7 @@ defmodule Klife.Producer.Dispatcher do
records_map
|> Map.fetch!({topic, partition})
|> Map.fetch!(batch_offset)
|> Map.put(:offset, base_offset + batch_offset)
|> Map.replace!(:offset, base_offset + batch_offset)

Task.start(fn -> fun.({:ok, callback_rec}) end)
end)
Expand Down
49 changes: 19 additions & 30 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -400,23 +400,7 @@ defmodule Klife.Producer do

delivery_timeout_ms =
records
|> Enum.group_by(fn r -> {r.topic, r.partition} end)
|> Enum.map(fn {{t, p}, recs} ->
%{
broker_id: broker_id,
producer_name: default_producer,
batcher_id: default_batcher_id
} = ProducerController.get_topics_partitions_metadata(client_name, t, p)

new_key =
if opt_producer,
do: {broker_id, opt_producer, get_batcher_id(client_name, opt_producer, t, p)},
else: {broker_id, default_producer, default_batcher_id}

{new_key, recs}
end)
|> Enum.group_by(fn {key, _recs} -> key end, fn {_key, recs} -> recs end)
|> Enum.map(fn {k, v} -> {k, List.flatten(v)} end)
|> group_records_by_batcher(client_name, opt_producer)
|> Enum.reduce(0, fn {key, recs}, acc ->
{broker_id, producer, batcher_id} = key

Expand Down Expand Up @@ -452,6 +436,24 @@ defmodule Klife.Producer do
opt_producer = Keyword.get(opts, :producer)
callback = Keyword.get(opts, :callback)

records
|> group_records_by_batcher(client_name, opt_producer)
|> Enum.each(fn {key, recs} ->
{broker_id, producer, batcher_id} = key

:ok =
Batcher.produce_async(
recs,
client_name,
broker_id,
producer,
batcher_id,
callback
)
end)
end

defp group_records_by_batcher(records, client_name, opt_producer) do
records
|> Enum.group_by(fn r -> {r.topic, r.partition} end)
|> Enum.map(fn {{t, p}, recs} ->
Expand All @@ -470,19 +472,6 @@ defmodule Klife.Producer do
end)
|> Enum.group_by(fn {key, _recs} -> key end, fn {_key, recs} -> recs end)
|> Enum.map(fn {k, v} -> {k, List.flatten(v)} end)
|> Enum.each(fn {key, recs} ->
{broker_id, producer, batcher_id} = key

:ok =
Batcher.produce_async(
recs,
client_name,
broker_id,
producer,
batcher_id,
callback
)
end)
end

defp wait_produce_response(timeout_ms, max_resps) do
Expand Down
4 changes: 1 addition & 3 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,7 @@ defmodule Klife.ProducerTest do
callback: fn resp -> send(parent, {:ping, resp}) end
)

assert_receive({:ping, {:ok, %Record{key: ^key1} = new_rec1}}, 1000)
assert_receive({:ping, {:ok, %Record{key: ^key2} = new_rec2}}, 1000)
assert_receive({:ping, {:ok, %Record{key: ^key3} = new_rec3}}, 1000)
assert_receive({:ping, [{:ok, new_rec1}, {:ok, new_rec2}, {:ok, new_rec3}]}, 1000)

assert :ok = TestUtils.assert_offset(MyClient, rec1, new_rec1.offset)
assert :ok = TestUtils.assert_offset(MyClient, rec2, new_rec2.offset)
Expand Down

0 comments on commit ab61197

Please sign in to comment.