Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement poisson partitioner #27

Merged
merged 56 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
07e9d64
astraea v0.0.1
wycccccc Sep 23, 2021
85393a9
spotlessApply
wycccccc Sep 25, 2021
ea98c4c
v0.0.2
wycccccc Sep 26, 2021
95092e0
v0.0.3
wycccccc Sep 26, 2021
7abf422
fix testTearDown
wycccccc Sep 26, 2021
c48b44d
add TestSmoothPartitioner
wycccccc Oct 1, 2021
64a8899
spotless
wycccccc Oct 1, 2021
7e89a7b
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 2, 2021
01b47b8
Integration metrics
wycccccc Oct 5, 2021
71bd9ac
bit fix
wycccccc Oct 5, 2021
f0a7117
add some annotation
wycccccc Oct 5, 2021
96aa30d
reslove conflict
wycccccc Oct 5, 2021
360a134
reslove spotless
wycccccc Oct 5, 2021
8996093
resolve test failed
wycccccc Oct 5, 2021
04e63fd
resolve test failed
wycccccc Oct 5, 2021
1924336
Update KafkaMetricClientApp.java
wycccccc Oct 5, 2021
9e8775a
reslove failed
wycccccc Oct 5, 2021
e525334
Merge branch 'nodeLoad' of github.com:wycccccc/astraea into nodeLoad
wycccccc Oct 5, 2021
b33c800
Update NodeLoadClientTest.java
wycccccc Oct 5, 2021
793876a
Merge branch 'nodeLoad' of github.com:wycccccc/astraea into nodeLoad
wycccccc Oct 5, 2021
7511db1
reslove failed test
wycccccc Oct 5, 2021
694cb7e
reslove failed test
wycccccc Oct 5, 2021
d6d741d
reslove failed test
wycccccc Oct 5, 2021
8a49aef
reslove failed test
wycccccc Oct 5, 2021
9839491
ensure nodeLoadClient
wycccccc Oct 5, 2021
d82f34b
ensure nodeLoadClient
wycccccc Oct 5, 2021
e825ce2
resolve thread-safe
wycccccc Oct 5, 2021
760e6d9
resolve thread-safe
wycccccc Oct 5, 2021
8c9ce46
thread-safe partitioner
wycccccc Oct 8, 2021
7135da0
bit fix
wycccccc Oct 8, 2021
e0d53fb
little fix
wycccccc Oct 8, 2021
52e7873
Refactor SmoothPartitionerFactoryTest
wycccccc Oct 10, 2021
c5c46de
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 10, 2021
5493100
thread-safe continue
wycccccc Oct 10, 2021
23512c4
remove redundant code
wycccccc Oct 10, 2021
ddfba6c
null
wycccccc Oct 13, 2021
ed53032
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 13, 2021
aa33c97
thread safe again
wycccccc Oct 13, 2021
513d292
thread safe again
wycccccc Oct 13, 2021
9bd6038
thread-safe for system logic
wycccccc Oct 17, 2021
eec470d
complete proposes
wycccccc Oct 17, 2021
fde7a46
complete proposes again
wycccccc Oct 17, 2021
6fb03f3
complete proposes again
wycccccc Oct 18, 2021
44cb313
help commit
wycccccc Oct 24, 2021
e31ae18
complete test
wycccccc Oct 25, 2021
f55648a
Merge branch 'main' of github.com:wycccccc/astraea into nodeLoad
wycccccc Oct 27, 2021
79cc949
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 27, 2021
669ae68
replace jmx
wycccccc Oct 27, 2021
480785d
add consumer
wycccccc Oct 28, 2021
8265989
resolve conflicts
wycccccc Oct 28, 2021
bed5b4d
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Oct 29, 2021
d8182d4
replace producer and consumer
wycccccc Oct 29, 2021
c541685
replace producer for other test
wycccccc Oct 30, 2021
a8e7413
test
wycccccc Oct 30, 2021
3dfe408
Merge branch 'main' of https://github.com/skiptests/astraea into node…
wycccccc Nov 1, 2021
5857808
complete
wycccccc Nov 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ repositories {
def versions = [
kafka: project.properties['kafka.version'] ?: "2.8.0",
junit: project.properties['junit.version'] ?: "5.7.2",
mockito: project.properties['mockito.version'] ?: "3.12.4",
]

dependencies {
testImplementation "org.junit.jupiter:junit-jupiter:${versions["junit"]}"
testImplementation "org.mockito:mockito-core:${versions["mockito"]}"

implementation "org.apache.kafka:kafka-clients:${versions["kafka"]}"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.util.HashMap;
import java.util.Map;

public class BrokersWeight {

/**
* Record the current weight of each node according to Poisson calculation and the weight after partitioner calculation.
*/
private static HashMap<Integer, int[]> brokerHashMap = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static? why?

private LoadPoisson loadPoisson;

BrokersWeight(LoadPoisson loadPoisson){
this.loadPoisson = loadPoisson;
}

/**
* Change the weight of the node according to the current Poisson.
*/
public void setBrokerHashMap() {
HashMap<Integer, Double> poissonMap = loadPoisson.setAllPoisson();

for (Map.Entry<Integer, Double> entry:poissonMap.entrySet()){
if(!brokerHashMap.containsKey(entry.getKey())){
brokerHashMap.put(entry.getKey(), new int[]{(int)((1-entry.getValue())*20), 0});
}else {
brokerHashMap.put(entry.getKey(), new int[]{(int)((1-entry.getValue())*20), brokerHashMap.get(entry.getKey())[1]});
}
}
}

public int getAllWeight() {
int allWeight = 0;
for (Map.Entry<Integer, int[]> entry:brokerHashMap.entrySet()) {
allWeight += entry.getValue()[0];
}
return allWeight;
}

public HashMap<Integer, int[]> getBrokerHashMap() {
return brokerHashMap;
}

public void setCurrentBrokerHashMap(HashMap<Integer, int[]> currentBrokerHashMap) {
brokerHashMap = currentBrokerHashMap;
}

//Only for test
public void setBrokerHashMapValue(Integer x, int y) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package-private?

brokerHashMap.put(x,new int[]{0,y});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.util.HashMap;
import java.util.Map;

public class LoadPoisson {

public HashMap<Integer, Double> setAllPoisson(){
HashMap<Integer, Double> poissonMap = new HashMap<>();
NodeLoadClient nodeLoadClient = new NodeLoadClient();
int lambda = nodeLoadClient.getAvgLoadCount();
for (Map.Entry<Integer,Integer> entry :nodeLoadClient.getOverLoadCount().entrySet()){
int x = nodeLoadClient.getBinOneCount(entry.getKey());
poissonMap.put(entry.getKey(), doPoisson(lambda, x));
}
return poissonMap;
}

public double doPoisson(int lambda, int x) {
double Probability = 0;
double ans = 0;

for(int i = 0; i <= x; i++) {
double j = Math.pow(lambda, i);
double e = Math.exp(-lambda);
long h = factorial(i);
Probability = (j * e) / h;
ans += Probability;
}

return ans;
}

public long factorial(long number) {
if (number <= 1)
return 1;
else
return number * factorial(number - 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.util.*;

public class NodeLoadClient {

/**
*This value records the number of times each node has been overloaded within ten seconds.
*/
private static HashMap<Integer, Integer> overLoadCount = new HashMap<Integer, Integer>();

public static void setOverLoadCount() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感覺這個是要共享在所有partitioner? 如果是的話,那為何不讓它thread-safe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是這樣,現在就讓他thread-safe

Timer timer = new Timer();
timer.schedule(new TimerTask() {
public OverLoadNode overLoadNode = new OverLoadNode();

public void run() {
overLoadNode.monitorOverLoad(overLoadCount);
}
}, 1 , 1000);
}

Copy link
Collaborator Author

@wycccccc wycccccc Oct 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我添加了一個判定,一旦timeout被判定為true,也就是開始break的時候,如果此時恰好有新的getNodeLoadInstance進來那我就讓他等一秒。這似乎就能避免上述問題了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個方法並不穩定喔 你無法確認那一秒等的夠不夠久

簡單暴力的方式是在有一個sync static負責關閉該物件


public HashMap<Integer, Integer> getOverLoadCount() {
return this.overLoadCount;
}

public int getAvgLoadCount() {
double avgLoadCount = 0;
for (Map.Entry<Integer,Integer> entry : overLoadCount.entrySet()) {
avgLoadCount += getBinOneCount(entry.getValue());
}
return (int)avgLoadCount/overLoadCount.size();
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在這裡這樣寫算是有處理掉,上面所說的釋放資源這一問題上嘛。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這樣並沒有正確處理,因為在關閉的過程一樣沒有thread-safe


/**
* Get the number of times a node is overloaded.
*/
public int getBinOneCount(int n){
int index = 0;
int count = 0;
while(n > 0){
int x = n &1<<index;
if(x != 0){
count++;
n = n-(1<<index);
}
index++;
}
return count;
}

//TODO
private Collection<Integer> getNodeID() {
return null;
}
//TODO
private Integer getNodeOverLoadCount(Integer nodeID){
return 0;
}
//TODO
private static int[] getNodesID() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.astraea.partitioner.nodeLoadMetric;

import java.util.HashMap;
import java.util.Map;

public class OverLoadNode {
private double standardDeviation = 0;
private double avgBrokersMsgPerSec = 0;
private int[] nodesID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可否用collection取代array?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

當然可以,在下個commit一併更改

private int nodeNum;
private int mountCount;
private HashMap<Integer, Double> eachBrokerMsgPerSec = new HashMap();

OverLoadNode() {
this.nodesID = getNodesID();
this.nodeNum = getNodesID().length;
}

/**
* Monitor and update the number of overloads of each node.
*/
public void monitorOverLoad(HashMap<Integer, Integer> overLoadCount) {
int ifOverLoad = 0;
setBrokersMsgPerSec();
setAvgBrokersMsgPerSec();
standardDeviationImperative();
for (Map.Entry<Integer,Double> entry : eachBrokerMsgPerSec.entrySet()){
if (entry.getValue() > (avgBrokersMsgPerSec + standardDeviation)) {
ifOverLoad = 1;
}
overLoadCount.put(entry.getKey(),setOverLoadCount(overLoadCount.get(entry.getKey()), mountCount%10, ifOverLoad));
}
this.mountCount = mountCount++;
}

/**
*Use bit operations to record whether the node exceeds the load per second,the position of the number represents the recorded time.
*/
public int setOverLoadCount(int overLoadCount, int roundCount,int ifOverLoad){
int x = overLoadCount&1<<roundCount;
if(x == ifOverLoad<<roundCount){
return overLoadCount;
}else {
if (ifOverLoad!=0){
return overLoadCount | 1<<roundCount;
}
else {
return overLoadCount - (int)Math.pow(2, roundCount);
}
}
}

public void setBrokersMsgPerSec() {
for (int nodeID : nodesID){
eachBrokerMsgPerSec.put(nodeID, getEachBrokerMsgPerSec(nodeID));
}
}

public void setAvgBrokersMsgPerSec() {
double avg = 0;
for (Map.Entry<Integer,Double> entry : eachBrokerMsgPerSec.entrySet()) {
avg += entry.getValue();
}
this.avgBrokersMsgPerSec = avg/nodeNum;
}

public void standardDeviationImperative() {
double variance = 0;
for (Map.Entry<Integer,Double> entry : eachBrokerMsgPerSec.entrySet()) {
variance += (entry.getValue() - avgBrokersMsgPerSec) * (entry.getValue() - avgBrokersMsgPerSec);
}

this.standardDeviation = Math.sqrt(variance / nodeNum);
}

//Only for test
public void setEachBrokerMsgPerSec(HashMap<Integer, Double> hashMap) {
this.eachBrokerMsgPerSec = hashMap;
}

//Only for test
public double getStandardDeviation() {
return this.standardDeviation;
}

//Only for test
public double getAvgBrokersMsgPerSec() {
return this.avgBrokersMsgPerSec;
}

//Only for test
public void setMountCount(int i) {
this.mountCount = i;
}

//TODO
private double getEachBrokerMsgPerSec(int nodeID){
return 0;
}

//TODO
private int[] getNodesID() {
return new int[] {0, 1, 2, 3};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.astraea.partitioner.nodeLoadMetric;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import static org.astraea.partitioner.nodeLoadMetric.NodeLoadClient.setOverLoadCount;

public class SmoothPartitioner implements Partitioner {

/**
* Implement Smooth Weight Round Robin.
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

//I think I need to find a suitable place to put it.
setOverLoadCount();

LoadPoisson loadPoisson = new LoadPoisson();
BrokersWeight brokersWeight = new BrokersWeight(loadPoisson);
brokersWeight.setBrokerHashMap();
Map.Entry<Integer, int[]> maxWeightServer = null;

int allWeight = brokersWeight.getAllWeight();
HashMap<Integer, int[]> currentBrokerHashMap = brokersWeight.getBrokerHashMap();

for (Map.Entry<Integer, int[]> item : currentBrokerHashMap.entrySet()) {
Map.Entry<Integer, int[]> currentServer = item;
if (maxWeightServer == null || currentServer.getValue()[1] > maxWeightServer.getValue()[1]) {
maxWeightServer = currentServer;
}
}
assert maxWeightServer != null;
currentBrokerHashMap.put(maxWeightServer.getKey(), new int[]{maxWeightServer.getValue()[0], maxWeightServer.getValue()[1]-allWeight});
brokersWeight.setCurrentBrokerHashMap(currentBrokerHashMap);

ArrayList<Integer> partitionList = new ArrayList<>();
for (PartitionInfo partitionInfo : cluster.partitionsForNode(maxWeightServer.getKey())){
partitionList.add(partitionInfo.partition());
}
Random rand = new Random();
return partitionList.get(rand.nextInt(partitionList.size()));
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.astraea.partitioner.nodeLoadMetric;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class NodeLoadClientTest {
@Test
public void testGetBinOneCount() {
NodeLoadClient nodeLoadClient = new NodeLoadClient();
assertEquals(nodeLoadClient.getBinOneCount(7), 3);
}
}
Loading