Add async split_partition_queue to StreamConsumer #351

Closed

nemosupremo
Contributor

@nemosupremo nemosupremo commented Mar 9, 2021

I currently need a way to consume partitions concurrently in an asynchronous context. BaseConsumer currently provides split_partition_queue for "synchronous" polling, but StreamConsumer provides no equivalent. I decided to take a stab at implementing it, as it seemed simple enough; however, this PR is still in the design stage.

What I opted to do was "overload" MessageStream so that it polls one of two sources held in an enum: either a StreamConsumer, or an Arc<BaseConsumer> plus a partition. To achieve this I also had StreamConsumer always hold on to an Arc of BaseConsumer, which I think should have a very minimal cost compared to the owned version.
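To make the shape of this first design concrete, here is a minimal sketch using only the standard library. Everything in it is a hypothetical stand-in rather than rdkafka's actual code: the in-memory `BaseConsumer`, the `Source` enum name, the `push`/`poll_*` helpers, and the use of `Iterator` in place of an async `Stream` are all assumptions made so the example compiles on its own.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for BaseConsumer: one in-memory queue per partition.
struct BaseConsumer {
    queues: Mutex<Vec<VecDeque<String>>>,
}

impl BaseConsumer {
    fn new(partitions: usize) -> Self {
        BaseConsumer { queues: Mutex::new(vec![VecDeque::new(); partitions]) }
    }

    fn push(&self, partition: usize, message: &str) {
        self.queues.lock().unwrap()[partition].push_back(message.to_string());
    }

    /// Poll a single partition's queue.
    fn poll_partition(&self, partition: usize) -> Option<String> {
        self.queues.lock().unwrap()[partition].pop_front()
    }

    /// Poll the first non-empty queue, in partition order.
    fn poll_any(&self) -> Option<String> {
        self.queues.lock().unwrap().iter_mut().find_map(|q| q.pop_front())
    }
}

/// The stream consumer always holds its base consumer behind an Arc.
struct StreamConsumer {
    base: Arc<BaseConsumer>,
}

/// The two sources a MessageStream can read from.
enum Source<'a> {
    Consumer(&'a StreamConsumer),
    Partition(Arc<BaseConsumer>, usize),
}

/// One stream type for both cases; `Iterator` stands in for `Stream` here.
struct MessageStream<'a> {
    source: Source<'a>,
}

impl StreamConsumer {
    fn stream(&self) -> MessageStream<'_> {
        MessageStream { source: Source::Consumer(self) }
    }

    /// The returned stream owns an Arc, yet its type still borrows `self`.
    fn split_partition_queue(&self, partition: usize) -> MessageStream<'_> {
        MessageStream { source: Source::Partition(Arc::clone(&self.base), partition) }
    }
}

impl<'a> Iterator for MessageStream<'a> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        match &self.source {
            Source::Consumer(consumer) => consumer.base.poll_any(),
            Source::Partition(base, partition) => base.poll_partition(*partition),
        }
    }
}
```

In this sketch the `Partition` variant never uses the `'a` borrow, but because both variants share one type, a partition stream still cannot outlive the `StreamConsumer` it came from.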

The only problem I see is that each PartitionQueue has its lifetime tied to the StreamConsumer, despite each PartitionQueue holding an Arc to the base consumer. I'm not sure yet if this is an onerous restriction; however, an alternate design would require a new MessageStream type (which I think may duplicate more code), allowing MessageStream to hold an Arc to StreamConsumer, or making the implementation of MessageStream generic over the "consumer".

@nemosupremo
Contributor Author

nemosupremo commented Mar 9, 2021

Upon further research, it seems that for this to work there would need to be a type of BorrowedMessage that has a 'static lifetime but also holds a reference to its consumer in an Arc, or we would have to give up on lifetimes and have AsyncPartitionQueue own its buffers.

@nemosupremo
Contributor Author

I've committed a second approach, which is more flexible regarding the lifetime of the Async Partition Queue. I've added a new type, ArcMessage, which like a BorrowedMessage is zero-copy, but instead of being tied to the lifetime of the consumer, it holds an Arc to the consumer. Next, MessageStream is now generic over the MessageType (which may break the API?), which allows this one type to implement Stream.
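The difference between the two message flavours can be sketched as follows. This is an illustration under assumptions, not the PR's code: the `Consumer` that owns its buffers, the `Message` trait, and the helper functions are hypothetical names, and `Iterator` again stands in for an async `Stream`.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical consumer that owns the raw message buffers.
struct Consumer {
    buffers: Vec<Vec<u8>>,
}

/// Borrowed flavour: zero-copy, but cannot outlive the consumer borrow.
struct BorrowedMessage<'a> {
    payload: &'a [u8],
}

/// Arc flavour: still zero-copy, but keeps the consumer alive by itself.
struct ArcMessage {
    consumer: Arc<Consumer>,
    index: usize,
}

/// What a stream that is generic over the message type would rely on.
trait Message {
    fn payload(&self) -> &[u8];
}

impl<'a> Message for BorrowedMessage<'a> {
    fn payload(&self) -> &[u8] {
        self.payload
    }
}

impl Message for ArcMessage {
    fn payload(&self) -> &[u8] {
        &self.consumer.buffers[self.index]
    }
}

impl Consumer {
    /// Messages whose lifetime is tied to `&self`.
    fn borrowed(&self) -> impl Iterator<Item = BorrowedMessage<'_>> {
        self.buffers.iter().map(|b| BorrowedMessage { payload: b.as_slice() })
    }
}

/// Messages that each carry their own Arc to the consumer.
fn arc_messages(consumer: &Arc<Consumer>) -> impl Iterator<Item = ArcMessage> {
    let consumer = Arc::clone(consumer);
    (0..consumer.buffers.len())
        .map(move |index| ArcMessage { consumer: Arc::clone(&consumer), index })
}

/// Generic over the message type, so both flavours share one code path.
fn payload_lengths<M: Message>(stream: impl Iterator<Item = M>) -> Vec<usize> {
    stream.map(|m| m.payload().len()).collect()
}

/// An ArcMessage is 'static, so it can be moved to another thread.
fn len_on_thread(message: ArcMessage) -> usize {
    thread::spawn(move || message.payload().len()).join().unwrap()
}
```

The point of the sketch is that `ArcMessage` remains readable after the caller drops its own handle to the consumer, which a `BorrowedMessage` cannot do.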

@nemosupremo
Contributor Author

Now the only remaining problem is that much of the Consumer API requires a BorrowedMessage. A path forward would be to change the API to accept something like AsRef<BorrowedMessage>.
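A rough sketch of what accepting `AsRef<BorrowedMessage>` could look like is below. The types are simplified placeholders (the real `BorrowedMessage` carries a lifetime and wraps a native message), and `offset_to_commit` is a hypothetical function invented for illustration, not part of the Consumer API.

```rust
use std::sync::Arc;

/// Simplified placeholder; lifetime and native pointer omitted.
struct BorrowedMessage {
    partition: i32,
    offset: i64,
}

/// Hypothetical consumer handle that an ArcMessage keeps alive.
struct Consumer;

/// Hypothetical Arc-backed message wrapping the same underlying data.
struct ArcMessage {
    message: BorrowedMessage,
    _consumer: Arc<Consumer>,
}

impl AsRef<BorrowedMessage> for BorrowedMessage {
    fn as_ref(&self) -> &BorrowedMessage {
        self
    }
}

impl AsRef<BorrowedMessage> for ArcMessage {
    fn as_ref(&self) -> &BorrowedMessage {
        &self.message
    }
}

/// Accepts either message flavour. Returns the partition and the offset
/// to commit, which by Kafka convention is the next offset to consume.
fn offset_to_commit<M: AsRef<BorrowedMessage>>(message: &M) -> (i32, i64) {
    let inner: &BorrowedMessage = message.as_ref();
    (inner.partition, inner.offset + 1)
}
```

With a bound like this, existing callers that pass a `BorrowedMessage` would keep working, while Arc-backed messages could be passed to the same functions.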

@nemosupremo nemosupremo force-pushed the stream/split_partition branch from 5907f46 to afff390 Compare March 9, 2021 23:05
@nemosupremo
Contributor Author

OK, this API now has a design that I think works best. What I've opted to do is to add a PartitionStream type that holds a reference to Arc<StreamConsumer>. Much like BaseConsumer's split_partition_queue, StreamConsumer has the same function with the same signature (requiring an Arc), and StreamConsumer holds an Arc to base now as well.

PartitionStream has a stream function that returns a MessageStream, like normal, but now the lifetime is conceptually tied to the PartitionQueue instead, making the API more flexible than my first attempt. Hopefully this is good enough to be merged now.
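The ownership arrangement described here might look roughly like the following sketch. It is an assumption-laden illustration: the in-memory queues, the `push` helper, the omission of a topic argument, and `Iterator` standing in for an async `Stream` are all simplifications and not the PR's implementation. Only the `self: &Arc<Self>` receiver mirrors the "requiring an Arc" signature mentioned above.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory stand-in for StreamConsumer.
struct StreamConsumer {
    queues: Mutex<Vec<VecDeque<String>>>,
}

impl StreamConsumer {
    fn new(partitions: usize) -> Self {
        StreamConsumer { queues: Mutex::new(vec![VecDeque::new(); partitions]) }
    }

    fn push(&self, partition: usize, message: &str) {
        self.queues.lock().unwrap()[partition].push_back(message.to_string());
    }

    /// Requires an Arc so the returned queue can keep the consumer alive.
    fn split_partition_queue(self: &Arc<Self>, partition: usize) -> PartitionStream {
        PartitionStream { consumer: Arc::clone(self), partition }
    }
}

/// Owns an Arc to the consumer, so it is not tied to any borrow of it.
struct PartitionStream {
    consumer: Arc<StreamConsumer>,
    partition: usize,
}

impl PartitionStream {
    /// The stream borrows the partition queue, not the consumer.
    fn stream(&self) -> MessageStream<'_> {
        MessageStream { queue: self }
    }
}

struct MessageStream<'a> {
    queue: &'a PartitionStream,
}

impl Iterator for MessageStream<'_> {
    type Item = String;

    /// Ends when the queue is empty; a real stream would wait instead.
    fn next(&mut self) -> Option<String> {
        let mut queues = self.queue.consumer.queues.lock().unwrap();
        queues[self.queue.partition].pop_front()
    }
}

/// The partition queue is 'static, so it can move to its own thread.
fn drain_on_thread(queue: PartitionStream) -> Vec<String> {
    thread::spawn(move || queue.stream().collect::<Vec<String>>())
        .join()
        .unwrap()
}
```

Because `PartitionStream` owns its `Arc`, each partition can be handed to a separate task or thread, and the stream's lifetime only needs to fit within that of the partition queue.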

@nemosupremo nemosupremo marked this pull request as draft March 10, 2021 17:06
@nemosupremo nemosupremo marked this pull request as ready for review March 10, 2021 17:28
@nemosupremo
Contributor Author

One final change: I've opted to remove the boxing of BaseConsumer in StreamConsumer. It seemed silly to have the double Arc requirement.

@nemosupremo nemosupremo force-pushed the stream/split_partition branch from c4f6cd0 to e931027 Compare March 10, 2021 18:44
@benesch
Collaborator

benesch commented Mar 29, 2021

Thanks very much for working on this, @nemosupremo! I'm sorry I haven't had a chance to look things over yet. Things are busy at work right now, and this partition queue stuff is really subtle. Definitely a useful change though. Hopefully I'll have time for a proper review soon.

@benesch
Collaborator

benesch commented Nov 27, 2021

Sorry for the very long delay here. I put together a slightly different implementation in #411, but one that was very much based on this one! The big difference is a refactor that allowed the removal of the MessageStreamConsumer type.

@benesch benesch closed this Nov 27, 2021
@benesch
Collaborator

benesch commented Nov 27, 2021

Thanks again for the PR!