Skip to content

Commit

Permalink
feat(producer): refactor txn feature
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 6, 2024
1 parent 5768210 commit af97649
Show file tree
Hide file tree
Showing 11 changed files with 758 additions and 473 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
- SASL

- Producer System
- Transaction api
- Accept more versions of the protocol
- Improve input errors handling
- Standardize options handling
- Accept more versions of the protocol
- Improve test coverage

- Consumer System (TBD)
Expand Down
17 changes: 14 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ config :klife,
[
cluster_name: :my_test_cluster_1,
txn_pools: [
%{name: :my_pool_1},
%{name: :my_pool_2}
%{name: :my_test_pool_1, pool_size: 1}
],
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
Expand Down Expand Up @@ -44,7 +43,19 @@ config :klife,
],
topics: [
%{
name: "benchmark_topic",
name: "benchmark_topic_0",
producer: :benchmark_producer,
num_partitions: 30,
replication_factor: 2
},
%{
name: "benchmark_topic_1",
producer: :benchmark_producer,
num_partitions: 30,
replication_factor: 2
},
%{
name: "benchmark_topic_2",
producer: :benchmark_producer,
num_partitions: 30,
replication_factor: 2
Expand Down
35 changes: 30 additions & 5 deletions lib/klife.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Klife do
alias Klife.Record
alias Klife.Producer
alias Klife.TxnProducer
alias Klife.TxnProducerPool
alias Klife.Producer.Controller, as: PController

def produce(record_or_records, opts \\ [])
Expand Down Expand Up @@ -29,18 +29,43 @@ defmodule Klife do
|> maybe_add_partition(cluster, opts)
end)

if TxnProducer.in_txn?(cluster),
do: TxnProducer.produce(records, cluster, opts),
else: Producer.produce(records, cluster, opts)
in_txn? = TxnProducerPool.in_txn?(cluster)
with_txn_opt = Keyword.get(opts, :with_txn, false)

cond do
in_txn? ->
TxnProducerPool.produce(records, cluster, opts)

with_txn_opt ->
transaction(
fn ->
resp = produce(records, opts)

# Do we really need this match?
if Enum.all?(resp, &match?({:ok, _}, &1)),
do: {:ok, resp},
else: {:error, :abort}
end,
opts
)
|> case do
{:ok, resp} -> resp
err -> err
end

true ->
Producer.produce(records, cluster, opts)
end
end

# Resolves the cluster from `opts` (see get_cluster/1) and hands `fun` to a
# `run_txn` call for that cluster.
# NOTE(review): this listing is a rendered diff without +/- markers, so both the
# `TxnProducer.run_txn/2` call and the `TxnProducerPool.run_txn/3` call (which
# also receives the pool chosen via `opts[:txn_pool]`) appear below. Presumably
# only the pool-based call belongs to the committed file — confirm.
def transaction(fun, opts \\ []) do
cluster = get_cluster(opts)
TxnProducer.run_txn(cluster, fun)
TxnProducerPool.run_txn(cluster, get_txn_pool(opts), fun)
end

# Cluster used when the caller does not pass `:cluster`; read from the
# `:klife_default_cluster` persistent term.
defp default_cluster(), do: :persistent_term.get(:klife_default_cluster)

# Resolves the target cluster from `opts`.
# The default is looked up lazily: `:persistent_term.get/1` raises when the key
# has not been stored, so an explicit `:cluster` must not trigger that lookup.
defp get_cluster(opts), do: Keyword.get_lazy(opts, :cluster, &default_cluster/0)

# Resolves the transactional pool from `opts`, defaulting to `:klife_txn_pool`.
defp get_txn_pool(opts), do: Keyword.get(opts, :txn_pool, :klife_txn_pool)

defp maybe_add_partition(%Record{} = record, cluster, opts) do
case record do
Expand Down
1 change: 0 additions & 1 deletion lib/klife/producer/batcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ defmodule Klife.Producer.Batcher do
end

defp get_attributes_byte(%Producer{} = pconfig, _opts) do
# TODO: Handle different attributes opts
[
compression: pconfig.compression_type,
is_transactional: pconfig.txn_id != nil
Expand Down
93 changes: 81 additions & 12 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,53 @@ defmodule Klife.Producer.Controller do
alias Klife.Utils
alias Klife.Producer.ProducerSupervisor

alias Klife.TxnProducerPool

alias Klife.Producer

@default_producer %{name: :default_producer}
@default_producer %{name: :klife_default_producer}

@default_txn_pool %{name: :klife_txn_pool}

@check_metadata_delay :timer.seconds(10)

# NimbleOptions schema applied by start_link/1 to every entry of `:txn_pools`.
# All `*_ms` options are durations in milliseconds.
@txn_pool_options [
  name: [type: :atom, required: true, default: :klife_txn_pool],
  # Must be at least 1: the txn producers of a pool are enumerated with
  # `1..pool_size`, and a range such as `1..0` counts down (yielding 1 and 0)
  # instead of being empty.
  pool_size: [type: :pos_integer, default: 10],
  delivery_timeout_ms: [type: :non_neg_integer, default: :timer.seconds(30)],
  request_timeout_ms: [type: :non_neg_integer, default: :timer.seconds(15)],
  retry_backoff_ms: [type: :non_neg_integer, default: :timer.seconds(1)],
  txn_timeout_ms: [type: :non_neg_integer, default: :timer.seconds(60)],
  # Prefix for each producer's transactional id; when left empty a random
  # prefix is generated per producer.
  base_txn_id: [type: :string, default: ""],
  compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none]
]

defstruct [
:cluster_name,
:producers,
:topics,
:check_metadata_waiting_pids,
:check_metadata_timer_ref,
:txn_config
:txn_pools
]

# Starts the controller GenServer registered under `{__MODULE__, cluster_name}`.
#
# `args` is a keyword list that must contain `:cluster_name`. The optional
# `:txn_pools` entry (a list of maps) is validated against @txn_pool_options
# before the process is started; invalid pool options raise via
# `NimbleOptions.validate!/2`.
# NOTE(review): this listing is a rendered diff without +/- markers, so the
# first `GenServer.start_link/3` call (with the raw `args`) is presumably the
# pre-change line replaced by the call at the end of the function — confirm.
def start_link(args) do
cluster_name = Keyword.fetch!(args, :cluster_name)
GenServer.start_link(__MODULE__, args, name: via_tuple({__MODULE__, cluster_name}))

# The default pool is always appended to the user-configured pools, then each
# pool map is validated and turned back into a map with defaults filled in.
validated_pool_args =
args
|> Keyword.get(:txn_pools, [])
|> Enum.concat([@default_txn_pool])
|> Enum.map(fn pool_config ->
pool_config
|> Map.to_list()
|> NimbleOptions.validate!(@txn_pool_options)
|> Map.new()
end)

# Replace the raw `:txn_pools` entry with the validated version.
validated_args = Keyword.put(args, :txn_pools, validated_pool_args)

GenServer.start_link(__MODULE__, validated_args, name: via_tuple({__MODULE__, cluster_name}))
end

@impl true
Expand All @@ -41,7 +70,7 @@ defmodule Klife.Producer.Controller do
state = %__MODULE__{
cluster_name: cluster_name,
producers: [@default_producer] ++ Keyword.get(args, :producers, []),
txn_config: Keyword.get(args, :txn_config, []),
txn_pools: Keyword.get(args, :txn_pools, []),
topics: Enum.filter(topics_list, &Map.get(&1, :enable_produce, true)),
check_metadata_waiting_pids: [],
check_metadata_timer_ref: timer_ref
Expand Down Expand Up @@ -106,19 +135,62 @@ defmodule Klife.Producer.Controller do
end
end

send(self(), :handle_txn_producers)

{:noreply, state}
end

def handle_info(:handle_txn_producers, %__MODULE__{} = state) do
result =
for txn_pool <- state.txn_pools,
txn_producer_count <- 1..txn_pool.pool_size do
txn_id =
if txn_pool.base_txn_id != "" do
txn_pool.base_txn_id <> "_#{txn_producer_count}"
else
:crypto.strong_rand_bytes(15)
|> Base.url_encode64()
|> binary_part(0, 15)
|> Kernel.<>("_#{txn_producer_count}")
end

txn_producer_configs = %{
cluster_name: state.cluster_name,
name: :"klife_txn_producer.#{txn_pool.name}.#{txn_producer_count}",
acks: :all,
linger_ms: 0,
delivery_timeout_ms: txn_pool.delivery_timeout_ms,
request_timeout_ms: txn_pool.request_timeout_ms,
retry_backoff_ms: txn_pool.retry_backoff_ms,
max_in_flight_requests: 1,
batchers_count: 1,
enable_idempotence: true,
compression_type: txn_pool.compression_type,
txn_id: txn_id,
txn_timeout_ms: txn_pool.txn_timeout_ms
}

DynamicSupervisor.start_child(
via_tuple({ProducerSupervisor, state.cluster_name}),
{Producer, Map.to_list(txn_producer_configs)}
)
|> case do
{:ok, _pid} -> :ok
{:error, {:already_started, pid}} -> send(pid, :handle_batchers)
end
end

for txn_pool <- state.txn_pools do
DynamicSupervisor.start_child(
via_tuple({ProducerSupervisor, state.cluster_name}),
{Klife.TxnProducer, state.txn_config ++ [cluster_name: state.cluster_name]}
{TxnProducerPool, Map.to_list(txn_pool) ++ [cluster_name: state.cluster_name]}
)
|> case do
{:ok, _pid} ->
:ok

case result do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, {:already_started, _pid}} ->
:ok
end
end

{:noreply, state}
Expand Down Expand Up @@ -272,9 +344,6 @@ defmodule Klife.Producer.Controller do

if Enum.any?(results, &(&1 == :new)) do
send(self(), :handle_producers)
# TODO: This need to be done here? Or can it
# be handled on the supervisor level?
send(self(), :handle_txn_producers)
end

:ok
Expand Down
7 changes: 4 additions & 3 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ defmodule Klife.Producer.Dispatcher do
data =
%__MODULE__.Request{
request_ref: request_ref,
producer_config: %{retry_ms: retry_ms},
producer_config: %Producer{retry_backoff_ms: retry_ms},
pool_idx: pool_idx,
batch_to_send: batch_to_send
} = Map.fetch!(state.requests, req_ref)
Expand Down Expand Up @@ -256,7 +256,8 @@ defmodule Klife.Producer.Dispatcher do
delivery_timeout_ms: delivery_timeout,
cluster_name: cluster_name,
client_id: client_id,
acks: acks
acks: acks,
txn_id: txn_id
},
batch_to_send: batch_to_send,
base_time: base_time,
Expand All @@ -267,7 +268,7 @@ defmodule Klife.Producer.Dispatcher do
headers = %{client_id: client_id}

content = %{
transactional_id: nil,
transactional_id: txn_id,
acks: if(acks == :all, do: -1, else: acks),
timeout_ms: req_timeout,
topic_data: parse_batch_before_send(batch_to_send)
Expand Down
Loading

0 comments on commit af97649

Please sign in to comment.