
Question - "dynamic subscriptions" for direct consumer #475

Closed
superdodd opened this issue Jun 13, 2023 · 1 comment
Closed

Question - "dynamic subscriptions" for direct consumer #475

superdodd opened this issue Jun 13, 2023 · 1 comment
Labels
enhancement New feature or request

Comments

superdodd commented Jun 13, 2023

Hello - I have a question about the proper usage pattern of the kgo client when a given client may need to start and stop consumption of (many different) individual partitions over the course of its lifetime. In brief, we have an external mechanism that assigns partitions for consumption, either as active or standby; in either case the partition must be consumed, but its contents may be treated differently. Over time, active or standby partitions may be assigned to or unassigned from a given instance as load characteristics change.

Is there a way - easy, or even just correct - to handle this with a single long-lived kgo.Client? It seems like it should be possible to add (and remove) individual partition subscriptions in a running client without affecting the pipeline of fetched/buffered records for other partitions, but I can't see how to do it with the client's public API. Our current approach is to close and recreate the client entirely whenever any partition subscription changes, but this seems a little harsh when, for example, adding the 100th partition subscription could mean re-fetching buffered data for the 99 other partition subscriptions that didn't change.

In particular, I guess I'm looking for something like AddConsumePartitions and PurgePartitionsFromClient that work at a partition level rather than a whole-topic level.
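For context, a minimal sketch of the close-and-recreate approach described above, assuming a direct consumer configured via kgo.ConsumePartitions; the rebuildClient helper and the broker address are illustrative:

```go
package main

import "github.com/twmb/franz-go/pkg/kgo"

// rebuildClient tears down the old client and creates a new one consuming
// exactly the given topic/partition assignments. Closing discards all
// buffered fetches, even for the assignments that did not change - the
// "harsh" part of this approach.
func rebuildClient(old *kgo.Client, assignments map[string]map[int32]kgo.Offset) (*kgo.Client, error) {
	if old != nil {
		old.Close()
	}
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative address
		kgo.ConsumePartitions(assignments),
	)
}
```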

twmb (Owner) commented Jun 27, 2023

Sorry for the delay. Are you using a consumer group or not? AddConsumePartitions would only be relevant for a direct consumer. PurgePartitionsFromClient is not possible: the client internally knows of a partition only because it knows of the topic, so you can't purge a single partition while the client must still know of the topic itself (and thus the partition).

If you are using a direct consumer: can you use a combination of AddFetchTopics, PauseFetchPartitions, ResumeFetchPartitions, and PurgeTopicsFromClient to:

  • If you need to consume a partition on a new topic, add the topic, then pause all unrelated partitions
  • Resume partitions you need to consume
  • Purge a topic once you decide you no longer need to consume any partitions in it

Note that there will be a few stray messages after adding a topic and pausing (and clearing what's buffered). If you need to consume exactly specific offsets - avoiding consuming more, and resuming exactly where you left off - you'll also have to use SetOffsets, but that gets a little tricky (although it's easier in a direct consumer). A sketch of this flow is below.
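A minimal sketch of the add / pause / resume / purge flow, assuming a direct consumer; startPartition, stopPartition, resumePartition, and the allPartitions argument are illustrative helpers, not part of kgo. (kgo's method for adding topics to a running consumer is AddConsumeTopics.)

```go
package dynsub

import "github.com/twmb/franz-go/pkg/kgo"

// startPartition begins consuming one partition of a possibly-new topic by
// adding the whole topic, then immediately pausing every other partition.
// A few stray records for the paused partitions may already be buffered and
// need to be dropped or otherwise handled.
func startPartition(cl *kgo.Client, topic string, want int32, allPartitions []int32) {
	cl.AddConsumeTopics(topic)
	paused := make([]int32, 0, len(allPartitions))
	for _, p := range allPartitions {
		if p != want {
			paused = append(paused, p)
		}
	}
	cl.PauseFetchPartitions(map[string][]int32{topic: paused})
}

// stopPartition stops consuming one partition, purging the topic entirely
// once none of its partitions are wanted anymore.
func stopPartition(cl *kgo.Client, topic string, part int32, topicStillWanted bool) {
	if !topicStillWanted {
		cl.PurgeTopicsFromClient(topic)
		return
	}
	cl.PauseFetchPartitions(map[string][]int32{topic: {part}})
}

// resumePartition resumes a paused (e.g. standby) partition. To resume at an
// exact offset rather than wherever fetching left off, SetOffsets rewinds the
// partition first (Epoch -1 means no leader epoch to validate against).
func resumePartition(cl *kgo.Client, topic string, part int32, at int64) {
	cl.SetOffsets(map[string]map[int32]kgo.EpochOffset{
		topic: {part: {Epoch: -1, Offset: at}},
	})
	cl.ResumeFetchPartitions(map[string][]int32{topic: {part}})
}
```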

I think AddConsumePartitions is feasible as a feature request, to avoid the need to add a full topic and then pause everything unrelated. PauseFetchPartitions suffices for your PurgePartitionsFromClient request, followed by a full purge once you no longer want to consume any partitions in the topic.

None of this applies for a group consumer, because as a group consumer you do not actually own which partitions you are consuming.

Let me know if the above add / pause / resume idea works. I'll consider AddConsumePartitions as a feature request for an eventual v1.14, but there isn't much in the pipeline for that yet, and if the above idea works, it's good in the meantime.

@twmb twmb added the enhancement (New feature or request) label Jun 27, 2023
@twmb twmb mentioned this issue Jun 30, 2023
twmb added commits that referenced this issue on Jul 7 and Jul 8, 2023:

    There are some catches, but this isn't too bad.

    Closes #475.
@twmb twmb closed this as completed in 34c8b3d Jul 8, 2023
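For reference, a brief sketch of the API this issue led to, assuming the AddConsumePartitions and RemoveConsumePartitions client methods that landed for v1.14; the topic name, partition number, and starting offset are illustrative:

```go
// Start consuming a single partition at the end of its log, without touching
// buffered records for any other subscription.
cl.AddConsumePartitions(map[string]map[int32]kgo.Offset{
	"events": {3: kgo.NewOffset().AtEnd()},
})

// Later, stop consuming just that partition.
cl.RemoveConsumePartitions(map[string][]int32{"events": {3}})
```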