Add Client.Coordinator() to retrieve the coordinating broker for a consumer group #411
Conversation
return coordinator, nil
}

coordinator, err := client.refreshCoordinator(consumerGroup, 10)
Number of retries is hardcoded for now.
Outstanding question: in the functional test, the retry attempts and backoff are set to very high values right now. Normally the default values are more than enough, but the first call to [...] is much slower; on Travis CI, this initial call takes about 4 seconds. I am not sure if this warrants using different configuration values.
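To make the hardcoded-retries discussion concrete, here is a minimal, self-contained sketch of a retry-with-backoff loop. The attempt count, backoff value, and the simulated lookup are assumptions for illustration, not the PR's actual refreshCoordinator code.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff is a hedged sketch of the retry behaviour discussed above: the
// coordinator lookup is attempted a fixed number of times with a sleep in between,
// because the very first lookup for a new consumer group can take several seconds
// on the broker side. The attempt count and backoff here are placeholders, not the
// PR's actual defaults.
func retryWithBackoff(attempts int, backoff time.Duration, lookup func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = lookup(); err == nil {
			return nil
		}
		time.Sleep(backoff)
	}
	return fmt.Errorf("coordinator lookup failed after %d attempts: %v", attempts, err)
}

func main() {
	// Simulated lookup that fails twice before succeeding, standing in for a
	// hypothetical client.refreshCoordinator call.
	calls := 0
	err := retryWithBackoff(10, 250*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("coordinator not available yet")
		}
		return nil
	})
	fmt.Println("attempts:", calls, "err:", err)
}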
Force-pushed from 300b948 to 9596184.
Note: this only works against Kafka 0.8.2 and higher.
The concern I separated with the Leader design is lazy open - only [...]. Also, nothing uses the [...].
The [...]
No no no, this is the same mistake I made with [...].
// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever metadata is changed
cachedPartitionsResults map[string][maxPartitionIndex][]int32
lock sync.RWMutex // protects access to the maps, only one since they're always written together

lock sync.RWMutex // protects access to the metadata and coordinator maps
and broker map
and cachedPartitionsResults map
Updated.
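To make the single-lock design discussed in this thread concrete, here is a hedged sketch of a client struct in which one RWMutex guards every cached map. The field names echo the excerpt above, but the simplified types and the placeholder constant are assumptions, not the PR's actual definitions.

package example

import "sync"

// Broker is a placeholder for the real broker type so the sketch compiles.
type Broker struct{ id int32 }

// ID returns the broker's node ID (placeholder accessor).
func (b *Broker) ID() int32 { return b.id }

// maxPartitionIndex is a placeholder; the real constant enumerates the kinds of
// partition sets that get cached.
const maxPartitionIndex = 2

// client sketches the struct under review: a single sync.RWMutex guards every
// cached map, since the maps are always written together.
type client struct {
	brokers      map[int32]*Broker // broker ID -> broker connection
	metadata     map[string]map[int32][]int32
	coordinators map[string]int32 // consumer group -> coordinating broker ID

	// Caching partition lists avoids churn from repeated cachedPartitions calls;
	// the cache must be updated whenever the metadata changes.
	cachedPartitionsResults map[string][maxPartitionIndex][]int32

	lock sync.RWMutex // protects the broker, metadata, coordinator, and cachedPartitionsResults maps
}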
Remind me again why that caused issues for [...]?
Looks like for all subsequent attempts for new consumer groups, the call succeeds immediately. So [...]
Basically I made it private but used it from outside the client. The semantics were confused. Lots of things. The solution finally boiled down to two different tasks the user might want to accomplish: [...]
The first can already be accomplished (now that we have lazy connections) by just calling [...]. So based on all that, I think we should add [...]. Edit, summary: [...]
Removed.
Some final 👀?
@@ -42,6 +43,14 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. This may be stale, in which case you should call RefreshCoordinator
I don't like the UX of saying "this may be stale"; I think the necessary information is conveyed with just "it's locally cached".
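For reference, a usage sketch of the two methods being documented. The client construction below follows the shape of later sarama releases, and the broker address and group name are placeholders, so treat it as illustrative rather than as this PR's exact API.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholder broker address and default config.
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Coordinator returns the coordinating broker for the group, using the
	// locally cached value when one is available.
	coordinator, err := client.Coordinator("my-consumer-group")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("coordinator for my-consumer-group: %s", coordinator.Addr())

	// If the cached value has gone stale (e.g. the coordinator moved to another
	// broker), RefreshCoordinator re-queries the cluster and updates the cache.
	if err := client.RefreshCoordinator("my-consumer-group"); err != nil {
		log.Fatal(err)
	}
}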
return err
}

coordinator := &Broker{
once #413 lands this can go away
@@ -42,6 +42,14 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. This value is cached locally; you can call RefreshCoordinator to update
you don't need to say it's cached locally twice :)
@eapache final 👀
// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance.
func (client *client) registerBroker(broker *Broker) {
maybe worth mentioning this must be called with the lock held in the comment?
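Incorporating that suggestion, a hedged sketch of how the helper and its comment might read, reusing the placeholder client and Broker types from the struct sketch earlier; this is not the PR's exact implementation.

// registerBroker makes sure a broker received by a Metadata or Coordinator
// response is present in the brokers map, keyed by its node ID.
// It must be called with the client's write lock held, since it mutates the map.
func (client *client) registerBroker(broker *Broker) {
	if _, ok := client.brokers[broker.ID()]; ok {
		// Already registered: keep the existing instance so callers keep sharing
		// one connection per broker ID.
		return
	}
	client.brokers[broker.ID()] = broker
}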
Very minor nits, but LGTM. Squash it and [...]
Add Client.Coordinator() to retrieve the coordinating broker for a consumer group
Damn, I spent hours trying to get Coordinator to work on v0.8.1.1. Maybe it should be added to the docs?
Great!
This is the first step towards consumer group offset management, and later full consumer group management in Kafka 0.8.3.
Careful review of the two locks would be appreciated. Do we need two locks, or can we just use a single lock for everything?
Extracted from #379.
@Shopify/kafka