-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: update producer to match consumer style #70
Conversation
@doc """ | ||
Checks if a value is a valid Kafka offset. | ||
""" | ||
defguard is_offset(number) when number in -9_223_372_036_854_775_808..9_223_372_036_854_775_807 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oddly enough dialyzer was complaining about using is_integer
for a guard, so I made this one instead. Seems to pass. I'm assuming this has to do with how large the number is and not technically being an integer low level or something. Haven't dug into why.
@@ -6,10 +6,12 @@ defmodule Kafee.Application do | |||
@doc false | |||
@spec start(Application.start_type(), term()) :: {:ok, pid} | {:error, term()} | |||
def start(_type, _args) do | |||
:ets.new(:kafee_config, [:public, :set, :named_table, {:read_concurrency, true}]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, currently the configuration is held in Kafee.Producer.Config
and spun up as a process in the Kafee.Producer
supervisor tree (because it's a supervisor). I then tried to run a simple Agent
to hold the configuration off of Kafee.Application
. This worked but still felt a little hacky. The end solution I ended up with is this simple ets table. It holds configuration for producers, has very little moving parts, and requires no developer intimate knowledge.
children = [ | ||
{Registry, keys: :unique, name: Kafee.Producer.AsyncRegistry} | ||
{Registry, keys: :unique, name: Kafee.Registry} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this to just the Kafee registry because it can be used for other things (although I've tried to avoid it and it's only used for the dynamic async workers.)
@@ -29,7 +29,7 @@ defmodule Kafee.Consumer do | |||
Kafee has built in support for Jason and Protobuf encoding and decoding. See | |||
individual encoder decoder modules for more options. | |||
""", | |||
type: {:or, [nil, :mod_arg]} | |||
type: {:or, [nil, :atom, :mod_arg]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used to be encoder: nil
or encoder: {Kafee.JasonEncoderDecoder, []}
. Now it can simply be the module with encoder: Kafee.JasonEncoderDecoder
.
lib/kafee/conumser/message.ex
Outdated
@type t :: %__MODULE__{ | ||
key: binary(), | ||
value: any(), | ||
topic: binary(), | ||
partition: -2_147_483_648..2_147_483_647, | ||
offset: integer(), | ||
consumer_group: binary(), | ||
key: Kafee.key(), | ||
value: Kafee.value() | any(), | ||
topic: Kafee.topic(), | ||
partition: Kafee.partition(), | ||
offset: Kafee.offset(), | ||
consumer_group: Kafee.consumer_group_id(), | ||
timestamp: DateTime.t(), | ||
headers: [{binary(), binary()}] | ||
headers: Kafee.headers() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to avoid using low level :brod
types so we are not always stuck on using :brod
as a dependency. I also brought these to the root Kafee
module for documentation readability.
@@ -1,15 +0,0 @@ | |||
defmodule Kafee.NilEncoderDecoder do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been replaced with just using nil
as the encoder or decoder option.
def init(init_opts \\ []) do | ||
# credo:disable-for-lines:2 Credo.Check.Warning.UnsafeToAtom | ||
config = | ||
[brod_client_id: Module.concat(__MODULE__, BrodClient), producer: __MODULE__] | ||
|> Kafee.Producer.Config.new() | ||
|> Kafee.Producer.Config.merge(Application.get_env(:kafee, :producer, [])) | ||
|> Kafee.Producer.Config.merge(unquote(module_opts)) | ||
|> Kafee.Producer.Config.merge(init_opts) | ||
|> Kafee.Producer.Config.validate!() | ||
|
||
children = [ | ||
{Kafee.Producer.Config, config} | ||
] | ||
|
||
child_spec = config.producer_adapter.child_spec([config]) | ||
|
||
children = | ||
if is_nil(child_spec), | ||
do: children, | ||
else: Enum.reverse([child_spec | children]) | ||
|
||
Supervisor.init(children, strategy: :one_for_one) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole supervisor tree and configuration situation has been simplified a ton. Configuration is now handled by NimbleOptions
and the producer relies on the underlying adapter for starting what ever process it wants (or no process at all like the testing adapter.)
Sorry this PR line count got out of hand. Lots of clean up renaming and making everything perfect. I'm now happy with the state of Kafee and ready for a 3.0.0 release 🎊 I'll draft up a release announcement today or Monday for the engineering channel. |
An automated release has been created for you. --- ## [3.0.0](v2.6.2...v3.0.0) (2023-11-05) ### ⚠ BREAKING CHANGES * `Kafee.Producer` configuration is done differently to match how `Kafee.Consumer` works. * the use of "backend" has been renamed to "adapter" * `Kafee.Testing` is now reworked to `Kafee.Test` ### Features * [SRE-515] use testing pid for better kafka produce testing ([#58](#58)) ([cae7bce](cae7bce)) * [SRE-517] create a consumer module ([#63](#63)) ([d012734](d012734)) * [SRE-518] setup encoder decoder modules ([#62](#62)) ([b494049](b494049)) * Integrate data-streams into kafee producer ([#52](#52)) ([ffdd5da](ffdd5da)) * Update producer to match consumer style ([#70](#70)) ([39fc85a](39fc85a)) ### Bug Fixes * Fix consumer directory name typo ([#71](#71)) ([44de1d6](44de1d6)) ### Miscellaneous * **deps:** Update outdated dependencies ([#59](#59)) ([5439acb](5439acb)) * **main:** Release 3.0.0 ([#60](#60)) ([3c1b238](3c1b238)) * Sync files with stordco/common-config-elixir ([#72](#72)) ([2d458ba](2d458ba)) * Update backend copy to adapter to align with Elixir more ([#69](#69)) ([1c2da6b](1c2da6b)) * Update codeowners ([#61](#61)) ([10f0290](10f0290)) * Update codeowners ([#65](#65)) ([e352bc6](e352bc6)) * Update README code examples ([#64](#64)) ([50bed75](50bed75)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). [SRE-515]: https://stord.atlassian.net/browse/SRE-515?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ [SRE-517]: https://stord.atlassian.net/browse/SRE-517?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ [SRE-518]: https://stord.atlassian.net/browse/SRE-518?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
This redoes the
Kafee.Producer
modules to match more in line with howKafee.Consumer
works with both process trees and configuration. There is no longer aKafee.Producer.Config
module as it's been replaced withNimbleOptions
for validation. I also moved most of the message validation and filling in out ofKafee.Producer
toKafee.Producer.Message
which cleaned up the logic and made it more functional.Things I want to do before merge:
module
variable names withproducer
orconsumer
for clarityBREAKING CHANGE:
Kafee.Producer
configuration is done differently to match howKafee.Consumer
works.