Skip to content

Commit

Permalink
feat: async api
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Aug 17, 2024
1 parent ffeb7b1 commit a4b4e26
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 32 deletions.
54 changes: 47 additions & 7 deletions lib/klife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@ defmodule Klife do
doc:
"Producer's name that will override the `default_producer` configuration. Ignored inside transactions."
],
async: [
type: :boolean,
required: false,
default: false,
doc:
"Makes the produce asynchronous. When `true` the return value will be `:ok`. Ignored inside transactions."
],
partitioner: [
type: :atom,
required: false,
doc: "Module that will override `default_partitioner` configuration."
]
]

# Option schema specific to the async produce API (`produce_async/3` and
# `produce_batch_async/3`). Exposed through `get_async_opts/0` so that it can
# be appended to the regular produce options when rendering documentation.
@async_opts [
callback: [
type: :any,
required: false,
doc:
"MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions"
]
]

@txn_opts [
pool_name: [
type: :atom,
Expand All @@ -42,6 +44,7 @@ defmodule Klife do

# Accessors returning the option schemas held in the corresponding module
# attributes, so other modules can reuse them (e.g. to render option docs).
def get_produce_opts(), do: @produce_opts
def get_txn_opts(), do: @txn_opts
def get_async_opts(), do: @async_opts

def produce(%Record{} = record, client, opts \\ []) do
case produce_batch([record], client, opts) do
Expand All @@ -66,6 +69,43 @@ defmodule Klife do
else: Producer.produce(records, client, opts)
end

# The async implementation is not optimal because it may copy a lot of
# data to the new task process. Ideally Dispatcher would start the
# callback task itself instead of sending a message to the waiting pid,
# but that would make it hard to keep the same API: inside Dispatcher we
# do not have the same data, and we want the async callback to receive
# exactly the same output as its sync counterpart. Wrapping the sync call
# in a task is the easiest approach for now.

# Produces a single record from a separate task and returns `:ok` right away.
#
# The task calls `produce/3` with the same arguments and hands its result to
# the `:callback` option (an `{module, function, args}` tuple or a function
# of arity 1) when one is given.
def produce_async(%Record{} = record, client, opts \\ []) do
  start_async_produce(fn -> produce(record, client, opts) end, opts)
end

# Batch version of `produce_async/3`: the task calls `produce_batch/3` and the
# callback receives the full list of results.
def produce_batch_async([%Record{} | _] = records, client, opts \\ []) do
  start_async_produce(fn -> produce_batch(records, client, opts) end, opts)
end

# Runs `produce_fun` inside a task started with `Task.start/1` and forwards
# its result to the callback found in `opts`. Always returns `:ok`; the
# caller never sees the produce result nor any failure inside the task.
defp start_async_produce(produce_fun, opts) when is_function(produce_fun, 0) do
  {:ok, _task_pid} =
    Task.start(fn ->
      resp = produce_fun.()

      # The callback is looked up inside the task, after the produce call,
      # exactly like the sync result would be consumed by the caller.
      case opts[:callback] do
        # MFA: the produce result is injected as the first argument.
        {m, f, args} -> apply(m, f, [resp | args])
        fun when is_function(fun, 1) -> fun.(resp)
        # No callback (or an unsupported value): nothing to notify.
        _ -> :noop
      end
    end)

  :ok
end

def produce_batch_txn([%Record{} | _] = records, client, opts \\ []) do
transaction(
fn -> records |> produce_batch(client, opts) |> Record.verify_batch() end,
Expand Down
93 changes: 93 additions & 0 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,50 @@ defmodule Klife.Client do
"""
@callback produce(record, opts :: Keyword.t()) :: {:ok, record} | {:error, record}

@doc group: "Producer API"
@doc """
Produce a single record asynchronously.
The same as [`produce/2`](c:produce/2) but returns immediately. Accepts a callback
option to execute arbitrary code after response is obtained.
> #### Semantics and guarantees {: .info}
>
> This function is implemented as `Task.start/1` calling [`produce/2`](c:produce/2).
> Therefore there are no guarantees about record delivery or callback execution.
## Options
#{NimbleOptions.docs(Klife.get_produce_opts() ++ Klife.get_async_opts())}
## Examples
Anonymous Function:
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> callback = fn resp ->
...> {:ok, enriched_rec} = resp
...> true = is_number(enriched_rec.offset)
...> true = is_number(enriched_rec.partition)
...> end
iex> :ok = MyClient.produce_async(rec, callback: callback)
Using MFA:
iex> defmodule CB do
...> def exec(resp, my_arg1, my_arg2) do
...> "my_arg1" = my_arg1
...> "my_arg2" = my_arg2
...> {:ok, enriched_rec} = resp
...> true = is_number(enriched_rec.offset)
...> true = is_number(enriched_rec.partition)
...> end
...> end
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> :ok = MyClient.produce_async(rec, callback: {CB, :exec, ["my_arg1", "my_arg2"]})
"""
@callback produce_async(record, opts :: Keyword.t()) :: :ok

@doc group: "Producer API"
@doc """
Produce a batch of records.
Expand Down Expand Up @@ -251,6 +295,52 @@ defmodule Klife.Client do
"""
@callback produce_batch(list_of_records, opts :: Keyword.t()) :: list({:ok | :error, record})


@doc group: "Producer API"
@doc """
Produce a batch of records asynchronously.
The same as [`produce_batch/2`](c:produce_batch/2) but returns immediately. Accepts a callback
option to execute arbitrary code after response is obtained.
> #### Semantics and guarantees {: .info}
>
> This function is implemented as `Task.start/1` calling [`produce_batch/2`](c:produce_batch/2).
> Therefore there are no guarantees about record delivery or callback execution.
## Options
#{NimbleOptions.docs(Klife.get_produce_opts() ++ Klife.get_async_opts())}
## Examples
Anonymous Function:
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: fn resp ->
...> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end)
Using MFA:
iex> defmodule CB2 do
...> def exec(resp, my_arg1, my_arg2) do
...> "arg1" = my_arg1
...> "arg2" = my_arg2
...> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end
...> end
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: {CB2, :exec, ["arg1", "arg2"]})
"""
# Takes a list of records, mirroring the input of `produce_batch/2`.
@callback produce_batch_async(list_of_records, opts :: Keyword.t()) :: :ok

@doc group: "Transaction API"
@doc """
Runs the given function inside a transaction.
Expand Down Expand Up @@ -373,6 +463,7 @@ defmodule Klife.Client do
[name: default_txn_pool_name],
Klife.TxnProducerPool.get_opts()
)

Enum.uniq_by(l ++ [default_txn_pool], fn p -> p[:name] end)
end)
|> Keyword.update(:topics, [], fn l ->
Expand Down Expand Up @@ -412,6 +503,8 @@ defmodule Klife.Client do

# Thin wrappers around the `Klife` API: each one forwards its arguments
# unchanged and passes `__MODULE__` as the client.
def produce(%Record{} = rec, opts \\ []), do: Klife.produce(rec, __MODULE__, opts)
def produce_batch(recs, opts \\ []), do: Klife.produce_batch(recs, __MODULE__, opts)
def produce_async(%Record{} = rec, opts \\ []), do: Klife.produce_async(rec, __MODULE__, opts)
def produce_batch_async(recs, opts \\ []), do: Klife.produce_batch_async(recs, __MODULE__, opts)
def produce_batch_txn(recs, opts \\ []), do: Klife.produce_batch_txn(recs, __MODULE__, opts)
def transaction(fun, opts \\ []), do: Klife.transaction(fun, __MODULE__, opts)
end
Expand Down
29 changes: 13 additions & 16 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ defmodule Klife.Producer do
@doc false
def produce([%Record{} | _] = records, client_name, opts) do
opt_producer = Keyword.get(opts, :producer)
callback_pid = if Keyword.get(opts, :async, false), do: nil, else: self()
callback_pid = self()

delivery_timeout_ms =
records
Expand Down Expand Up @@ -307,23 +307,19 @@ defmodule Klife.Producer do
if acc < delivery_timeout_ms, do: delivery_timeout_ms, else: acc
end)

if callback_pid do
max_resps = List.last(records).__batch_index
responses = wait_produce_response(delivery_timeout_ms, max_resps)
max_resps = List.last(records).__batch_index
responses = wait_produce_response(delivery_timeout_ms, max_resps)

records
|> Enum.map(fn %Record{} = rec ->
case Map.get(responses, rec.__batch_index) do
{:ok, offset} ->
{:ok, %{rec | offset: offset}}
records
|> Enum.map(fn %Record{} = rec ->
case Map.get(responses, rec.__batch_index) do
{:ok, offset} ->
{:ok, %{rec | offset: offset}}

{:error, ec} ->
{:error, %{rec | error_code: ec}}
end
end)
else
:ok
end
{:error, ec} ->
{:error, %{rec | error_code: ec}}
end
end)
end

defp wait_produce_response(timeout_ms, max_resps) do
Expand Down Expand Up @@ -410,6 +406,7 @@ defmodule Klife.Producer do
# main metadata ets table, therefore we need a way to
# find out it's value.
put_batcher_id(client_name, producer_name, t_name, p_idx, b_id)

if ProducerController.get_default_producer(client_name, t_name, p_idx) == producer_name do
ProducerController.update_batcher_id(client_name, t_name, p_idx, b_id)
end
Expand Down
47 changes: 38 additions & 9 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ defmodule Klife.ProducerTest do
{:ok, _} = MyClient.produce(rec, client: client)
end

defp now_unix(), do: DateTime.utc_now() |> DateTime.to_unix()

setup_all do
:ok = TestUtils.wait_producer(MyClient)
%{}
Expand Down Expand Up @@ -572,7 +570,7 @@ defmodule Klife.ProducerTest do
assert length(record_batch) == 1
end

test "produce message async no batching" do
test "produce message async no batching - anon fun" do
rec = %Record{
value: :rand.bytes(10),
key: :rand.bytes(10),
Expand All @@ -581,15 +579,46 @@ defmodule Klife.ProducerTest do
partition: 1
}

base_ts = now_unix()
assert :ok = MyClient.produce(rec, async: true)
parent = self()

assert :ok = MyClient.produce_async(rec, callback: fn resp -> send(parent, {:ping, resp}) end)

assert_receive {:ping, resp}

Process.sleep(10)
assert {:ok, new_rec} = resp

assert :ok = assert_offset(MyClient, rec, new_rec.offset)
record_batch = TestUtils.get_record_batch_by_offset(MyClient, rec.topic, 1, new_rec.offset)
assert length(record_batch) == 1
end

offset = TestUtils.get_latest_offset(MyClient, rec.topic, rec.partition, base_ts)
test "produce message async no batching - mfa" do
defmodule CB do
def exec(resp, arg1, arg2) do
send(arg1, {:ping, resp, arg2})
end
end

rec = %Record{
value: :rand.bytes(10),
key: :rand.bytes(10),
headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}],
topic: "test_async_topic",
partition: 1
}

parent = self()

assert :ok = MyClient.produce_async(rec, callback: {CB, :exec, [parent, "arg2"]})

assert_receive {:ping, resp, "arg2"}

assert {:ok, new_rec} = resp
assert :ok = assert_offset(MyClient, rec, new_rec.offset)

record_batch =
TestUtils.get_record_batch_by_offset(MyClient, rec.topic, 1, new_rec.offset)

assert :ok = assert_offset(MyClient, rec, offset)
record_batch = TestUtils.get_record_batch_by_offset(MyClient, rec.topic, 1, offset)
assert length(record_batch) == 1
end

Expand Down

0 comments on commit a4b4e26

Please sign in to comment.