feat!: update producer to match consumer style #70

Merged · 9 commits · Nov 5, 2023
26 changes: 14 additions & 12 deletions README.md
@@ -91,17 +91,21 @@ So you want to send messages to Kafka eh? Well, first you will need to create a
```elixir
defmodule MyProducer do
  use Kafee.Producer,
-    producer_adapter: Application.compile_env(:my_app, :kafee_producer_adapter, Kafee.Producer.TestAdapter),
-    topic: "my-topic"
+    adapter: Application.compile_env(:my_app, :kafee_producer_adapter, nil),
+    encoder: Kafee.JasonEncoderDecoder,
+    topic: "my-topic",
+    partition_fun: :hash

  # This is just a regular function that takes a struct from your
  # application and converts it to a `t:Kafee.Producer.Message/0`
-  # struct and calls `produce/1`.
+  # struct and calls `produce/1`. Note that because we have the
+  # `encoder` option set above, the `order` value will be JSON
+  # encoded before sending to Kafka.
  def publish(:order_created, %Order{} = order) do
-    produce([%Kafee.Producer.Message{
+    produce(%Kafee.Producer.Message{
      key: order.tenant_id,
-      value: Jason.encode!(order)
-    }])
+      value: order
+    })
  end
end
```
@@ -117,10 +121,8 @@ defmodule MyApplication do
    children = [
      {MyProducer, [
        host: "localhost",
        port: 9092,
-        username: "username",
-        password: "password",
-        ssl: true,
-        sasl: :plain
+        sasl: {:plain, "username", "password"},
+        ssl: true
      ]}
    ]

@@ -129,7 +131,7 @@ defmodule MyApplication do
end
```

-And finally, instead of using the `Kafee.Producer.TestAdapter`, you'll want to use another adapter in production. So set that up in your `config/prod.exs` file:
+And finally, you'll want to use another adapter in production. So set that up in your `config/prod.exs` file:

```elixir
import Config
@@ -143,4 +145,4 @@ Once that is done, to publish a message simply run:
MyProducer.publish(:order_created, %Order{})
```

-All messages published _not_ in production will just be sent to the current process. This allows for easier testing with the [`Kafee.Test`](https://stord.hexdocs.pm/kafee/Kafee.Test.html) module, as well as not requiring Kafka running locally when in development. In production, the message will actually be sent to Kafka via the [`Kafee.Producer.AsyncAdaptor`](https://stord.hexdocs.pm/kafee/Kafee.Producer.AsyncAdaptor.html) module.
+All messages published _not_ in production will be a no-op. This means you do not need a Kafka instance set up to run in development. For testing, we recommend the [`Kafee.Producer.AsyncAdaptor`](https://stord.hexdocs.pm/kafee/Kafee.Producer.AsyncAdaptor.html) adapter, which allows easier testing via the [`Kafee.Test`](https://stord.hexdocs.pm/kafee/Kafee.Test.html) module.
5 changes: 5 additions & 0 deletions coveralls.json
@@ -0,0 +1,5 @@
+{
+  "skip_files": [
+    "test/support"
+  ]
+}
46 changes: 46 additions & 0 deletions lib/kafee.ex
@@ -0,0 +1,46 @@
+defmodule Kafee do
+  @moduledoc """
+  Kafee is an abstraction layer above multiple different lower level Kafka libraries, while also adding features relevant to Stord. This allows switching between `:brod` or `Broadway` for message consuming with a quick configuration change and no code changes. Features include:
+
+  - Behaviour based adapters allowing quick low level changes.
+  - Built in support for testing without mocking.
+  - Automatic encoding and decoding of message values with `Jason` or `Protobuf`.
+  - `:telemetry` metrics for producing and consuming messages.
+  - Open Telemetry traces with correct attributes.
+  - DataDog data streams support via `data-streams-ex`.
+  """
+
+  @typedoc "A Kafka message key."
+  @type key :: binary()
+
+  @typedoc "An encoded Kafka message value."
+  @type value :: binary()
+
+  @typedoc "A Kafka topic."
+  @type topic :: String.t()
+
+  @typedoc "Any valid Kafka partition."
+  @type partition :: -2_147_483_648..2_147_483_647
+
+  @typedoc "A function to assign a partition to a message."
+  @type partition_fun :: :hash | :random | (topic(), [partition()], key(), value() -> partition())
+
+  @typedoc "A valid Kafka offset for a message."
+  @type offset :: -9_223_372_036_854_775_808..9_223_372_036_854_775_807
+
+  @typedoc "A group id for consumers in a Kafka cluster."
+  @type consumer_group_id :: binary()
+
+  @typedoc "A list of Kafka message headers."
+  @type headers :: [{binary(), binary()}]
+
+  @doc """
+  Checks if a value is a valid Kafka partition.
+  """
+  defguard is_partition(number) when number in -2_147_483_648..2_147_483_647
+
+  @doc """
+  Checks if a value is a valid Kafka offset.
+  """
+  defguard is_offset(number) when number in -9_223_372_036_854_775_808..9_223_372_036_854_775_807
> **Contributor Author:** Oddly enough dialyzer was complaining about using `is_integer` for a guard, so I made this one instead. Seems to pass. I'm assuming this has to do with how large the number is and not technically being an integer low level or something. Haven't dug into why.
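As an illustration of the range-based guard, here is a hypothetical standalone module (`PartitionCheck` is not part of Kafee) mirroring the `is_partition/1` definition above. In guard position, `x in first..last` expands to an `is_integer/1` check plus the range bounds, which is likely why the range form satisfies dialyzer:

```elixir
defmodule PartitionCheck do
  # Mirrors the guard added in lib/kafee.ex. In a guard,
  # `number in -2_147_483_648..2_147_483_647` compiles down to
  # is_integer(number) and the two bound comparisons.
  defguard is_partition(number) when number in -2_147_483_648..2_147_483_647

  def valid?(value) when is_partition(value), do: true
  def valid?(_value), do: false
end

PartitionCheck.valid?(0)              # in range
PartitionCheck.valid?(2_147_483_648)  # one past the 32-bit max
PartitionCheck.valid?("three")        # not an integer
```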

+end
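The `partition_fun` type above also admits a custom four-arity function. As a hedged sketch, a function satisfying the `(topic(), [partition()], key(), value() -> partition())` shape might look like this (`key_hash_partitioner` is hypothetical, not Kafee code):

```elixir
# Hypothetical partitioner: hash the message key so the same key
# always lands on the same partition from the available list.
key_hash_partitioner = fn _topic, partitions, key, _value ->
  Enum.at(partitions, :erlang.phash2(key, length(partitions)))
end

key_hash_partitioner.("my-topic", [0, 1, 2], "tenant-123", "{}")
```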
6 changes: 4 additions & 2 deletions lib/kafee/application.ex
@@ -6,10 +6,12 @@ defmodule Kafee.Application do
  @doc false
  @spec start(Application.start_type(), term()) :: {:ok, pid} | {:error, term()}
  def start(_type, _args) do
+    :ets.new(:kafee_config, [:public, :set, :named_table, {:read_concurrency, true}])
+
> **Contributor Author:** So, currently the configuration is held in `Kafee.Producer.Config` and spun up as a process in the `Kafee.Producer` supervisor tree (because it's a supervisor). I then tried to run a simple Agent to hold the configuration off of `Kafee.Application`. This worked but still felt a little hacky. The end solution I ended up with is this simple ets table. It holds configuration for producers, has very few moving parts, and requires no intimate knowledge from developers.


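As a sketch of that approach: the table name and options come from the diff above, while the insert/lookup call sites shown here are hypothetical illustrations of how such a table is typically used:

```elixir
# The application creates one public named table at boot:
:ets.new(:kafee_config, [:public, :set, :named_table, {:read_concurrency, true}])

# A producer can then store its validated options under its module name...
:ets.insert(:kafee_config, {MyProducer, [topic: "my-topic", partition_fun: :hash]})

# ...and any process can read them back without a GenServer round trip.
[{MyProducer, options}] = :ets.lookup(:kafee_config, MyProducer)
```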
    children = [
-      {Registry, keys: :unique, name: Kafee.Producer.AsyncRegistry}
+      {Registry, keys: :unique, name: Kafee.Registry}
> **Contributor Author:** Changed this to just the `Kafee` registry because it can be used for other things (although I've tried to avoid that, and it's currently only used for the dynamic async workers).
    ]

-    Supervisor.start_link(children, strategy: :one_for_one)
+    Supervisor.start_link(children, name: Kafee.Application, strategy: :one_for_one)
  end
end
10 changes: 4 additions & 6 deletions lib/kafee/consumer.ex
@@ -29,7 +29,7 @@ defmodule Kafee.Consumer do
      Kafee has built in support for Jason and Protobuf encoding and decoding. See
      individual encoder decoder modules for more options.
      """,
-      type: {:or, [nil, :mod_arg]}
+      type: {:or, [nil, :atom, :mod_arg]}
> **Contributor Author:** This used to be `encoder: nil` or `encoder: {Kafee.JasonEncoderDecoder, []}`. Now it can simply be the module, as in `encoder: Kafee.JasonEncoderDecoder`.

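Concretely, the schema change widens the accepted configuration shapes. A sketch of both forms now permitted (the empty options list is illustrative):

```elixir
# Tuple form, still supported, for passing encoder options:
use Kafee.Consumer,
  encoder: {Kafee.JasonEncoderDecoder, []}

# New bare-module form, equivalent to the tuple form with default options:
use Kafee.Consumer,
  encoder: Kafee.JasonEncoderDecoder
```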
    ],
    host: [
      default: "localhost",
@@ -189,8 +189,6 @@ defmodule Kafee.Consumer do
  @doc "Handles an error while processing a Kafka message"
  @callback handle_failure(any(), Kafee.Consumer.Message.t()) :: :ok

-  @optional_callbacks handle_failure: 2
-
@doc false
defmacro __using__(opts \\ []) do
quote location: :keep, bind_quoted: [opts: opts, module: __CALLER__.module] do
@@ -254,12 +252,12 @@
for starting the whole process tree.
"""
@spec start_link(module(), options()) :: Supervisor.on_start()
-  def start_link(module, options) do
+  def start_link(consumer, options) do
with {:ok, options} <- NimbleOptions.validate(options, @options_schema) do
case options[:adapter] do
nil -> :ignore
-        adapter when is_atom(adapter) -> adapter.start_link(module, options)
-        {adapter, _adapter_options} -> adapter.start_link(module, options)
+        adapter when is_atom(adapter) -> adapter.start_link(consumer, options)
+        {adapter, __options} -> adapter.start_link(consumer, options)
end
end
end
@@ -27,25 +27,25 @@ defmodule Kafee.Consumer.Adapter do
@behaviour Kafee.Consumer.Adapter

@impl Kafee.Consumer.Adapter
-      def start_link(module, options) do
+      def start_link(consumer, options) do
# Start the adapter processes
end

-      def handle_message(module, module_options, raw_kafka_message) do
+      def handle_message(consumer, options, raw_kafka_message) do
kafee_message = %Kafee.Consumer.Message{
# Set these fields from the raw kafka message
}

# This will wrap a bunch of the Open Telemetry and
# DataDog trace logic and push the final message
# to the Kafee consumer module.
-        Kafee.Consumer.Adapter.push_message(module, module_options, kafee_message)
+        Kafee.Consumer.Adapter.push_message(consumer, options, kafee_message)
end
end

"""
@spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok
-  def push_message(module, options, %Message{} = message) do
+  def push_message(consumer, options, %Message{} = message) do
Message.set_logger_request_id(message)

span_name = Message.get_otel_span_name(message)
@@ -64,21 +64,22 @@
new_message_value =
case options[:decoder] do
nil -> message.value
+          decoder when is_atom(decoder) -> decoder.decode!(message.value, [])
{decoder, decoder_options} -> decoder.decode!(message.value, decoder_options)
end

message = Map.put(message, :value, new_message_value)

-      :telemetry.span([:kafee, :consume], %{module: module}, fn ->
-        result = module.handle_message(message)
+      :telemetry.span([:kafee, :consume], %{module: consumer}, fn ->
+        result = consumer.handle_message(message)
{result, %{}}
end)
end

:ok
rescue
error ->
-      module.handle_failure(error, message)
+      consumer.handle_failure(error, message)
:ok
end
end
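The decoder contract exercised above is just a module exposing `decode!/2`, called as `decoder.decode!(message.value, decoder_options)`. A hypothetical decoder satisfying that call shape (not a real Kafee module, purely for illustration):

```elixir
defmodule UpcaseDecoder do
  # Hypothetical decoder: receives the raw binary message value and
  # the decoder options configured on the consumer, and returns the
  # decoded value (here just upcased, to keep the example minimal).
  def decode!(value, _options) when is_binary(value) do
    String.upcase(value)
  end
end

UpcaseDecoder.decode!("order-created", [])
```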
@@ -53,7 +53,7 @@ defmodule Kafee.Consumer.BroadwayAdapter do
@doc false
@impl Kafee.Consumer.Adapter
@spec start_link(module(), Kafee.Consumer.options()) :: Supervisor.on_start()
-  def start_link(module, options) do
+  def start_link(consumer, options) do
adapter_options =
case options[:adapter] do
nil -> []
@@ -63,10 +63,10 @@

with {:ok, adapter_options} <- NimbleOptions.validate(adapter_options, @options_schema) do
Broadway.start_link(__MODULE__,
-        name: module,
+        name: consumer,
        context: %{
+          consumer: consumer,
          consumer_group: options[:consumer_group_id],
-          module: module,
          options: options
        },
producer: [
@@ -101,10 +101,10 @@
@doc false
@impl Broadway
def handle_message(:default, %Broadway.Message{data: value, metadata: metadata} = message, %{
-        module: module,
+        consumer: consumer,
options: options
}) do
-    Kafee.Consumer.Adapter.push_message(module, options, %Kafee.Consumer.Message{
+    Kafee.Consumer.Adapter.push_message(consumer, options, %Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
@@ -120,13 +120,13 @@

@doc false
@impl Broadway
-  def handle_failed(message, %{module: module}) do
+  def handle_failed(message, %{consumer: consumer}) do
# This error only occurs when there is an issue with the `handle_message/2`
# function above because `Kafee.Consumer.Adapter.push_message/2` will catch any
# errors.

error = %RuntimeError{message: "Error converting a Broadway message to Kafee.Consumer.Message"}
-    module.handle_failure(error, message)
+    consumer.handle_failure(error, message)

message
end
@@ -52,16 +52,16 @@ defmodule Kafee.Consumer.BrodAdapter do
@doc false
@impl Kafee.Consumer.Adapter
@spec start_link(module(), Kafee.Consumer.options()) :: Supervisor.on_start()
-  def start_link(module, options) do
-    Supervisor.start_link(__MODULE__, {module, options})
+  def start_link(consumer, options) do
+    Supervisor.start_link(__MODULE__, {consumer, options})
end

@doc false
@spec child_spec({module(), Kafee.Consumer.options()}) :: :supervisor.child_spec()
-  def child_spec({module, options}) do
+  def child_spec({consumer, options}) do
default = %{
id: __MODULE__,
-      start: {__MODULE__, :start_link, [{module, options}]},
+      start: {__MODULE__, :start_link, [{consumer, options}]},
type: :supervisor
}

@@ -70,7 +70,7 @@

@doc false
@impl Supervisor
-  def init({module, options}) do
+  def init({consumer, options}) do
adapter_options =
case options[:adapter] do
nil -> []
@@ -80,7 +80,7 @@

with {:ok, adapter_options} <- NimbleOptions.validate(adapter_options, @options_schema) do
# credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
-      brod_client = Module.concat(module, "BrodClient")
+      brod_client = Module.concat(consumer, "BrodClient")

children = [
%{
@@ -97,7 +97,7 @@
shutdown: 500
},
%{
-          id: module,
+          id: consumer,
start:
{:brod_group_subscriber_v2, :start_link,
[
@@ -108,7 +108,7 @@
cb_module: Kafee.Consumer.BrodWorker,
message_type: :message,
init_data: %{
-              module: module,
+              consumer: consumer,
options: options
}
}
@@ -20,26 +20,27 @@ defmodule Kafee.Consumer.BrodWorker do
state =
info
|> Map.merge(config)
-      |> Map.take([:group_id, :module, :options, :partition, :topic])
+      |> Map.take([:consumer, :group_id, :options, :partition, :topic])

{:ok, state}
end

@doc false
@impl :brod_group_subscriber_v2
@spec handle_message(:brod.message(), map()) :: {:ok, :ack, map()}
def handle_message(
message,
%{
+          consumer: consumer,
          group_id: group_id,
-          module: module,
options: options,
partition: partition,
topic: topic
} = state
) do
message = kafka_message(message)

-    Kafee.Consumer.Adapter.push_message(module, options, %Kafee.Consumer.Message{
+    Kafee.Consumer.Adapter.push_message(consumer, options, %Kafee.Consumer.Message{
key: message[:key],
value: message[:value],
topic: topic,
Expand Down