Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [SRE-517] create a consumer module #63

Merged
merged 8 commits into from
Oct 12, 2023
Merged

feat: [SRE-517] create a consumer module #63

merged 8 commits into from
Oct 12, 2023

Conversation

btkostner
Copy link
Contributor

This creates a Kafee.Consumer module that can take multiple different backends (similar to the producer.) This determines the whole backend process tree. Aside from that, the handle_message/1 and handle_failure/2 functions are generic and don't require any code changes based on the backend picked.

Kafee.Consumer.BroadwayBackend

This backend uses the Broadway library to process Kafka messages. It's made to be a freight train of maximum throughput that won't stop for anything. This is great for processing a ton of messages that require no order guarantee and have their own dead letter failure logic.

Kafee.Consumer.BrodBackend

This backend will use :brod directly. It will spin up a process for each partition to process messages, but will only process them one at a time. This means if a message fails to process, it will be retried and not committed. This sacrifices throughput for safer message handling and order guarantees.

nil

Because it's super annoying to try starting a service locally only for it to start throwing "I can't connect to Kafka" errors. By default the Kafee.Consumer module will have a nil backend and not start any tree. This makes it easier for development and testing.

Different approaches

This module goes a bit different that the producer in that I'm using NimbleOptions to parse and validate options given, and avoiding creating a Config module to hold everything. This should end up being cleaner code in the long term. If successful, I'll probably take a deeper look into the producer code and try to clean it up before 3.0 release.

I also took some inspiration from how Broadway handles start_link and __using__ to try and avoid huge macros and cleaner code. I think it's working out better.

@btkostner btkostner self-assigned this Oct 6, 2023
@@ -0,0 +1,47 @@
defmodule Kafee.Consumer.BrodBackendIntegrationTest do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For those interested in seeing how it's actually used: here are two integration tests. Once for brod and one for broadway.

@btkostner
Copy link
Contributor Author

btkostner commented Oct 9, 2023

Yet to do:

  • Setup encoder decoder module with consumer
  • Look into our Elixir watcher process we had to create last peak and see if we need to add that into Kafee This was due to Elsa, not the lower level brod library Kafee uses.
  • Ensure docs are solid

Things outside this PR:

  • Add request id to all outgoing Kafka messages
  • Maybe some docs on error handling or ideas
  • Maybe backport our configuration handling to the producer modules to clean code up
  • Maybe standardize on a single message struct (instead of a producer message and a consumer message)

lib/kafee/consumer.ex Outdated Show resolved Hide resolved
@btkostner btkostner marked this pull request as ready for review October 11, 2023 01:10
@btkostner btkostner requested a review from a team as a code owner October 11, 2023 01:10
Copy link
Contributor

@doomspork doomspork left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked through the code and everything looks good. Given we've discussed this quite a bit in Slack we should be good to go. Excited to try this out in one of the WMS services.

@btkostner btkostner merged commit d012734 into main Oct 12, 2023
@btkostner btkostner deleted the consumer branch October 12, 2023 00:32
josephbosire pushed a commit that referenced this pull request Oct 12, 2023
An automated release has been created for you.
---


## [3.0.0](v2.6.2...v3.0.0)
(2023-10-12)


### ⚠ BREAKING CHANGES

* `Kafee.Testing` is now reworked to `Kafee.Test`

### Features

* [SRE-515] use testing pid for better kafka produce testing
([#58](#58))
([cae7bce](cae7bce))
* [SRE-517] create a consumer module
([#63](#63))
([d012734](d012734))
* [SRE-518] setup encoder decoder modules
([#62](#62))
([b494049](b494049))
* Integrate data-streams into kafee producer
([#52](#52))
([ffdd5da](ffdd5da))


### Miscellaneous

* **deps:** Update outdated dependencies
([#59](#59))
([5439acb](5439acb))
* Update codeowners ([#61](#61))
([10f0290](10f0290))
* Update README code examples
([#64](#64))
([50bed75](50bed75))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

[SRE-515]:
https://stord.atlassian.net/browse/SRE-515?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
[SRE-517]:
https://stord.atlassian.net/browse/SRE-517?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
[SRE-518]:
https://stord.atlassian.net/browse/SRE-518?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
This was referenced Oct 12, 2023
btkostner pushed a commit that referenced this pull request Nov 13, 2023
An automated release has been created for you.
---


## [3.0.0](v2.6.2...v3.0.0)
(2023-11-05)


### ⚠ BREAKING CHANGES

* `Kafee.Producer` configuration is done differently to match how
`Kafee.Consumer` works.
* the use of "backend" has been renamed to "adapter"
* `Kafee.Testing` is now reworked to `Kafee.Test`

### Features

* [SRE-515] use testing pid for better kafka produce testing
([#58](#58))
([cae7bce](cae7bce))
* [SRE-517] create a consumer module
([#63](#63))
([d012734](d012734))
* [SRE-518] setup encoder decoder modules
([#62](#62))
([b494049](b494049))
* Integrate data-streams into kafee producer
([#52](#52))
([ffdd5da](ffdd5da))
* Update producer to match consumer style
([#70](#70))
([39fc85a](39fc85a))


### Bug Fixes

* Fix consumer directory name typo
([#71](#71))
([44de1d6](44de1d6))


### Miscellaneous

* **deps:** Update outdated dependencies
([#59](#59))
([5439acb](5439acb))
* **main:** Release 3.0.0
([#60](#60))
([3c1b238](3c1b238))
* Sync files with stordco/common-config-elixir
([#72](#72))
([2d458ba](2d458ba))
* Update backend copy to adapter to align with Elixir more
([#69](#69))
([1c2da6b](1c2da6b))
* Update codeowners ([#61](#61))
([10f0290](10f0290))
* Update codeowners ([#65](#65))
([e352bc6](e352bc6))
* Update README code examples
([#64](#64))
([50bed75](50bed75))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

[SRE-515]:
https://stord.atlassian.net/browse/SRE-515?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
[SRE-517]:
https://stord.atlassian.net/browse/SRE-517?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
[SRE-518]:
https://stord.atlassian.net/browse/SRE-518?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants