This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Reconsider arroyo's interface #9

Open
volokluev opened this issue Apr 22, 2022 · 6 comments

@volokluev
Member

volokluev commented Apr 22, 2022

Preface

I do not use arroyo or work with kafka as part of my job, except when something goes wrong with our consumers. However, I would like to better understand how our kafka infrastructure works and make it user friendly enough that someone with no knowledge of kafka can write a high performance consumer. This issue questions arroyo's design, but also my own understanding of it.

Problem

As arroyo is being re-implemented in another language, it is a worthy exercise to look at the way it is built and see if things can be done to improve code readability and user experience. Taking messages from kafka and putting them somewhere else is a key part of sentry and we want teams who know very little about kafka to write consumers for it. How can we get there?

Goals

  1. Decrease learning curve for users of arroyo
  2. Make arroyo harder to misuse
  3. Make external contributions to arroyo easier

Proposals

Clear boundaries on what arroyo is for

Building a kafka consumer has many complexities. We should be explicit about which of them arroyo handles and which of them the user of arroyo is supposed to worry about. We should be able to do this without referencing any of the traits/class names that we have introduced as part of arroyo. As a plus, this exercise builds the case for using arroyo in the first place.

This may be just a documentation issue. However, being clear about this can drive a better UX for contributors and users. This is the most important thing to be clear about; everything else is not nearly as important.

Composition over inheritance

arroyo-python makes heavy use of inheritance, and arroyo-rust makes heavy use of traits. Too much ability to customize can make the library less approachable and easier to misuse. Example:

pub trait Consumer<'a, TPayload: Clone> 

What is the value gained from making a consumer a trait? Arroyo is a kafka consumer library. This goes back into being clear what arroyo is for. Why not just make an arroyo consumer a wrapper object around the rdkafka one?

pub trait ProcessingStrategy<TPayload: Clone>: Send + Sync

Why would we process anything except kafka messages?

pub trait ProcessingStrategyFactory<TPayload: Clone>: Send + Sync

The use of the factory (AFAIK) is to reset the state of the strategy on partition revocation. Maybe a reset function on the strategy would be enough, without introducing another layer of indirection?
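
To make the question concrete, here is a minimal sketch of what a reset function on the strategy could look like. The trait `ResettableStrategy`, the struct `CountingStrategy`, and the helper `count_after_rebalance` are hypothetical names made up for illustration; none of them exist in rust_arroyo.

```rust
/// Hypothetical, simplified trait: not the rust_arroyo API.
trait ResettableStrategy<TPayload> {
    /// Handle one message.
    fn submit(&mut self, payload: TPayload);
    /// Drop all per-assignment state, as a freshly built instance would have.
    fn reset(&mut self);
}

/// Counts the messages seen since the last partition assignment.
#[derive(Default)]
struct CountingStrategy {
    seen: usize,
}

impl ResettableStrategy<String> for CountingStrategy {
    fn submit(&mut self, _payload: String) {
        self.seen += 1;
    }

    fn reset(&mut self) {
        // Every stateful field has to be cleared here by hand.
        self.seen = 0;
    }
}

/// Simulates two messages, a revocation, then one more message.
fn count_after_rebalance() -> usize {
    let mut strategy = CountingStrategy::default();
    strategy.submit("a".to_string());
    strategy.submit("b".to_string());
    strategy.reset(); // partitions revoked
    strategy.submit("c".to_string());
    strategy.seen
}
```

One trade-off this makes visible: with `reset`, the strategy author must remember to clear every field, whereas a factory hands back a new instance and cannot leak state from the previous assignment.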

Do the right thing by default

Seeing this comment in the ProcessingStrategy trait:

    /// Close this instance. No more messages should be accepted by the
    /// instance after this method has been called.
    ///
    /// This method should not block. Once this strategy instance has
    /// finished processing (or discarded) all messages that were submitted
    /// prior to this method being called, the strategy should commit its
    /// partition offsets and release any resources that will no longer be
    /// used (threads, processes, sockets, files, etc.)
    fn close(&mut self);

The trait requires asynchronous behavior. Maybe it would be better to make this an async function? This way the code could simply await when it does a blocking operation, making the consumer code clearer.
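
As a rough illustration of the async idea, the sketch below gives `close` a future as its return value so the caller can await completion. Everything here is an assumption for illustration: `AsyncStrategy`, `BufferingStrategy`, `close_and_report`, and the tiny busy-polling `block_on` are hypothetical and only exist to keep the example free of external crates. A real consumer would use a proper async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical async variant of the strategy trait (not the rust_arroyo API).
/// Returning a boxed future keeps the trait usable on older stable toolchains.
trait AsyncStrategy {
    fn close<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
}

struct BufferingStrategy {
    pending: Vec<u64>,
    committed: Vec<u64>,
}

impl AsyncStrategy for BufferingStrategy {
    fn close<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        Box::pin(async move {
            // A real implementation would await in-flight work here.
            let drained: Vec<u64> = self.pending.drain(..).collect();
            self.committed.extend(drained);
        })
    }
}

/// Waker that does nothing; sufficient for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, present only to keep the sketch dependency-free.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

/// Closes a strategy holding three pending offsets and reports the result.
fn close_and_report() -> (usize, Vec<u64>) {
    let mut strategy = BufferingStrategy {
        pending: vec![1, 2, 3],
        committed: Vec::new(),
    };
    block_on(strategy.close());
    (strategy.pending.len(), strategy.committed)
}
```

With this shape the caller decides how long to wait for `close` to finish, instead of the strategy having to promise that it never blocks.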

@fpacifici
Collaborator

fpacifici commented Apr 22, 2022

Some context that should clarify a few design constraints. This is not meant to push back against a redesign

What is the value gained from making a consumer a trait? Arroyo is a kafka consumer library. This goes back into being clear what arroyo is for. Why not just make an arroyo consumer a wrapper object around the rdkafka one?

This is because the Consumer is not a simple wrapper over a Kafka consumer. There are a few implementations. I will reference some from here and some from python as they are not all fully reimplemented yet.

  • https://github.com/getsentry/rust_arroyo/blob/main/src/backends/local/mod.rs#L96. Local in-memory fake consumer. This is meant for unit tests. Writing unit tests against Kafka is very impractical, both in terms of the resources you need to run the tests and in terms of getting your consumer into the specific state you need to test. On Kafka it is really hard to trigger a rebalance in a unit test. In order to consume a message you also need to instantiate a producer, ensure that you produce to the topic, wait for the callbacks, and wait for partition assignment. In practice this discourages people from properly unit testing their code. Simplifying unit tests of consumers is one of the reasons for having this library.
  • Higher level consumers: https://github.com/getsentry/arroyo/blob/main/arroyo/synchronized.py#L80 These preprocess the messages. The consumer consumes from an inner consumer and gives you messages that represent temporal intervals. Why is it not just another ProcessingStrategy? Because it is a lower level abstraction that needs to hook into the callback and rebalance process, which is intentionally hidden from the ProcessingStrategy (the part the application developer writes). Hiding the rebalance from the ProcessingStrategy is basically the main reason this library exists, as it is the trickiest part you need to address when writing a consumer.

Can this all be done via composition, by swapping the backend (in memory vs kafka) inside the consumer class? Yes, but that backend would have to implement a common trait, so we would just move the trait somewhere else. Fundamentally there is more than one consumer implementation (functional requirement) and they are supposed to be indistinguishable to the logic that uses the consumer.
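
A minimal sketch of the composition variant, to show where the trait ends up. All names (`Backend`, `InMemoryBackend`, `Consumer`, `drain_local`) are hypothetical and heavily simplified; the real consumer interface also covers subscription, commits, and rebalance callbacks.

```rust
use std::collections::VecDeque;

/// Hypothetical backend interface (not the rust_arroyo API). With
/// composition, this is where the common trait moves to.
trait Backend {
    type Payload;
    /// Return the next message, or None if nothing is available.
    fn poll(&mut self) -> Option<Self::Payload>;
}

/// In-memory stand-in for unit tests: no broker, no producer, no rebalance.
struct InMemoryBackend<T> {
    queue: VecDeque<T>,
}

impl<T> Backend for InMemoryBackend<T> {
    type Payload = T;

    fn poll(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}

/// A concrete consumer that wraps whichever backend it is given. A
/// Kafka-backed type implementing `Backend` could be swapped in here.
struct Consumer<B: Backend> {
    backend: B,
}

impl<B: Backend> Consumer<B> {
    fn poll(&mut self) -> Option<B::Payload> {
        self.backend.poll()
    }
}

/// Drains a consumer built on the in-memory backend.
fn drain_local() -> Vec<String> {
    let backend = InMemoryBackend {
        queue: VecDeque::from(vec!["event".to_string(), "tx".to_string()]),
    };
    let mut consumer = Consumer { backend };
    let mut seen = Vec::new();
    while let Some(message) = consumer.poll() {
        seen.push(message);
    }
    seen
}
```

The `Consumer` struct is no longer a trait, but `Backend` is, which is the point made above: the abstraction does not disappear, it only changes who has to look at it.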

Why would we process anything except kafka messages?

In high level consumers like the synchronized consumer, the message is not a Kafka message but an interval. If we dropped high level consumers altogether we could avoid this, but there has to be a replacement for the functionality that we use. And the replacement should not make rebalancing details visible to the ProcessingStrategy.

@volokluev
Member Author

Thank you, all that context is helpful.

In terms of what arroyo is for, I'm getting the following:

  • Making it easy to write consumers that handle rebalancing
  • Making it easy to write consumers that are testable

Questions:

  • What else is arroyo for?
    • What other things does it take care of and what does it pass on to the user?
    • I know there's stuff in arroyo about multiprocessing. Where does that come in?
  • Is the SynchronizedConsumer a good example to use? With the new subscription architecture, we want to deprecate it.
    • Are there any other high level consumers that we want to carry over, or is it a good idea to drop these high level consumers?
  • If the goal is to make consumers more unit-testable, is the ArroyoConsumer the right level of abstraction for the in-memory consumer? I would have thought the thing to replace would be the rdkafka consumer that the Arroyo consumer relies on. There's even a generic consumer trait.
    • Yes, we would be pushing the traits somewhere else, but I would argue we're pushing them out of the realm of user concern, and that is a good thing.
    • It would also lead to better tests, because the MockRDKafkaConsumer would be mocking closer to the system the library does not control. Currently the mock is mocking library functionality, which makes tests less reliable.
  • What does it mean to handle rebalancing?
    • What is the arroyo user experience with respect to rebalancing?
    • Is the abstraction meant to make it look like rebalancing is not a thing or is it a process that the user is supposed to plug into?

Food for thought

rdkafka has a StreamConsumer which is a high level consumer implementation. Can we build on top of the StreamConsumer to make our lives easier?

@fpacifici
Collaborator

Is the SynchronizedConsumer a good example to use? With the new subscription architecture, we want to deprecate it

We want to deprecate it for subscriptions. There are other scenarios (like post process forwarder) where we do not have an alternative as of now, even though it does not use the arroyo implementation but a previous one. We either have to replace the current implementation with the arroyo synchronized consumer or redesign it entirely and there is no plan for the second at present.

@fpacifici
Collaborator

If the goal is to make a more unit-testable consumers, is the ArroyoConsumer the right level of abstraction to make the in-memory consumer? I would have thought the thing to replace would have been the rdkafka consumer that the Arroyo consumer relies on.

That is possible with the rust rdkafka. It is not possible in the original python arroyo implementation as there is no abstract way from the python confluent library to simulate a consumer, at least if you want to retain type hints. And you cannot just monkey patch the kafka implementation because that is not python code but just C code exposed as a python class.

So I would be ok doing this in this implementation. Though it cannot be done in the original one.

@fpacifici
Collaborator

What is the arroyo user experience with respect to rebalancing?

The application developer does not see that happening. They only know about two events. First, the strategy is created with a new state (if it is stateful). Second, the strategy is terminated, at which point they are responsible for cleaning up the state, which can include committing offsets, flushing producers, writing to a DB, sending a smoke signal to somebody, etc.

The application does not know which partitions have been assigned or revoked. It does not know whether this is consumer termination or rebalancing, whether kafka simply took partitions away and reassigned them, whether the rebalancing is eager or cooperative, or how, when, and why to reset offsets. So the application only cares about its state: how to initialize it and how to close it in a consistent way.
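
The two-event lifecycle described above can be sketched as a small processing loop. This is a simplified illustration under assumed names (`Event`, `Strategy`, `CountingStrategy`, `run`, `rebalance_demo`), not the arroyo implementation: the loop owns all knowledge of partitions, and the strategy only ever sees creation, messages, and `close`.

```rust
/// Hypothetical events seen by the library, never by the application.
enum Event {
    Assigned(Vec<u32>),
    Revoked(Vec<u32>),
    Message(String),
}

/// Hypothetical, simplified strategy interface (not the arroyo API).
trait Strategy {
    fn submit(&mut self, payload: &str);
    /// Clean up state; returns a description of what was flushed.
    fn close(&mut self) -> String;
}

struct CountingStrategy {
    count: usize,
}

impl Strategy for CountingStrategy {
    fn submit(&mut self, _payload: &str) {
        self.count += 1;
    }

    fn close(&mut self) -> String {
        format!("flushed {} messages", self.count)
    }
}

/// Drives a strategy through assignment, messages, and revocation.
fn run<S: Strategy>(events: Vec<Event>, factory: impl Fn() -> S) -> Vec<String> {
    let mut strategy: Option<S> = None;
    let mut log = Vec::new();
    for event in events {
        match event {
            // The partition lists stay inside the library.
            Event::Assigned(_partitions) => strategy = Some(factory()),
            Event::Revoked(_partitions) => {
                if let Some(mut current) = strategy.take() {
                    log.push(current.close());
                }
            }
            Event::Message(payload) => {
                if let Some(current) = strategy.as_mut() {
                    current.submit(&payload);
                }
            }
        }
    }
    // Consumer shutdown looks the same to the strategy as a revocation.
    if let Some(mut current) = strategy.take() {
        log.push(current.close());
    }
    log
}

/// Two messages, a full revocation and reassignment, then one more message.
fn rebalance_demo() -> Vec<String> {
    run(
        vec![
            Event::Assigned(vec![0, 1]),
            Event::Message("a".into()),
            Event::Message("b".into()),
            Event::Revoked(vec![0, 1]),
            Event::Assigned(vec![0]),
            Event::Message("c".into()),
        ],
        || CountingStrategy { count: 0 },
    )
}
```

Note that the final `close` is triggered by the end of the event stream rather than by a revocation, and the strategy cannot tell the difference.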

Is the abstraction meant to make it look like rebalancing is not a thing or is it a process that the user is supposed to plug into?

The user should not plug into rebalancing. That would require knowing all the possible types of rebalancing and interpreting partition revocation and assignment separately. Furthermore, the application would have to deal with consumer termination separately. The application does not need to deal with this complexity.

@fpacifici
Collaborator

What else is arroyo for?

In one sentence: simplify the development of streaming applications by providing reusable patterns and hiding pitfalls that come when dealing with Kafka. This includes making it possible to unit test them.
Today it is centered around Kafka, but I would expect we will break this soon as there is a lot of demand to support RedPanda, so we will likely want to find a proper abstraction that works in both.

I know there's stuff in arroyo about multiprocessing where does that come in?

Messaging/streaming applications tend to follow some common patterns (a lot of theory here: https://www.enterpriseintegrationpatterns.com/patterns/messaging/), so it is quite useful to have implementations of these higher level primitives (rdkafka itself has some in the streaming consumer):

  • multi process (mostly useful in python where we cannot do multi threading and the multi process implementation is quite complex)
  • synchronized consumers (see the comment about the post process forwarder)
  • batching strategies. This is a common pattern: process messages one by one, collect them in batch, write the batch somewhere.
  • Dead letter queue.
  • as soon as we start using it in sentry we are likely going to implement others like routers based on headers (which we have in the eventstream and post process consumer implemented independently)
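
As an example of one of these reusable patterns, the batching step from the list above can be sketched as follows. `Batcher` and `batch_sizes` are hypothetical names, and this version flushes on batch size only, which is an assumption made to keep the sketch short. The `flushed` vector stands in for writing the batch somewhere.

```rust
/// Hypothetical batching step: not the arroyo implementation.
struct Batcher<T> {
    max_batch_size: usize,
    current: Vec<T>,
    /// Stands in for "write the batch somewhere".
    flushed: Vec<Vec<T>>,
}

impl<T> Batcher<T> {
    fn new(max_batch_size: usize) -> Self {
        assert!(max_batch_size > 0, "batch size must be positive");
        Batcher {
            max_batch_size,
            current: Vec::new(),
            flushed: Vec::new(),
        }
    }

    /// Accept one message and flush once the batch is full.
    fn submit(&mut self, message: T) {
        self.current.push(message);
        if self.current.len() >= self.max_batch_size {
            self.flush();
        }
    }

    /// Move the current batch to the output, if it holds anything.
    fn flush(&mut self) {
        if !self.current.is_empty() {
            let batch = std::mem::take(&mut self.current);
            self.flushed.push(batch);
        }
    }

    /// Flush the partial batch and hand back everything written so far.
    fn close(mut self) -> Vec<Vec<T>> {
        self.flush();
        self.flushed
    }
}

/// Feeds messages through a batcher and returns the size of each batch.
fn batch_sizes(messages: Vec<u32>, max_batch_size: usize) -> Vec<usize> {
    let mut batcher = Batcher::new(max_batch_size);
    for message in messages {
        batcher.submit(message);
    }
    batcher.close().iter().map(Vec::len).collect()
}
```

Flushing the partial batch in `close` is what ties this pattern back to the lifecycle discussed earlier: when the strategy is terminated, whatever has been collected so far still gets written.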
