Skip to content

Commit

Permalink
List members of partitions (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Oct 13, 2021
1 parent b96eb26 commit aeb9676
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 54 deletions.
8 changes: 6 additions & 2 deletions app/src/main/java/org/astraea/offset/OffsetExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ static class Result {
this.partition = partition;
this.earliestOffset = startOffset;
this.latestOffset = endOffset;
this.groups = groups;
this.replicas = replicas;
this.groups =
groups.stream().sorted(Comparator.comparing(g -> g.groupId)).collect(Collectors.toList());
this.replicas =
replicas.stream()
.sorted(Comparator.comparing(r -> r.broker))
.collect(Collectors.toList());
}

@Override
Expand Down
167 changes: 121 additions & 46 deletions app/src/main/java/org/astraea/topic/TopicAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,69 @@ public void reassign(String topicName, int partition, Set<Integer> brokers) {

@Override
public Map<TopicPartition, List<Group>> groups(Set<String> topics) {
return Utils.handleException(
() ->
admin.listConsumerGroups().valid().get().stream()
.map(ConsumerGroupListing::groupId)
.collect(Collectors.toSet()))
.stream()
.flatMap(
group ->
Utils.handleException(
() ->
admin
.listConsumerGroupOffsets(group)
.partitionsToOffsetAndMetadata()
.get())
.entrySet()
.stream()
.filter(e -> topics.contains(e.getKey().topic()))
.map(e -> Map.entry(group, e)))
.collect(Collectors.groupingBy(e -> e.getValue().getKey()))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(
groupOffset ->
new Group(
groupOffset.getKey(),
groupOffset.getValue().getValue().offset()))
.collect(Collectors.toList())));
var groups =
Utils.handleException(() -> admin.listConsumerGroups().valid().get()).stream()
.map(ConsumerGroupListing::groupId)
.collect(Collectors.toList());

var allPartitions = partitions(topics);

var result = new HashMap<TopicPartition, List<Group>>();
Utils.handleException(() -> admin.describeConsumerGroups(groups).all().get())
.forEach(
(groupId, groupDescription) -> {
var partitionOffsets =
Utils.handleException(
() ->
admin
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get());

var partitionMembers =
groupDescription.members().stream()
.flatMap(
m ->
m.assignment().topicPartitions().stream()
.map(tp -> Map.entry(tp, m)))
.collect(Collectors.groupingBy(Map.Entry::getKey))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(Map.Entry::getValue)
.collect(Collectors.toList())));

allPartitions.forEach(
tp -> {
var offset =
partitionOffsets.containsKey(tp)
? OptionalLong.of(partitionOffsets.get(tp).offset())
: OptionalLong.empty();
var members =
partitionMembers.getOrDefault(tp, List.of()).stream()
.map(
m ->
new Member(
m.consumerId(),
m.groupInstanceId(),
m.clientId(),
m.host()))
.collect(Collectors.toList());
// This group is related to the partition only if it has either member or
// offset.
if (offset.isPresent() || !members.isEmpty()) {
result
.computeIfAbsent(tp, ignore -> new ArrayList<>())
.add(new Group(groupId, offset, members));
}
});
});

return result;
}

private Map<TopicPartition, Long> earliestOffset(Set<TopicPartition> partitions) {
Expand Down Expand Up @@ -114,15 +145,7 @@ public Set<String> topics() {

@Override
public Map<TopicPartition, Offset> offset(Set<String> topics) {
var partitions =
Utils.handleException(
() ->
admin.describeTopics(topics).all().get().entrySet().stream()
.flatMap(
e ->
e.getValue().partitions().stream()
.map(p -> new TopicPartition(e.getKey(), p.partition())))
.collect(Collectors.toSet()));
var partitions = partitions(topics);
var earliest = earliestOffset(partitions);
var latest = latestOffset(partitions);
return earliest.entrySet().stream()
Expand All @@ -132,6 +155,17 @@ public Map<TopicPartition, Offset> offset(Set<String> topics) {
Map.Entry::getKey, e -> new Offset(e.getValue(), latest.get(e.getKey()))));
}

private Set<TopicPartition> partitions(Set<String> topics) {
return Utils.handleException(
() ->
admin.describeTopics(topics).all().get().entrySet().stream()
.flatMap(
e ->
e.getValue().partitions().stream()
.map(p -> new TopicPartition(e.getKey(), p.partition())))
.collect(Collectors.toSet()));
}

@Override
public Map<TopicPartition, List<Replica>> replicas(Set<String> topics) {
var lags =
Expand Down Expand Up @@ -224,17 +258,58 @@ public Map<TopicPartition, List<Replica>> replicas(Set<String> topics) {
void reassign(String topicName, int partition, Set<Integer> brokers);

class Group {
public final String id;
public final long offset;
public final String groupId;
public final OptionalLong offset;
public final List<Member> members;

public Group(String id, long offset) {
this.id = id;
public Group(String groupId, OptionalLong offset, List<Member> members) {
this.groupId = groupId;
this.offset = offset;
this.members = members;
}

@Override
public String toString() {
return "Group{" + "id='" + id + '\'' + ", offset=" + offset + '}';
return "Group{"
+ "groupId='"
+ groupId
+ '\''
+ ", offset="
+ (offset.isEmpty() ? "none" : offset.getAsLong())
+ ", members="
+ members
+ '}';
}
}

class Member {
private final String memberId;
private final Optional<String> groupInstanceId;
private final String clientId;
private final String host;

public Member(String memberId, Optional<String> groupInstanceId, String clientId, String host) {
this.memberId = memberId;
this.groupInstanceId = groupInstanceId;
this.clientId = clientId;
this.host = host;
}

@Override
public String toString() {
return "Member{"
+ "memberId='"
+ memberId
+ '\''
+ ", groupInstanceId="
+ groupInstanceId
+ ", clientId='"
+ clientId
+ '\''
+ ", host='"
+ host
+ '\''
+ '}';
}
}

Expand Down
12 changes: 6 additions & 6 deletions app/src/test/java/org/astraea/offset/OffsetExplorerTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.astraea.offset;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import org.apache.kafka.common.TopicPartition;
import org.astraea.topic.TopicAdmin;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -38,7 +36,9 @@ public Map<TopicPartition, Offset> offset(Set<String> topics) {

@Override
public Map<TopicPartition, List<Group>> groups(Set<String> topics) {
return Map.of(topicPartition, List.of(new Group(groupId, groupOffset)));
return Map.of(
topicPartition,
List.of(new Group(groupId, OptionalLong.of(groupOffset), List.of())));
}

@Override
Expand Down Expand Up @@ -66,8 +66,8 @@ public void close() {}
Assertions.assertEquals(earliestOffset, result.get(0).earliestOffset);
Assertions.assertEquals(latestOffset, result.get(0).latestOffset);
Assertions.assertEquals(1, result.get(0).groups.size());
Assertions.assertEquals(groupId, result.get(0).groups.get(0).id);
Assertions.assertEquals(groupOffset, result.get(0).groups.get(0).offset);
Assertions.assertEquals(groupId, result.get(0).groups.get(0).groupId);
Assertions.assertEquals(groupOffset, result.get(0).groups.get(0).offset.getAsLong());
Assertions.assertEquals(1, result.get(0).replicas.size());
Assertions.assertEquals(brokerId, result.get(0).replicas.get(0).broker);
Assertions.assertEquals(lag, result.get(0).replicas.get(0).lag);
Expand Down

0 comments on commit aeb9676

Please sign in to comment.