Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASSIGNOR] Assignor handle admin #1415

Merged
merged 10 commits into from
Jan 14, 2023
36 changes: 23 additions & 13 deletions common/src/main/java/org/astraea/common/assignor/Assignor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.ReplicaSizeCost;
import org.astraea.common.metrics.collector.MetricCollector;
Expand All @@ -41,11 +43,12 @@
public abstract class Assignor implements ConsumerPartitionAssignor, Configurable {
public static final String JMX_PORT = "jmx.port";
Function<Integer, Optional<Integer>> jmxPortGetter = (id) -> Optional.empty();
private String bootstrap;
HasPartitionCost costFunction = HasPartitionCost.EMPTY;
// TODO: metric collector may be configured by user in the future.
// TODO: need to track the performance when using the assignor in large scale consumers, see
// https://github.com/skiptests/astraea/pull/1162#discussion_r1036285677
private final MetricCollector metricCollector =
protected final MetricCollector metricCollector =
MetricCollector.builder()
.interval(Duration.ofSeconds(1))
.expiration(Duration.ofSeconds(15))
Expand All @@ -55,12 +58,11 @@ public abstract class Assignor implements ConsumerPartitionAssignor, Configurabl
* Perform the group assignment given the member subscriptions and current cluster metadata.
*
* @param subscriptions Map from the member id to their respective topic subscription.
* @param topicPartitions Current topic/broker metadata known by consumer.
* @param clusterInfo Current cluster information fetched by admin.
* @return Map from each member to the list of partitions assigned to them.
*/
protected abstract Map<String, List<TopicPartition>> assign(
Map<String, org.astraea.common.assignor.Subscription> subscriptions,
Set<TopicPartition> topicPartitions);
Map<String, org.astraea.common.assignor.Subscription> subscriptions, ClusterInfo clusterInfo);
// TODO: replace the topicPartitions by ClusterInfo after Assignor is able to handle Admin
// https://github.com/skiptests/astraea/issues/1409

Expand Down Expand Up @@ -102,6 +104,17 @@ protected void registerLocalJMX(Map<Integer, String> unregister) {
unregister.forEach((id, host) -> metricCollector.registerLocalJmx(id));
}

/**
* update cluster information
*
* @return cluster information
*/
private ClusterInfo updateClusterInfo() {
try (Admin admin = Admin.of(bootstrap)) {
return admin.topicNames(false).thenCompose(admin::clusterInfo).toCompletableFuture().join();
}
}

/**
* Parse cost function names and weight. you can specify multiple cost function with assignor. The
* format of key and value pair is "<CostFunction name>"="<weight>". For instance,
Expand Down Expand Up @@ -138,20 +151,16 @@ static Map<HasPartitionCost, Double> parseCostFunctionWeight(Configuration confi

@Override
public final GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
var clusterInfo = updateClusterInfo();
// convert Kafka's data structure to ours
var subscriptionsPerMember =
org.astraea.common.assignor.GroupSubscription.from(groupSubscription).groupSubscription();

var topicPartitions =
metadata.topics().stream()
.flatMap(
name ->
metadata.partitionsForTopic(name).stream()
.map(p -> TopicPartition.of(p.topic(), p.partition())))
.collect(Collectors.toSet());
// TODO: Detected if consumers subscribed to the same topics.
// For now, assume that the consumers only subscribed to identical topics

return new GroupAssignment(
assign(subscriptionsPerMember, topicPartitions).entrySet().stream()
assign(subscriptionsPerMember, clusterInfo).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
Expand All @@ -168,6 +177,7 @@ public final void configure(Map<String, ?> configs) {
Configuration.of(
configs.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
config.string(ConsumerConfigs.BOOTSTRAP_SERVERS_CONFIG).ifPresent(s -> bootstrap = s);
var costFunctions = parseCostFunctionWeight(config);
var customJMXPort = PartitionerUtils.parseIdJMXPort(config);
var defaultJMXPort = config.integer(JMX_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,34 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.TopicPartition;

public class RandomAssignor extends Assignor {

@Override
public Map<String, List<TopicPartition>> assign(
Map<String, org.astraea.common.assignor.Subscription> subscriptions,
Set<TopicPartition> topicPartitions) {
ClusterInfo clusterInfo) {
var assignments = new HashMap<String, List<TopicPartition>>();
var consumers = new ArrayList<>(subscriptions.keySet());
Set<String> topics = new HashSet<>();
consumers.forEach(consumer -> assignments.put(consumer, new ArrayList<>()));

topicPartitions.forEach(
topicPartition -> {
var consumer = consumers.get((int) (Math.random() * consumers.size()));
assignments.get(consumer).add(topicPartition);
});
for (org.astraea.common.assignor.Subscription subscription : subscriptions.values())
topics.addAll(subscription.topics());

clusterInfo.topicPartitions().stream()
.filter(tp -> topics.contains(tp.topic()))
.forEach(
tp -> {
var consumer = consumers.get((int) (Math.random() * consumers.size()));
assignments.get(consumer).add(tp);
});

return assignments;
}
Expand Down