Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
remove offsetAcker get
Browse files Browse the repository at this point in the history
  • Loading branch information
dockerzhang committed Feb 4, 2021
1 parent d289c2f commit 6e93572
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final String advertisedListeners;
private final int defaultNumPartitions;
public final int maxReadEntriesNum;
// store the group name for current connected client.
public final ConcurrentHashMap<String, String> currentConnectedGroup;
public final String groupIdStoredPath;
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartit
// make sure internal consumer existed
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
if (!requestHandler.getGroupCoordinator()
.getoffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) {
.getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) {
log.warn("not get consumer for group {} this time", groupId);
consumerFuture.complete(null);
return consumerFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ public void shutdown() {
log.info("Shutdown group coordinator completely.");
}

public OffsetAcker getoffsetAcker() {
return offsetAcker;
}

public int partitionFor(String coordinatorKey) {
return groupManager.partitionFor(coordinatorKey);
}
Expand Down

0 comments on commit 6e93572

Please sign in to comment.