[DISPATCHER] create Admin for all dispatcher to fetch full cluster info #1402

Merged: 1 commit, Jan 8, 2023
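In short: Dispatcher now builds an Admin from the producer's bootstrap.servers, caches a ClusterInfo<Replica> that it refreshes asynchronously under a 15-second lease, and widens the partition(...) hook to ClusterInfo<? extends ReplicaInfo>; subclasses that override close() must call super.close() to release the shared Admin. A minimal sketch of a custom dispatcher under the new contract (the broker address is hypothetical, error handling elided):

    try (var dispatcher =
        new Dispatcher() {
          @Override
          protected int partition(
              String topic, byte[] key, byte[] value, ClusterInfo<? extends ReplicaInfo> clusterInfo) {
            var leaders = clusterInfo.replicaLeaders(topic);
            // fall back to partition 0 while the cached metadata has no leaders for this topic
            return leaders.isEmpty()
                ? 0
                : leaders.get((int) (Math.random() * leaders.size())).partition();
          }
        }) {
      // the ordinary producer config key is all the dispatcher needs to build its Admin
      dispatcher.configure(Map.of(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
    }
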
71 changes: 54 additions & 17 deletions common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
@@ -16,28 +16,31 @@
*/
package org.astraea.common.partitioner;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.ReplicaInfo;
import org.astraea.common.producer.ProducerConfigs;

public abstract class Dispatcher implements Partitioner {
/**
* cache the cluster info to reduce the cost of converting cluster. Producer does not update
* Cluster frequently, so it is ok to cache it.
*/
static final ConcurrentHashMap<Cluster, ClusterInfo<ReplicaInfo>> CLUSTER_CACHE =
new ConcurrentHashMap<>();
private static final Duration CLUSTER_INFO_LEASE = Duration.ofSeconds(15);

static final ThreadLocal<Interdependent> THREAD_LOCAL =
ThreadLocal.withInitial(Interdependent::new);

private final AtomicLong lastUpdated = new AtomicLong(-1);
volatile ClusterInfo<Replica> clusterInfo = ClusterInfo.empty();
Admin admin = null;

/**
* Compute the partition for the given record.
*
@@ -47,7 +50,7 @@ public abstract class Dispatcher implements Partitioner {
* @param clusterInfo The current cluster metadata
*/
protected abstract int partition(
String topic, byte[] key, byte[] value, ClusterInfo<ReplicaInfo> clusterInfo);
String topic, byte[] key, byte[] value, ClusterInfo<? extends ReplicaInfo> clusterInfo);
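(Widening the parameter to ClusterInfo<? extends ReplicaInfo> lets this hook receive the ClusterInfo<Replica> cached in the clusterInfo field above without forcing subclasses onto the concrete Replica type.)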

/**
* configure this dispatcher. This method is called only once.
@@ -59,7 +62,9 @@ protected void configure(Configuration config) {}
protected void onNewBatch(String topic, int prevPartition) {}

@Override
public void close() {}
public void close() {
if (admin != null) Utils.packException(admin::close);
}

// -----------------------[interdependent]-----------------------//

@@ -138,10 +143,13 @@ private static class Interdependent {

@Override
public final void configure(Map<String, ?> configs) {
configure(
var config =
Configuration.of(
configs.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
config.string(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG).ifPresent(s -> admin = Admin.of(s));
configure(config);
tryToUpdate(null);
}
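Aside: this works with a plain Kafka producer because the client hands its full config map to Partitioner#configure, so bootstrap.servers is already present; a hedged sketch (address and topic are hypothetical):

    var props = new java.util.Properties();
    props.put(
        org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "localhost:9092");
    props.put(
        org.apache.kafka.clients.producer.ProducerConfig.PARTITIONER_CLASS_CONFIG,
        StrictCostDispatcher.class.getName());
    try (var producer =
        new org.apache.kafka.clients.producer.KafkaProducer<>(
            props,
            new org.apache.kafka.common.serialization.StringSerializer(),
            new org.apache.kafka.common.serialization.StringSerializer())) {
      producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>("demo", "k", "v"));
    }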

@Override
@@ -150,16 +158,45 @@ public final int partition(
var interdependent = THREAD_LOCAL.get();
if (interdependent.isInterdependent && interdependent.targetPartitions >= 0)
return interdependent.targetPartitions;
var target =
partition(
topic,
keyBytes,
valueBytes,
CLUSTER_CACHE.computeIfAbsent(cluster, ignored -> ClusterInfo.of(cluster)));
tryToUpdate(topic);
final int target;
if (!clusterInfo.topics().contains(topic)) {
// the cached cluster info has not been updated yet, so we just return a random partition
var ps = cluster.availablePartitionsForTopic(topic);
target = ps.isEmpty() ? 0 : ps.get((int) (Math.random() * ps.size())).partition();
} else target = partition(topic, keyBytes, valueBytes, clusterInfo);
interdependent.targetPartitions = target;
return target;
}

boolean tryToUpdate(String topic) {
if (admin == null) return false;
var now = System.currentTimeMillis();
// need to refresh the cluster info if
// 1) the topic is not included in the cached ClusterInfo, or
// 2) the lease has expired
if (lastUpdated.updateAndGet(
last -> {
if (topic != null && !clusterInfo.topics().contains(topic)) return now;
if (now - last >= CLUSTER_INFO_LEASE.toMillis()) return now;
return last;
})
== now) {
admin
.topicNames(true)
.thenCompose(names -> admin.clusterInfo(names))
.whenComplete(
(c, e) -> {
if (c != null) {
this.clusterInfo = c;
lastUpdated.set(System.currentTimeMillis());
}
});
return true;
}
return false;
}
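Note the refresh is fire-and-forget: tryToUpdate returns as soon as the admin.topicNames(...).thenCompose(...) chain is kicked off, so records sent to a brand-new topic keep taking the random-partition fallback in partition(...) until the asynchronous fetch lands; the tests below sleep briefly for exactly this reason.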

@Override
public final void onNewBatch(String topic, Cluster cluster, int prevPartition) {
onNewBatch(topic, prevPartition);
@@ -72,7 +72,7 @@ public class StrictCostDispatcher extends Dispatcher {

volatile long timeToUpdateRoundRobin = -1;

void tryToUpdateFetcher(ClusterInfo<ReplicaInfo> clusterInfo) {
void tryToUpdateFetcher(ClusterInfo<? extends ReplicaInfo> clusterInfo) {
// register new nodes to metric collector
costFunction
.fetcher()
@@ -96,7 +96,7 @@ void tryToUpdateFetcher(ClusterInfo<ReplicaInfo> clusterInfo) {

@Override
public int partition(
String topic, byte[] key, byte[] value, ClusterInfo<ReplicaInfo> clusterInfo) {
String topic, byte[] key, byte[] value, ClusterInfo<? extends ReplicaInfo> clusterInfo) {
var partitionLeaders = clusterInfo.replicaLeaders(topic);
// just return the first partition if there are no available partitions
if (partitionLeaders.isEmpty()) return 0;
@@ -119,7 +119,7 @@ public int partition(
return candidate.get((int) (Math.random() * candidate.size())).partition();
}

synchronized void tryToUpdateRoundRobin(ClusterInfo<ReplicaInfo> clusterInfo) {
synchronized void tryToUpdateRoundRobin(ClusterInfo<? extends ReplicaInfo> clusterInfo) {
if (System.currentTimeMillis() >= timeToUpdateRoundRobin) {
var roundRobin =
RoundRobin.smooth(
@@ -206,5 +206,6 @@ public static Map<HasBrokerCost, Double> parseCostFunctionWeight(Configuration c
@Override
public void close() {
metricCollector.close();
super.close();
}
}
@@ -110,7 +110,8 @@ public synchronized void init(Supplier<Map<Integer, Double>> brokerScore) {
*
* @return the preferred ID
*/
public synchronized int getAndChoose(String topic, ClusterInfo<ReplicaInfo> clusterInfo) {
public synchronized int getAndChoose(
String topic, ClusterInfo<? extends ReplicaInfo> clusterInfo) {
// TODO Update brokerID with ClusterInfo frequency.
var brokerID =
brokersIDofTopic.computeIfAbsent(
@@ -53,13 +53,13 @@ public class SmoothWeightRoundRobinDispatcher extends Dispatcher {

private final NeutralIntegratedCost neutralIntegratedCost = new NeutralIntegratedCost();

private List<ReplicaInfo> partitions;
private List<? extends ReplicaInfo> partitions;

public static final String JMX_PORT = "jmx.port";

@Override
protected int partition(
String topic, byte[] key, byte[] value, ClusterInfo<ReplicaInfo> clusterInfo) {
String topic, byte[] key, byte[] value, ClusterInfo<? extends ReplicaInfo> clusterInfo) {
var targetPartition = unusedPartitions.poll();
refreshPartitionMetaData(clusterInfo, topic);
Supplier<Map<Integer, Double>> supplier =
@@ -92,6 +92,7 @@ protected int partition(
@Override
public void close() {
metricCollector.close();
super.close();
}

@Override
@@ -118,15 +119,17 @@ int jmxPort(int id) {
() -> new NoSuchElementException("broker: " + id + " does not have jmx port"));
}

private int nextValue(String topic, ClusterInfo<ReplicaInfo> clusterInfo, int targetBroker) {
private int nextValue(
String topic, ClusterInfo<? extends ReplicaInfo> clusterInfo, int targetBroker) {
return topicCounter
.computeIfAbsent(topic, k -> new BrokerNextCounter(clusterInfo))
.brokerCounter
.get(targetBroker)
.getAndIncrement();
}

private void refreshPartitionMetaData(ClusterInfo<ReplicaInfo> clusterInfo, String topic) {
private void refreshPartitionMetaData(
ClusterInfo<? extends ReplicaInfo> clusterInfo, String topic) {
partitions = clusterInfo.availableReplicas(topic);
partitions.stream()
.filter(p -> !metricCollector.listIdentities().contains(p.nodeInfo().id()))
@@ -141,7 +144,7 @@ private void refreshPartitionMetaData(ClusterInfo<ReplicaInfo> clusterInfo, Stri
private static class BrokerNextCounter {
private final Map<Integer, AtomicInteger> brokerCounter;

BrokerNextCounter(ClusterInfo<ReplicaInfo> clusterInfo) {
BrokerNextCounter(ClusterInfo<? extends ReplicaInfo> clusterInfo) {
brokerCounter =
clusterInfo.nodes().stream()
.collect(Collectors.toMap(NodeInfo::id, node -> new AtomicInteger(0)));
@@ -22,6 +22,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.security.Key;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -51,14 +52,95 @@
import org.junit.jupiter.api.Test;

public class DispatcherTest extends RequireSingleBrokerCluster {

@Test
void testNoTopicInCachedClusterInfo() {
var topicName = Utils.randomString();
var count = new AtomicInteger(0);
try (var dispatcher =
new Dispatcher() {
@Override
public int partition(
String topic,
byte[] key,
byte[] value,
ClusterInfo<? extends ReplicaInfo> clusterInfo) {
Assertions.assertNotNull(clusterInfo);
return 0;
}

@Override
boolean tryToUpdate(String topic) {
if (topic != null) Assertions.assertEquals(topicName, topic);
var rval = super.tryToUpdate(topic);
if (rval) count.incrementAndGet();
return rval;
}
}) {

dispatcher.configure(Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()));

// the topic does not exist in the cluster yet, so every call requests a cluster info update
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(2, count.get());
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(3, count.get());

// create the topic
dispatcher.admin.creator().topic(topicName).run().toCompletableFuture().join();
Utils.sleep(Duration.ofSeconds(2));
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(4, count.get());

// ok, the topic exists in the cached cluster info now, so it should not request another update
Utils.sleep(Duration.ofSeconds(2));
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(4, count.get());
}
}

@Test
void testUpdateClusterInfo() {

try (var dispatcher =
new Dispatcher() {
@Override
public int partition(
String topic,
byte[] key,
byte[] value,
ClusterInfo<? extends ReplicaInfo> clusterInfo) {
Assertions.assertNotNull(clusterInfo);
return 0;
}
}) {

dispatcher.configure(Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()));
Assertions.assertNotNull(dispatcher.admin);
Utils.sleep(Duration.ofSeconds(3));
Assertions.assertNotEquals(0, dispatcher.clusterInfo.nodes().size());
}
}

@Test
void testNullKey() {
var count = new AtomicInteger();
var dispatcher =
new Dispatcher() {
@Override
public int partition(
String topic, byte[] key, byte[] value, ClusterInfo<ReplicaInfo> clusterInfo) {
String topic,
byte[] key,
byte[] value,
ClusterInfo<? extends ReplicaInfo> clusterInfo) {
Assertions.assertNull(key);
Assertions.assertNull(value);
count.incrementAndGet();
@@ -76,26 +158,6 @@ public void configure(Configuration config) {
Assertions.assertEquals(1, count.get());
dispatcher.partition(
"t", null, null, null, null, new Cluster("aa", List.of(), List.of(), Set.of(), Set.of()));
Assertions.assertEquals(2, count.get());
}

@Test
void testClusterCache() {
var dispatcher =
new Dispatcher() {
@Override
public int partition(
String topic, byte[] key, byte[] value, ClusterInfo<ReplicaInfo> clusterInfo) {
return 0;
}
};
dispatcher.configure(Map.of());
var initialCount = Dispatcher.CLUSTER_CACHE.size();
var cluster = new Cluster("aa", List.of(), List.of(), Set.of(), Set.of());
dispatcher.partition("topic", "a", new byte[0], "v", new byte[0], cluster);
Assertions.assertEquals(initialCount + 1, Dispatcher.CLUSTER_CACHE.size());
dispatcher.partition("topic", "a", new byte[0], "v", new byte[0], cluster);
Assertions.assertEquals(initialCount + 1, Dispatcher.CLUSTER_CACHE.size());
}

@RepeatedTest(5)