From c5670e427c378444516429f9b4372d923c9ef1f8 Mon Sep 17 00:00:00 2001 From: Rafael Barbosa Date: Wed, 6 Nov 2024 20:19:58 -0300 Subject: [PATCH 1/4] First try to benchmark agains erlkaf --- lib/klife/testing.ex | 2 +- lib/mix/tasks/benchmark.ex | 71 ++++++++++++++++++++++++++++++++++++++ mix.exs | 1 + mix.lock | 6 ++++ 4 files changed, 79 insertions(+), 1 deletion(-) diff --git a/lib/klife/testing.ex b/lib/klife/testing.ex index 461bc6f..1a58649 100644 --- a/lib/klife/testing.ex +++ b/lib/klife/testing.ex @@ -201,7 +201,7 @@ defmodule Klife.Testing do end) end - defp get_latest_offsets(leader_id, metas, client_name) do + def get_latest_offsets(leader_id, metas, client_name) do content = %{ replica_id: -1, isolation_level: 1, diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index e977be2..2c92569 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -2,6 +2,8 @@ if Mix.env() in [:dev] do defmodule Mix.Tasks.Benchmark do use Mix.Task + alias Klife.Producer.Controller, as: PController + def run(args) do Application.ensure_all_started(:klife) @@ -78,6 +80,75 @@ if Mix.env() in [:dev] do ) end + def do_run_bench("producer_async", parallel) do + %{ + records_0: records_0, + records_1: records_1, + } = generate_data() + + # start producing with both Klife and Erlkaf in an infine loop + :erlkaf.start() + + producer_config = [bootstrap_servers: "localhost:19092"] + :ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config) + erlkaf_topic = List.first(records_1).topic + + pid_erlkaf = + Task.start(fn -> + Enum.reduce(1..1_000_000, 0, fn i, acc -> + erlkaf_msg = Enum.random(records_1) + :erlkaf.produce(:erlkaf_test_producer, erlkaf_topic, erlkaf_msg.key, erlkaf_msg.value) + end) + end) + + klife_topic = List.first(records_0).topic + + pid_klife = + Task.start(fn -> + Enum.reduce(1..1_000_000, 0, fn i, acc -> + klife_msg = Enum.random(records_0) + + MyClient.produce_async(klife_msg) + end) + end) + + Process.sleep(1000) + + starting_offset = get_offset_by_topic([klife_topic, erlkaf_topic]) |> dbg() + + IO.puts("Starting the test, 5 seconds to first measure") + + Process.sleep(5000) + + new_offset = get_offset_by_topic([klife_topic, erlkaf_topic]) |> dbg() + + new_klife_offset = Map.get(new_offset, klife_topic) - Map.get(starting_offset, klife_topic) + + new_erlkaf_offset = + Map.get(new_offset, erlkaf_topic) - Map.get(starting_offset, erlkaf_topic) + + IO.puts( + "Klife:\t#{new_klife_offset}\nErlkaf:\t#{new_erlkaf_offset}\nDifference:#{new_klife_offset / new_erlkaf_offset}x" + ) + end + + defp get_offset_by_topic(topics) do + metas = PController.get_all_topics_partitions_metadata(MyClient) + + data_by_topic = + metas + |> Enum.group_by(fn m -> m.leader_id end) + |> Enum.flat_map(fn {leader_id, metas} -> + Klife.Testing.get_latest_offsets(leader_id, metas, MyClient) + end) + |> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end) + |> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end) + |> Enum.map(fn {k, v} -> + {k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()} + end) + |> Map.new() + end + def do_run_bench("producer_sync", parallel) do %{ records_0: records_0, diff --git a/mix.exs b/mix.exs index ccd5c86..d3fdf14 100644 --- a/mix.exs +++ b/mix.exs @@ -73,6 +73,7 @@ defmodule Klife.MixProject do {:benchee, "~> 1.0", only: :dev, runtime: false}, {:kafka_ex, "~> 0.13", only: :dev}, {:brod, "~> 3.16", only: :dev}, + {:erlkaf, "~> 2.1.6", only: :dev}, {:observer_cli, "~> 1.7", only: :dev} ] end diff --git a/mix.lock b/mix.lock index 54e30d3..ac54637 100644 --- a/mix.lock +++ b/mix.lock @@ -3,9 +3,13 @@ "brod": {:hex, :brod, "3.17.0", "437daa5204a2175a3f6d01ee31152ca881539ca90acdf123d69835577f6133b1", [:rebar3], [{:kafka_protocol, "4.1.3", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.9", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "1bf5eb9d1bad1140f97b9d0c5a819ceb30414231cb7f5ad5d5e18201cfaf09f4"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"}, + "datum": {:hex, :datum, "4.6.1", "93b131203a60cfea9ffff6435a50dc24239f689dfebb76e6aecf6ce689efe8f4", [:rebar3], [], "hexpm", "e14340f8280fedb1731d5cd6e9f5aeaa14b880c51f0b3dc16c42c6671c167e4d"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "erlkaf": {:hex, :erlkaf, "2.1.6", "fb9aed863f09249dc549135391f5d173d1a1064bf222dc14a74c92fe3408cd60", [:rebar3], [{:esq, "2.0.6", [hex: :esq, repo: "hexpm", optional: false]}, {:jsone, "1.8.1", [hex: :jsone, repo: "hexpm", optional: false]}], "hexpm", "22ab3e870e78b6d16ae1ffc3fee0c155c9179fcd6e2f2a703398d6fb677ddff9"}, + "esq": {:hex, :esq, "2.0.6", "9917e1a731c609b42624a4bb8594a25d537ea30e7b55d46cd46fa1b95e6db675", [:rebar3], [{:datum, "~> 4.6.1", [hex: :datum, repo: "hexpm", optional: false]}, {:pipe, "~> 2.0.1", [hex: :pipes, repo: "hexpm", optional: false]}, {:uid, "~> 1.3.4", [hex: :uid, repo: "hexpm", optional: false]}], "hexpm", "3b798da50c508fe93248dbbd64d3d2cb618cab5387e66515ab83cadf2b1abac1"}, "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, + "jsone": {:hex, :jsone, "1.8.1", "6bc74d3863d55d420077346da97c601711017a057f2fd1df65d6d65dd562fbab", [:rebar3], [], "hexpm", "c78918124148c51a7a84c678e39bbc6281f8cb582f1d88584628a98468e99738"}, "kafka_ex": {:hex, :kafka_ex, "0.13.0", "2bfaf3c81d4ee01ed2088cb09e46c070c245f60f5752ec7043f29e807f6679ec", [:mix], [{:kayrock, "~> 0.1.12", [hex: :kayrock, repo: "hexpm", optional: false]}], "hexpm", "8a806eee5cd8191f45870b2ef4b3f4f52c57d798039f2d3fc602ce47053db7b9"}, "kafka_protocol": {:hex, :kafka_protocol, "4.1.3", "362d85a898d4148a43dbabb10a30bb2d6ff32ba0097eb06981d11b34e2e0a9cd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "28cf73001270d972524dd0fad4a59074f4441219f9cf237ad808a2ac1ec97487"}, "kayrock": {:hex, :kayrock, "0.1.15", "61ce03b65dd2236479357ca4162f43fe3a42923b39fbb6551a16d57cf2b93072", [:mix], [{:connection, "~> 1.1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~> 0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~> 1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "61d7b3579db68e61c26f316b9246e0231b878148bb1887adc59fecedcbc46c12"}, @@ -17,8 +21,10 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "observer_cli": {:hex, :observer_cli, "1.7.4", "3c1bfb6d91bf68f6a3d15f46ae20da0f7740d363ee5bc041191ce8722a6c4fae", [:mix, :rebar3], [{:recon, "~> 2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "50de6d95d814f447458bd5d72666a74624eddb0ef98bdcee61a0153aae0865ff"}, + "pipe": {:hex, :pipes, "2.0.1", "a2b56796c63690ed0e78bb77bb389af250bd70afa15a6869369dbdc11087d68f", [:rebar3], [], "hexpm", "623357a158e4c33ee589d4c735ddbab9c77a04e85159192e4d42f1dc97c60bd9"}, "recon": {:hex, :recon, "2.5.5", "c108a4c406fa301a529151a3bb53158cadc4064ec0c5f99b03ddb8c0e4281bdf", [:mix, :rebar3], [], "hexpm", "632a6f447df7ccc1a4a10bdcfce71514412b16660fe59deca0fcf0aa3c054404"}, "snappyer": {:hex, :snappyer, "1.2.9", "9cc58470798648ce34c662ca0aa6daae31367667714c9a543384430a3586e5d3", [:rebar3], [], "hexpm", "18d00ca218ae613416e6eecafe1078db86342a66f86277bd45c95f05bf1c8b29"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, + "uid": {:hex, :uid, "1.3.4", "42e30e22908e8e2faa6227e9c261f1954cb540be3c5a139e112369ae6cc451fc", [:rebar3], [], "hexpm", "f8388ef93b16a5d5f9977e1fe814ae0acf5529b1e0ee5d7b18d23cb4c0f87eaa"}, "varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"}, } From 75403e630b4e4d1c8eb5e253ed16a6e309d7a903 Mon Sep 17 00:00:00 2001 From: Rafael Barbosa Date: Wed, 13 Nov 2024 16:54:24 -0300 Subject: [PATCH 2/4] Adding brod --- lib/mix/tasks/benchmark.ex | 68 +----------- test/support/async_producer_benchmark.ex | 133 +++++++++++++++++++++++ 2 files changed, 136 insertions(+), 65 deletions(-) create mode 100644 test/support/async_producer_benchmark.ex diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index 2c92569..f298855 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -3,6 +3,7 @@ if Mix.env() in [:dev] do use Mix.Task alias Klife.Producer.Controller, as: PController + alias Klife.TestUtils.AsyncProducerBenchmark def run(args) do Application.ensure_all_started(:klife) @@ -81,72 +82,9 @@ if Mix.env() in [:dev] do end def do_run_bench("producer_async", parallel) do - %{ - records_0: records_0, - records_1: records_1, - } = generate_data() - - # start producing with both Klife and Erlkaf in an infine loop - :erlkaf.start() - - producer_config = [bootstrap_servers: "localhost:19092"] - :ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config) - erlkaf_topic = List.first(records_1).topic - - pid_erlkaf = - Task.start(fn -> - Enum.reduce(1..1_000_000, 0, fn i, acc -> - erlkaf_msg = Enum.random(records_1) - :erlkaf.produce(:erlkaf_test_producer, erlkaf_topic, erlkaf_msg.key, erlkaf_msg.value) - end) - end) - - klife_topic = List.first(records_0).topic - - pid_klife = - Task.start(fn -> - Enum.reduce(1..1_000_000, 0, fn i, acc -> - klife_msg = Enum.random(records_0) - - MyClient.produce_async(klife_msg) - end) - end) + sample_data = generate_data() - Process.sleep(1000) - - starting_offset = get_offset_by_topic([klife_topic, erlkaf_topic]) |> dbg() - - IO.puts("Starting the test, 5 seconds to first measure") - - Process.sleep(5000) - - new_offset = get_offset_by_topic([klife_topic, erlkaf_topic]) |> dbg() - - new_klife_offset = Map.get(new_offset, klife_topic) - Map.get(starting_offset, klife_topic) - - new_erlkaf_offset = - Map.get(new_offset, erlkaf_topic) - Map.get(starting_offset, erlkaf_topic) - - IO.puts( - "Klife:\t#{new_klife_offset}\nErlkaf:\t#{new_erlkaf_offset}\nDifference:#{new_klife_offset / new_erlkaf_offset}x" - ) - end - - defp get_offset_by_topic(topics) do - metas = PController.get_all_topics_partitions_metadata(MyClient) - - data_by_topic = - metas - |> Enum.group_by(fn m -> m.leader_id end) - |> Enum.flat_map(fn {leader_id, metas} -> - Klife.Testing.get_latest_offsets(leader_id, metas, MyClient) - end) - |> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end) - |> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end) - |> Enum.map(fn {k, v} -> - {k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()} - end) - |> Map.new() + AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"], sample_data) end def do_run_bench("producer_sync", parallel) do diff --git a/test/support/async_producer_benchmark.ex b/test/support/async_producer_benchmark.ex new file mode 100644 index 0000000..df858be --- /dev/null +++ b/test/support/async_producer_benchmark.ex @@ -0,0 +1,133 @@ +defmodule Klife.TestUtils.AsyncProducerBenchmark do + alias Klife.Producer.Controller, as: PController + + @number_of_records 5_000_000 + + def run(clients, sample_data) do + topics = [ + List.first(sample_data.records_0).topic, + List.first(sample_data.records_1).topic, + List.first(sample_data.records_2).topic + ] + + records = sample_data.records_0 ++ sample_data.records_1 ++ sample_data.records_2 + + Enum.map(clients, &run_benchmark(&1, topics, records)) |> dbg() + end + + defp run_benchmark("erlkaf", topics, records) do + :erlkaf.start() + + producer_config = [bootstrap_servers: "localhost:19092"] + + :ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config) + + client_pid = + Task.start(fn -> + Enum.map(1..@number_of_records, fn _i -> + erlkaf_msg = Enum.random(records) + + :erlkaf.produce( + :erlkaf_test_producer, + erlkaf_msg.topic, + erlkaf_msg.key, + erlkaf_msg.value + ) + end) + + :ok + end) + + result = measurement_collector("erlkaf", topics) + + :erlkaf.stop() + + result + end + + defp run_benchmark("klife", topics, records) do + {:ok, client_pid} = + Task.start(fn -> + Enum.map(1..@number_of_records, fn _i -> + klife_msg = Enum.random(records) + MyClient.produce_async(klife_msg) + Process.sleep(1) + end) + end) + + result = measurement_collector("klife", topics) + + Process.exit(client_pid, :kill) + + result + end + + defp run_benchmark("brod", topics, records) do + Task.async(fn -> + Enum.map(1..@number_of_records, fn _i -> + brod_msg = Enum.random(records) + + :brod.produce( + :kafka_client, + brod_msg.topic, + brod_msg.partition, + brod_msg.key, + brod_msg.value + ) + end) + end) + + result = measurement_collector("klife", topics) + + :brod.stop() + + result + end + + defp measurement_collector(client, topics) do + starting_offset = get_total_offsets(topics) + + IO.puts("Starting to measure #{client} , 2 seconds to first measure") + + Process.sleep(2000) + measurement = get_total_offsets(topics) - starting_offset + IO.puts("Measurement 1: #{measurement}") + + Process.sleep(2000) + measurement = get_total_offsets(topics) - starting_offset + IO.puts("Measurement 2: #{measurement}") + + Process.sleep(2000) + measurement = get_total_offsets(topics) - starting_offset + IO.puts("Measurement 3: #{measurement}") + + Process.sleep(2000) + measurement = get_total_offsets(topics) - starting_offset + IO.puts("Measurement 4: #{measurement}") + + Process.sleep(2000) + measurement = get_total_offsets(topics) - starting_offset + IO.puts("Measurement 5: #{measurement}") + + measurement + end + + defp get_total_offsets(topics), do: get_offset_by_topic(topics) |> Map.values() |> Enum.sum() + + defp get_offset_by_topic(topics) do + metas = PController.get_all_topics_partitions_metadata(MyClient) + + data_by_topic = + metas + |> Enum.group_by(fn m -> m.leader_id end) + |> Enum.flat_map(fn {leader_id, metas} -> + Klife.Testing.get_latest_offsets(leader_id, metas, MyClient) + end) + |> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end) + |> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end) + |> Enum.map(fn {k, v} -> + {k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()} + end) + |> Map.new() + end +end From 9c4b61d81c0df6bb302ff7b5f73bd54dff51de84 Mon Sep 17 00:00:00 2001 From: Rafael Barbosa Date: Sat, 16 Nov 2024 00:08:24 -0300 Subject: [PATCH 3/4] Add erlkaf producer config --- test/support/async_producer_benchmark.ex | 33 ++++++++++++++---------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/test/support/async_producer_benchmark.ex b/test/support/async_producer_benchmark.ex index df858be..a68efff 100644 --- a/test/support/async_producer_benchmark.ex +++ b/test/support/async_producer_benchmark.ex @@ -18,26 +18,31 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do defp run_benchmark("erlkaf", topics, records) do :erlkaf.start() - producer_config = [bootstrap_servers: "localhost:19092"] + producer_config = [ + bootstrap_servers: "localhost:19092", + max_in_flight: 1, + enable_idempotence: true, + sticky_partitioning_linger_ms: 0, + batch_size: 512_000 + ] :ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config) - client_pid = - Task.start(fn -> - Enum.map(1..@number_of_records, fn _i -> - erlkaf_msg = Enum.random(records) - - :erlkaf.produce( - :erlkaf_test_producer, - erlkaf_msg.topic, - erlkaf_msg.key, - erlkaf_msg.value - ) - end) + Task.start(fn -> + Enum.map(1..@number_of_records, fn _i -> + erlkaf_msg = Enum.random(records) - :ok + :erlkaf.produce( + :erlkaf_test_producer, + erlkaf_msg.topic, + erlkaf_msg.key, + erlkaf_msg.value + ) end) + :ok + end) + result = measurement_collector("erlkaf", topics) :erlkaf.stop() From cf2361f979f50a064899a56d8e38ef97d9d4690f Mon Sep 17 00:00:00 2001 From: Rafael Barbosa Date: Sat, 16 Nov 2024 23:40:57 -0300 Subject: [PATCH 4/4] Adding producer config and cleanup --- config/config.exs | 19 ++++- lib/mix/tasks/benchmark.ex | 4 +- test/support/async_producer_benchmark.ex | 98 +++++++++++++++++------- 3 files changed, 88 insertions(+), 33 deletions(-) diff --git a/config/config.exs b/config/config.exs index 8450518..ef5e6af 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,7 +30,12 @@ config :klife, MyClient, producers: [ [ name: :benchmark_producer, - client_id: "my_custom_client_id" + client_id: "my_custom_client_id", + ], + [ + name: :async_benchmark_producer, + client_id: "my_custom_client_id", + batchers_count: 4 ], [ name: :benchmark_producer_in_flight, @@ -68,6 +73,18 @@ config :klife, MyClient, name: "benchmark_topic_2", default_producer: :benchmark_producer ], + [ + name: "async_benchmark_topic_0", + default_producer: :async_benchmark_producer + ], + [ + name: "async_benchmark_topic_1", + default_producer: :async_benchmark_producer + ], + [ + name: "async_benchmark_topic_2", + default_producer: :async_benchmark_producer + ], [ name: "benchmark_topic_in_flight", default_producer: :benchmark_producer_in_flight diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index f298855..fb3876b 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -82,9 +82,7 @@ if Mix.env() in [:dev] do end def do_run_bench("producer_async", parallel) do - sample_data = generate_data() - - AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"], sample_data) + AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"]) end def do_run_bench("producer_sync", parallel) do diff --git a/test/support/async_producer_benchmark.ex b/test/support/async_producer_benchmark.ex index a68efff..4499304 100644 --- a/test/support/async_producer_benchmark.ex +++ b/test/support/async_producer_benchmark.ex @@ -1,9 +1,13 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do + require Logger + alias Klife.Producer.Controller, as: PController @number_of_records 5_000_000 - def run(clients, sample_data) do + def run(clients) do + sample_data = generate_data() + topics = [ List.first(sample_data.records_0).topic, List.first(sample_data.records_1).topic, @@ -12,7 +16,15 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do records = sample_data.records_0 ++ sample_data.records_1 ++ sample_data.records_2 - Enum.map(clients, &run_benchmark(&1, topics, records)) |> dbg() + client_results = Enum.map(clients, &run_benchmark(&1, topics, records)) + + results = Enum.zip(clients, client_results) |> Map.new() + IO.puts("Client | Result | Compared to klife") + Enum.each(results, fn {client, result} -> + IO.puts( + "#{client}\t| #{result} | x#{results_compared_to_klife(result, results)}" + ) + end) end defp run_benchmark("erlkaf", topics, records) do @@ -43,7 +55,7 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do :ok end) - result = measurement_collector("erlkaf", topics) + result = measurement_collector(topics) :erlkaf.stop() @@ -56,11 +68,10 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do Enum.map(1..@number_of_records, fn _i -> klife_msg = Enum.random(records) MyClient.produce_async(klife_msg) - Process.sleep(1) end) end) - result = measurement_collector("klife", topics) + result = measurement_collector(topics) Process.exit(client_pid, :kill) @@ -82,39 +93,19 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do end) end) - result = measurement_collector("klife", topics) + result = measurement_collector(topics) :brod.stop() result end - defp measurement_collector(client, topics) do + defp measurement_collector(topics) do starting_offset = get_total_offsets(topics) - IO.puts("Starting to measure #{client} , 2 seconds to first measure") - - Process.sleep(2000) - measurement = get_total_offsets(topics) - starting_offset - IO.puts("Measurement 1: #{measurement}") + Process.sleep(10000) - Process.sleep(2000) - measurement = get_total_offsets(topics) - starting_offset - IO.puts("Measurement 2: #{measurement}") - - Process.sleep(2000) - measurement = get_total_offsets(topics) - starting_offset - IO.puts("Measurement 3: #{measurement}") - - Process.sleep(2000) - measurement = get_total_offsets(topics) - starting_offset - IO.puts("Measurement 4: #{measurement}") - - Process.sleep(2000) - measurement = get_total_offsets(topics) - starting_offset - IO.puts("Measurement 5: #{measurement}") - - measurement + get_total_offsets(topics) - starting_offset end defp get_total_offsets(topics), do: get_offset_by_topic(topics) |> Map.values() |> Enum.sum() @@ -135,4 +126,53 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do end) |> Map.new() end + + defp generate_data() do + topic0 = "async_benchmark_topic_0" + topic1 = "async_benchmark_topic_1" + topic2 = "async_benchmark_topic_2" + + max_partition = 30 + + records_0 = + Enum.map(0..(max_partition - 1), fn p -> + %Klife.Record{ + value: :rand.bytes(1_000), + key: :rand.bytes(50), + topic: topic0, + partition: p + } + end) + + records_1 = + Enum.map(0..(max_partition - 1), fn p -> + %Klife.Record{ + value: :rand.bytes(1_000), + key: :rand.bytes(50), + topic: topic1, + partition: p + } + end) + + records_2 = + Enum.map(0..(max_partition - 1), fn p -> + %Klife.Record{ + value: :rand.bytes(1_000), + key: :rand.bytes(50), + topic: topic2, + partition: p + } + end) + + %{ + records_0: records_0, + records_1: records_1, + records_2: records_2, + max_partition: max_partition + } + end + + defp results_compared_to_klife(result, results) do + result / Map.get(results, "klife") |> Float.round(2) + end end