-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP 1071- Refactor GroupMetadataManager #17941
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. A few initial comments.
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE; | ||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; | ||
|
||
public class GroupMetaManagerHelper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether this was intended to be GroupMetadataManagerHelper
.
* @param groupId The group id. | ||
* @param committedOffset A specified committed offset corresponding to this shard. | ||
* | ||
* @return A ConsumerGroup. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A ShareGroup.
private final Logger log; | ||
|
||
/** | ||
* The group manager. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Group config manager.
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that this import is incorrect. In almost all cases, ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH
is used, and I think actually that should be used in the other one also, meaning that this import is not required.
Time time, | ||
LogContext logContext, | ||
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors, | ||
ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be a bit more legible with the line break after "policy" so the parentheses is at the state of the next line. Then the argument declarations will not blend into the assignments in the method body. There are a few other instances in this PR which would also benefit from the same formatting tweak.
} | ||
|
||
/** | ||
* Handles a DescribeGroup request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pedantic nit: It's DescribeGroups.
GroupCoordinatorMetricsShard metrics, | ||
Time time, | ||
CoordinatorTimer<Void, CoordinatorRecord> timer) { | ||
this.groupStore = groupStore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Probably more legible with a line break before the ) {
so the method body is distinct from the argument declarations.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)