Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement poisson partitioner #27

Merged
merged 56 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
07e9d64
astraea v0.0.1
wycccccc Sep 23, 2021
85393a9
spotlessApply
wycccccc Sep 25, 2021
ea98c4c
v0.0.2
wycccccc Sep 26, 2021
95092e0
v0.0.3
wycccccc Sep 26, 2021
7abf422
fix testTearDown
wycccccc Sep 26, 2021
c48b44d
add TestSmoothPartitioner
wycccccc Oct 1, 2021
64a8899
spotless
wycccccc Oct 1, 2021
7e89a7b
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 2, 2021
01b47b8
Integration metrics
wycccccc Oct 5, 2021
71bd9ac
bit fix
wycccccc Oct 5, 2021
f0a7117
add some annotation
wycccccc Oct 5, 2021
96aa30d
reslove conflict
wycccccc Oct 5, 2021
360a134
reslove spotless
wycccccc Oct 5, 2021
8996093
resolve test failed
wycccccc Oct 5, 2021
04e63fd
resolve test failed
wycccccc Oct 5, 2021
1924336
Update KafkaMetricClientApp.java
wycccccc Oct 5, 2021
9e8775a
reslove failed
wycccccc Oct 5, 2021
e525334
Merge branch 'nodeLoad' of github.com:wycccccc/astraea into nodeLoad
wycccccc Oct 5, 2021
b33c800
Update NodeLoadClientTest.java
wycccccc Oct 5, 2021
793876a
Merge branch 'nodeLoad' of github.com:wycccccc/astraea into nodeLoad
wycccccc Oct 5, 2021
7511db1
reslove failed test
wycccccc Oct 5, 2021
694cb7e
reslove failed test
wycccccc Oct 5, 2021
d6d741d
reslove failed test
wycccccc Oct 5, 2021
8a49aef
reslove failed test
wycccccc Oct 5, 2021
9839491
ensure nodeLoadClient
wycccccc Oct 5, 2021
d82f34b
ensure nodeLoadClient
wycccccc Oct 5, 2021
e825ce2
resolve thread-safe
wycccccc Oct 5, 2021
760e6d9
resolve thread-safe
wycccccc Oct 5, 2021
8c9ce46
thread-safe partitioner
wycccccc Oct 8, 2021
7135da0
bit fix
wycccccc Oct 8, 2021
e0d53fb
little fix
wycccccc Oct 8, 2021
52e7873
Refactor SmoothPartitionerFactoryTest
wycccccc Oct 10, 2021
c5c46de
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 10, 2021
5493100
thread-safe continue
wycccccc Oct 10, 2021
23512c4
remove redundant code
wycccccc Oct 10, 2021
ddfba6c
null
wycccccc Oct 13, 2021
ed53032
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 13, 2021
aa33c97
thread safe again
wycccccc Oct 13, 2021
513d292
thread safe again
wycccccc Oct 13, 2021
9bd6038
thread-safe for system logic
wycccccc Oct 17, 2021
eec470d
complete proposes
wycccccc Oct 17, 2021
fde7a46
complete proposes again
wycccccc Oct 17, 2021
6fb03f3
complete proposes again
wycccccc Oct 18, 2021
44cb313
help commit
wycccccc Oct 24, 2021
e31ae18
complete test
wycccccc Oct 25, 2021
f55648a
Merge branch 'main' of github.com:wycccccc/astraea into nodeLoad
wycccccc Oct 27, 2021
79cc949
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 27, 2021
669ae68
replace jmx
wycccccc Oct 27, 2021
480785d
add consumer
wycccccc Oct 28, 2021
8265989
resolve conflicts
wycccccc Oct 28, 2021
bed5b4d
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 29, 2021
d8182d4
replace producer and consumer
wycccccc Oct 29, 2021
c541685
replace producer for other test
wycccccc Oct 30, 2021
a8e7413
test
wycccccc Oct 30, 2021
3dfe408
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Nov 1, 2021
5857808
complete
wycccccc Nov 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@ repositories {
}

def versions = [
kafka: project.properties['kafka.version'] ?: "2.8.1",
kafka: project.properties['kafka.version'] ?: "2.8.0",
junit: project.properties['junit.version'] ?: "5.8.1",
mockito: project.properties['mockito.version'] ?: "3.12.4",
jcommander: project.properties['jcommander.version'] ?: "1.81",
slf4j: project.properties['slf4j.version'] ?: "1.7.32",
zookeeper: project.properties['zookeeper.version'] ?: "3.5.9",
]

dependencies {
testImplementation "org.junit.jupiter:junit-jupiter:${versions["junit"]}"
testImplementation "org.mockito:mockito-core:${versions["mockito"]}"
testImplementation "org.mockito:mockito-inline:${versions["mockito"]}"
testImplementation "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}"
testImplementation "org.apache.kafka:kafka_2.12:${versions["kafka"]}"


implementation "org.apache.kafka:kafka-clients:${versions["kafka"]}"
implementation "com.beust:jcommander:${versions["jcommander"]}"
// we don't use slf4j actually, and it is used by kafka so we swallow the log.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.util.HashMap;
import java.util.Map;

public class BrokersWeight {

  /**
   * Record the current weight of each node according to Poisson calculation and the weight after
   * partitioner calculation. Index 0 of the int[] is the Poisson-derived weight, index 1 is the
   * running weight maintained by the partitioner.
   *
   * <p>NOTE: this map is {@code static}, i.e. shared by every BrokersWeight instance in the JVM.
   * The methods below therefore synchronize on the class object; instance-level {@code
   * synchronized} would not protect the shared state once more than one instance exists.
   */
  private static HashMap<String, int[]> brokerHashMap = new HashMap<>();

  private final LoadPoisson loadPoisson;

  public BrokersWeight(LoadPoisson loadPoisson) {
    this.loadPoisson = loadPoisson;
  }

  /** Change the weight of the node according to the current Poisson. */
  public void setBrokerHashMap() {
    // Compute outside the lock: setAllPoisson() reads load counters and may be slow.
    HashMap<String, Double> poissonMap = loadPoisson.setAllPoisson();

    synchronized (BrokersWeight.class) {
      for (Map.Entry<String, Double> entry : poissonMap.entrySet()) {
        // Refresh the Poisson weight; preserve the partitioner-maintained slot (default 0).
        int weight = (int) ((1 - entry.getValue()) * 20);
        int[] previous = brokerHashMap.get(entry.getKey());
        brokerHashMap.put(
            entry.getKey(), new int[] {weight, previous == null ? 0 : previous[1]});
      }
    }
  }

  /** @return the sum of the Poisson-derived weights of all known brokers. */
  public int getAllWeight() {
    synchronized (BrokersWeight.class) {
      int allWeight = 0;
      for (Map.Entry<String, int[]> entry : brokerHashMap.entrySet()) {
        allWeight += entry.getValue()[0];
      }
      return allWeight;
    }
  }

  public HashMap<String, int[]> getBrokerHashMap() {
    synchronized (BrokersWeight.class) {
      return brokerHashMap;
    }
  }

  public void setCurrentBrokerHashMap(HashMap<String, int[]> currentBrokerHashMap) {
    synchronized (BrokersWeight.class) {
      brokerHashMap = currentBrokerHashMap;
    }
  }

  // Only for test
  void setBrokerHashMapValue(String x, int y) {
    synchronized (BrokersWeight.class) {
      brokerHashMap.put(x, new int[] {0, y});
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.util.HashMap;
import java.util.Map;

public class LoadPoisson {
  private final NodeLoadClient nodeLoadClient;

  public LoadPoisson(NodeLoadClient nodeLoadClient) {
    this.nodeLoadClient = nodeLoadClient;
  }

  /**
   * Compute the cumulative Poisson probability for every node: lambda is the average overload
   * count across all nodes, x is the node's own overload count.
   *
   * @return node id mapped to its cumulative Poisson probability
   */
  public synchronized HashMap<String, Double> setAllPoisson() {
    HashMap<String, Double> poissonMap = new HashMap<>();
    int lambda = nodeLoadClient.getAvgLoadCount();
    for (Map.Entry<String, Integer> entry : nodeLoadClient.getAllOverLoadCount().entrySet()) {
      int x = nodeLoadClient.getBinOneCount(entry.getValue());
      poissonMap.put(entry.getKey(), doPoisson(lambda, x));
    }
    return poissonMap;
  }

  /**
   * Cumulative Poisson distribution P(X &lt;= x) for mean {@code lambda}.
   *
   * <p>Uses the recurrence P(X == i) = P(X == i - 1) * lambda / i instead of computing lambda^i /
   * i! directly, so it stays finite and accurate for x &gt; 20, where a long factorial overflows.
   *
   * @param lambda mean of the distribution
   * @param x upper bound of the summation (inclusive)
   * @return sum of P(X == i) for i in [0, x]
   */
  public double doPoisson(int lambda, int x) {
    double term = Math.exp(-lambda); // P(X == 0)
    double ans = term;
    for (int i = 1; i <= x; i++) {
      term *= (double) lambda / i; // P(X == i) derived from P(X == i - 1)
      ans += term;
    }
    return ans;
  }

  /**
   * @return number! — kept for API compatibility; iterative to avoid deep recursion. Overflows for
   *     number &gt; 20 (long range), which is why {@link #doPoisson} no longer relies on it.
   */
  public long factorial(long number) {
    long result = 1;
    for (long i = 2; i <= number; i++) {
      result *= i;
    }
    return result;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.astraea.concurrent.ThreadPool;
import org.astraea.metrics.jmx.MBeanClient;

public class NodeLoadClient implements ThreadPool.Executor {

private final OverLoadNode overLoadNode;
private final Collection<NodeMetadata> nodeMetadataCollection = new ArrayList<>();

public NodeLoadClient(HashMap<String, String> jmxAddresses) throws IOException {
for (HashMap.Entry<String, String> entry : jmxAddresses.entrySet()) {
this.nodeMetadataCollection.add(
new NodeMetadata(entry.getKey(), createNodeMetrics(entry.getKey(), entry.getValue())));
}
this.overLoadNode = new OverLoadNode(this.nodeMetadataCollection);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主要的改動比較集中在這個class中,我把那些怪怪的保證只有一個NodeLoadClient的功能全部摘掉了。現在的做法是一個partitioner創建一個NodeLoadClient,然後NodeLoadClient還是負責數據的更新與overload的記錄。


public NodeMetrics createNodeMetrics(String key, String value) throws IOException {
return new NodeMetrics(key, value);
}

/** A thread that continuously updates metricsfor NodeLoadClient. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metricsfor -> metrics for

@Override
public State execute() throws InterruptedException {
try {
refreshNodesMetrics();
overLoadNode.monitorOverLoad();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}

@Override
public void close() {
for (NodeMetadata nodeMetadata : nodeMetadataCollection) {
NodeMetrics nodeMetrics = nodeMetadata.getNodeMetrics();
MBeanClient mBeanClient = nodeMetrics.getKafkaMetricClient();
try {
mBeanClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在這裡這樣寫算是有處理掉,上面所說的釋放資源這一問題上嘛。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這樣並沒有正確處理,因為在關閉的過程一樣沒有thread-safe

}

public synchronized HashMap<String, Integer> getAllOverLoadCount() {
HashMap<String, Integer> overLoadCount = new HashMap<>();
for (NodeMetadata nodeMetadata : nodeMetadataCollection) {
overLoadCount.put(nodeMetadata.getNodeID(), nodeMetadata.getOverLoadCount());
}
return overLoadCount;
}

public synchronized int getAvgLoadCount() {
double avgLoadCount = 0;
for (NodeMetadata nodeMetadata : nodeMetadataCollection) {
avgLoadCount += getBinOneCount(nodeMetadata.getOverLoadCount());
}
return nodeMetadataCollection.size() > 0
? (int) avgLoadCount / nodeMetadataCollection.size()
: 0;
}

/** Get the number of times a node is overloaded. */
public static int getBinOneCount(int n) {
int index = 0;
int count = 0;
while (n > 0) {
int x = n & 1 << index;
if (x != 0) {
count++;
n = n - (1 << index);
}
index++;
}
return count;
}

public void refreshNodesMetrics() {
for (NodeMetadata nodeMetadata : nodeMetadataCollection) {
NodeMetrics nodeMetrics = nodeMetadata.getNodeMetrics();
nodeMetrics.refreshMetrics();
nodeMetadata.setTotalBytes(nodeMetrics.totalBytesPerSec());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.astraea.partitioner.nodeLoadMetric;

/** Store information about each node */
public class NodeMetadata {
private String nodeID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final

private NodeMetrics nodeMetrics;
private double totalBytes;
private int overLoadCount;

NodeMetadata(String nodeID, NodeMetrics nodeMetrics) {
this.nodeID = nodeID;
this.nodeMetrics = nodeMetrics;
this.overLoadCount = 0;
this.totalBytes = 0.0;
}

public NodeMetrics getNodeMetrics() {
return this.nodeMetrics;
}

public void setOverLoadCount(int count) {
this.overLoadCount = count;
}

public void setTotalBytes(double bytes) {
this.totalBytes = bytes;
}

public double getTotalBytes() {
return this.totalBytes;
}

public String getNodeID() {
return this.nodeID;
}

public int getOverLoadCount() {
return this.overLoadCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.management.remote.JMXServiceURL;
import org.astraea.metrics.jmx.MBeanClient;
import org.astraea.metrics.kafka.BrokerTopicMetricsResult;
import org.astraea.metrics.kafka.KafkaMetrics;

/** Responsible for connecting jmx according to the received address */
public class NodeMetrics {
  // Constant template shared by all instances; %s is the host:port part.
  private static final String JMX_URI_FORMAT = "service:jmx:rmi:///jndi/rmi://%s/jmxrmi";

  private final JMXServiceURL serviceURL;
  private final MBeanClient mBeanClient;
  private final String nodeID;
  // Latest sampled values, keyed by metric name (e.g. "BytesInPerSec") -> MeanRate.
  private final HashMap<String, Double> metricsValues;
  // Metrics this client samples on every refresh.
  private final Collection<String> argumentTargetMetrics = new ArrayList<>();

  /**
   * @param ID broker id this client belongs to
   * @param address either a full JMX service URL (starting with "service:") or a bare
   *     host:port that is expanded via {@link #createJmxUrl(String)}
   * @throws IOException if the MBean connection cannot be opened
   */
  NodeMetrics(String ID, String address) throws IOException {
    argumentTargetMetrics.add("BytesInPerSec");
    argumentTargetMetrics.add("BytesOutPerSec");
    nodeID = ID;
    // A "^service:" regex find() is exactly a startsWith check — no Pattern needed.
    if (address.startsWith("service:")) serviceURL = new JMXServiceURL(address);
    else serviceURL = new JMXServiceURL(createJmxUrl(address));
    mBeanClient = new MBeanClient(serviceURL);
    metricsValues = new HashMap<>();
  }

  public String createJmxUrl(String address) {
    return String.format(JMX_URI_FORMAT, address);
  }

  /** Fetch the target broker-topic metrics via JMX and cache each metric's MeanRate. */
  public void refreshMetrics() {
    List<KafkaMetrics.BrokerTopic> metrics =
        argumentTargetMetrics.stream()
            .map(KafkaMetrics.BrokerTopic::of)
            .collect(Collectors.toUnmodifiableList());

    List<BrokerTopicMetricsResult> collect =
        metrics.stream().map(x -> x.fetch(mBeanClient)).collect(Collectors.toUnmodifiableList());

    for (BrokerTopicMetricsResult result : collect) {
      metricsValues.put(
          result.beanObject().getProperties().get("name"),
          (Double) result.beanObject().getAttributes().get("MeanRate"));
    }
  }

  /** @return BytesInPerSec + BytesOutPerSec; requires {@link #refreshMetrics()} to have run. */
  public double totalBytesPerSec() {
    return metricsValues.get("BytesInPerSec") + metricsValues.get("BytesOutPerSec");
  }

  public MBeanClient getKafkaMetricClient() {
    return this.mBeanClient;
  }
}
Loading