
Implement poisson partitioner #27

Merged (56 commits) on Nov 1, 2021

Conversation

@wycccccc (Collaborator) commented Sep 23, 2021

A partitioner that predicts the best future assignment from past global information (phase 1, phase 2) #8

The basic functionality of the poisson partitioner is done.
There is one crucial question I could not find a good answer to: I cannot find a proper moment to start monitorOverLoad. In my design it should run for a producer's whole lifecycle, but putting it in the client code is clearly not appropriate. Perhaps the partitioner could check whether the method is already running and act accordingly, but I am not sure how to write that.
Once this is resolved I will update the partitioner's test file.

I will also incrementally improve and test the remaining issues: distinguishing single-node from multi-node cases, nodes going online and offline, and the algorithm bias caused by incomplete data during the initial collection period.

@wycccccc requested a review from chia7712 on September 23, 2021 at 15:58
@chia7712 (Contributor) commented:

@wycccccc how about fixing checkstyle first? XD

@chia7712 (Contributor) commented:

> There is one crucial question I could not find a good answer to: I cannot find a proper moment to start monitorOverLoad.

Why not create it when the partitioner is created and close it when the partitioner is closed? If that causes a performance problem, we can deal with it later.

@chia7712 (Contributor) left a comment:

If some resources should be shared by all producers on the same JVM, one option is to design the partitioner as a Java singleton. In other words, every producer still creates its own partitioner, but each of those partitioners forwards its operations to one static partitioner. That static partitioner must be thread-safe; then you can safely put expensive operations (such as fetching cluster information) inside it. Additionally, every time a partitioner is closed it must decrement a reference counter; when the last partitioner closes, it must also close the static partitioner, and when the next partitioner is created, the static partitioner must be created again.

The above is an "architecture" decision, independent of how you fill in the "implementation" (and of the algorithm). This is what I meant in the meeting about putting the partitioner's "shell" up first: once the architecture is confirmed, you can focus on filling in your implementation. The reason this matters is that your algorithm involves an expensive operation (synchronizing information about the whole cluster), so the "architecture" has to be settled first.
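
A minimal sketch of the reference-counted delegation described above, assuming hypothetical names (SingletonShellPartitioner, SharedState, and their methods are illustrative, not this PR's code):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical shell: every partitioner instance forwards to one shared,
// reference-counted state object per JVM.
public class SingletonShellPartitioner implements Partitioner {

  private static final Object LOCK = new Object();
  private static SharedState shared = null; // the "static partitioner" state
  private static int references = 0;

  @Override
  public void configure(Map<String, ?> configs) {
    synchronized (LOCK) {
      if (shared == null) shared = new SharedState(configs); // expensive setup runs once
      references++;
    }
  }

  @Override
  public int partition(
      String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return shared.partition(topic, cluster); // delegate to the thread-safe shared state
  }

  @Override
  public void close() {
    synchronized (LOCK) {
      if (--references == 0) { // the last partitioner also closes the shared state
        shared.close();
        shared = null;
      }
    }
  }

  // Stand-in for the expensive shared resource (e.g. a cluster-info poller).
  private static class SharedState {
    SharedState(Map<String, ?> configs) {}
    int partition(String topic, Cluster cluster) { return 0; }
    void close() {}
  }
}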

public class OverLoadNode {
  private double standardDeviation = 0;
  private double avgBrokersMsgPerSec = 0;
  private int[] nodesID;
@chia7712 (Contributor):

Could a collection be used instead of an array?

@wycccccc (Collaborator, Author):

Sure, I'll change that in the next commit.

}

//Only for test
public void setBrokerHashMapValue(Integer x, int y) {
@chia7712 (Contributor):

package-private?

/**
* Record the current weight of each node according to Poisson calculation and the weight after partitioner calculation.
*/
private static HashMap<Integer, int[]> brokerHashMap = new HashMap<>();
@chia7712 (Contributor):

static? why?

*/
private static HashMap<Integer, Integer> overLoadCount = new HashMap<Integer, Integer>();

public static void setOverLoadCount() {
@chia7712 (Contributor):

It looks like this is meant to be shared across all partitioners? If so, why not make it thread-safe?

@wycccccc (Collaborator, Author):

Right, I'll make it thread-safe now.

@wycccccc (Collaborator, Author) commented:

The main reason is that I need to store the number of overload occurrences.
I define overload as a node deviating too far from the cluster's average load. The plan is to evaluate the whole cluster once per second, record each node's state, and keep a rolling window of the most recent ten seconds of data.
If monitorOverLoad is put inside the partitioner and starts and stops with it, continuous monitoring may not be possible. Performance is indeed where my concern started: waiting for JMX data and then computing the assignment makes each pass through the partitioner visibly slower (presumably because the client has to wait for data from the server side). I think this has to be solved.
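
A small sketch of the ten-second rolling window described above; the names are illustrative rather than this PR's actual OverLoadNode bookkeeping:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: once per second, record whether each node is overloaded and
// keep only the latest 10 observations per node.
public class RollingOverloadWindow {
  private static final int WINDOW = 10; // seconds of history to keep
  private final Map<Integer, Deque<Boolean>> history = new ConcurrentHashMap<>();

  // Called once per second with a node's load and the cluster average.
  public void observe(int nodeId, double nodeLoad, double clusterAvg, double threshold) {
    var window = history.computeIfAbsent(nodeId, id -> new ArrayDeque<>());
    synchronized (window) {
      window.addLast(nodeLoad - clusterAvg > threshold); // overload = too far above average
      if (window.size() > WINDOW) window.removeFirst();  // drop observations older than 10s
    }
  }

  // Number of overloaded seconds within the last ten observed seconds.
  public long overloadCount(int nodeId) {
    var window = history.getOrDefault(nodeId, new ArrayDeque<>());
    synchronized (window) {
      return window.stream().filter(b -> b).count();
    }
  }
}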

> If some resources should be shared by all producers on the same JVM, one option is to design the partitioner as a Java singleton. In other words, every producer still creates its own partitioner, but each of those partitioners forwards its operations to one static partitioner. That static partitioner must be thread-safe; then you can safely put expensive operations (such as fetching cluster information) inside it. Additionally, every time a partitioner is closed it must decrement a reference counter; when the last partitioner closes, it must also close the static partitioner, and when the next partitioner is created, the static partitioner must be created again.

I get the idea. I'm going to rewrite NodeLoadClient.class; a singleton feels like a good solution. What I have in mind is a singleton-style static inner holder class, which should solve the problems I mentioned above.
I'll resolve the other issues raised as part of the rewrite.

@wycccccc (Collaborator, Author) commented:

I rewrote NodeLoadClient.class along the lines above and moved the JMX data refresh into a separate thread. It is started during the first pass through the partitioner, and since it runs in its own thread it does not end when that pass ends; it is kept alive by the holder ("cradle") mechanism. Is this approach acceptable? Next I'll write the partitioner test; the tests for the individual small features are done, and once the partitioner test passes I can start integrating with MbeanClient.

@chia7712 (Contributor) left a comment:

@wycccccc thanks for the update. I still have some doubts about the design; please see the comments below.

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class SmoothPartitioner implements Partitioner {
@chia7712 (Contributor):

Tests, please~

@wycccccc (Collaborator, Author):

Once everything else looks fine, I'll add this final test.

NodeLoadClientHolder.timeOutCount =
    NodeLoadClientHolder.currentAlive ? 0 : NodeLoadClientHolder.timeOutCount;
NodeLoadClientHolder.currentAlive = false;
Thread.sleep(1000);
@chia7712 (Contributor):

please use TimeUnit.SECONDS.sleep(1)

public static NodeLoadClient getNodeLoadInstance() throws InterruptedException {
  if (!NodeLoadClientHolder.clientOn) {
    NodeLoadClientHolder.clientOn = true;
    for (int i = 0; i < 10; i++) {
@chia7712 (Contributor):

Where does the magic number 10 come from?

@wycccccc (Collaborator, Author):

My head got tangled up; this should be the number of nodes, not a hard-coded 10. I'll fix it.

private static class NodeLoadClientHolder {
  private static OverLoadNode overLoadNode = new OverLoadNode();
  private static NodeLoadClient nodeLoadClient = new NodeLoadClient();
  private static Boolean clientOn = false;
@chia7712 (Contributor):

Why use the object type Boolean instead of the primitive type?

@wycccccc (Collaborator, Author):

I used Boolean because I need it for a condition. Is there something wrong with how I'm using it?

@chia7712 (Contributor):

Java has two boolean types, the object type Boolean and the primitive type boolean.

We usually use the latter unless there is a special need.

https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html
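
A short illustration of the difference (not from this PR):

// Primitive boolean: cannot be null and has no boxing overhead; the usual choice for flags.
boolean clientOn = false;

// Object Boolean: can be null, so a condition may throw on implicit unboxing.
Boolean maybeOn = null;
// if (maybeOn) { ... } // NullPointerException: unboxing null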

@wycccccc (Collaborator, Author):

OK, understood.

}

public static NodeLoadClient getNodeLoadInstance() throws InterruptedException {
  if (!NodeLoadClientHolder.clientOn) {
@chia7712 (Contributor):

No thread-safety needed here?

NodeLoadClientHolder.clientOn = false;
}

public HashMap<Integer, Integer> getOverLoadCount() {
@chia7712 (Contributor):

Doesn't this need to be thread-safe as well? Different threads are mutating the same object.
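
One possible way to address both thread-safety comments, sketched with illustrative names (the PR's real fields may differ):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: atomic per-node counters shared across producer threads.
class OverloadCounters {
  private static final Map<Integer, Integer> OVER_LOAD_COUNT = new ConcurrentHashMap<>();

  // merge() performs the read-modify-write atomically, so concurrent
  // partition() calls cannot lose increments.
  static void increment(int nodeId) {
    OVER_LOAD_COUNT.merge(nodeId, 1, Integer::sum);
  }

  static int count(int nodeId) {
    return OVER_LOAD_COUNT.getOrDefault(nodeId, 0);
  }
}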


NodeLoadClient nodeLoadClient = null;
try {
  nodeLoadClient = getNodeLoadInstance(cluster.nodes().size());
@chia7712 (Contributor):

So every call to partition() has to fetch the NodeLoadInstance again? Under synchronization (which is mandatory, otherwise multiple threads will break things) this design performs very badly. Why not check whether the current partitioner already holds a NodeLoadClient, and release that client in the close() method?
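
A sketch of that suggestion; acquire, release, and bestPartition are hypothetical stand-ins for whatever NodeLoadClient actually exposes:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative: the partitioner acquires the shared client once and caches it,
// instead of re-fetching it on every partition() call.
public class SmoothPartitionerSketch implements Partitioner {
  private NodeLoadClient nodeLoadClient; // cached for this partitioner's lifetime

  @Override
  public void configure(Map<String, ?> configs) {
    nodeLoadClient = NodeLoadClient.acquire(configs); // hypothetical factory
  }

  @Override
  public int partition(
      String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // no per-call lookup or synchronization needed here
    return nodeLoadClient.bestPartition(topic, cluster); // hypothetical method
  }

  @Override
  public void close() {
    NodeLoadClient.release(nodeLoadClient); // hypothetical, decrements the shared ref-count
  }
}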

@wycccccc (Collaborator, Author):

It sounds like a producer creates its partitioner only once. I assumed the partitioner was closed right after each send finished, when in fact it is only closed when the producer closes. Clearly I didn't read that part of the source carefully; once I have, I'll fix this design.

@chia7712 (Contributor) left a comment:

@wycccccc thanks for the update. Some earlier comments have not been addressed yet; please take another look.

@Override
public void configure(Map<String, ?> configs) {
  try {
    var list = Arrays.asList(((String) configs.get("jmx_servers")).split(","));
@chia7712 (Contributor):

Why has the null check disappeared again @@

serviceURL = new JMXServiceURL(createJmxUrl(address));
kafkaMetricClient = new KafkaMetricClient(serviceURL);
metricsValues = new HashMap();
argumentTargetMetrics = List.of(metricsName).subList(0, metricsName.length);
@chia7712 (Contributor):

I'm not sure why the array is converted back into a List here; why not declare a List from the start?

public class NodeMetrics {
  public String JMX_URI_FORMAT = "service:jmx:rmi:///jndi/rmi://" + "%s" + "/jmxrmi";
  String[] metricsName = {"BytesInPerSec", "BytesOutPerSec"};
  public JMXServiceURL serviceURL;
@chia7712 (Contributor):

Please use private + final wherever possible.

public String JMX_URI_FORMAT = "service:jmx:rmi:///jndi/rmi://" + "%s" + "/jmxrmi";
String[] metricsName = {"BytesInPerSec", "BytesOutPerSec"};
public JMXServiceURL serviceURL;
public KafkaMetricClient kafkaMetricClient;
@chia7712 (Contributor):

Why is this object never closed?


/** Store information about each node */
public class NodeMetadata {
private String nodeID;
@chia7712 (Contributor):

private final

  nodeMetadata.setOverLoadCount(
      setOverLoadCount(nodeMetadata.getOverLoadCount(), mountCount % 10, ifOverLoad));
}
this.mountCount = mountCount++;
@chia7712 (Contributor):

Remember to fix this part.

  mapAddress.put(listAddress.get(1), listAddress.get(0));
}
Objects.requireNonNull(
    mapAddress, "You must configure jmx_servers correctly.(JmxAddress@NodeID)");
@wycccccc (Collaborator, Author):

I didn't lose it here; this is where jmxaddress is first used, so I put the check here.

@chia7712 (Contributor):

If the user doesn't set jmx_servers at all, the earlier string split will already throw an NPE, and that NPE will lack the explanation you give here.
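
A hedged sketch of failing fast inside configure(Map<String, ?> configs), assuming java.util.Objects and java.util.Arrays are imported; the message string is the one from this diff:

// Fail fast with a descriptive message before any string splitting happens.
var raw = (String) configs.get("jmx_servers");
Objects.requireNonNull(raw, "You must configure jmx_servers correctly.(JmxAddress@NodeID)");
var list = Arrays.asList(raw.split(","));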

@wycccccc (Collaborator, Author) commented Oct 17, 2021:

The other issues have all been resolved as well.

@wycccccc (Collaborator, Author) commented Oct 18, 2021:

> If the user doesn't set jmx_servers at all, the earlier string split will already throw an NPE, and that NPE will lack the explanation you give here.

Solved.

props.put("jmx_servers", "127.0.0.1@0");
admin.createTopic(topicName, 10);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
@wycccccc (Collaborator, Author):

I tested building the cluster and transferring data, and establishing the JMX connection, separately; each works fine on its own. But when I combine them, the JMX connection throws an error. I tried quite a few things (from morning to night) with no progress, so I'd like to ask where the problem might be. Screenshot below.

[Screenshot taken 2021-10-25 02:14:26]

@chia7712 (Contributor):

> I tested building the cluster and transferring data, and establishing the JMX connection, separately

I don't quite follow this sentence; could you walk me through the steps?

@chia7712 (Contributor):

With props.put("jmx_servers", "127.0.0.1@0") I'm not sure which jmx server you are trying to connect to. If it's the local broker, the connection cannot work, because the local broker runs inside the gradle JVM and no jmx server is started there. If you want the jmx server created in setup, you need to include the port.

@wycccccc (Collaborator, Author):

I switched to a different approach and finished the test. It appears to work correctly, distributing records fairly evenly across every partition on the nodes.

    mapAddress, "You must configure jmx_servers correctly.(JmxAddress@NodeID)");
nodeLoadClient = new NodeLoadClient((mapAddress));
} catch (MalformedURLException e) {
  e.printStackTrace();
@chia7712 (Contributor):

These errors should be wrapped in a runtime exception and rethrown; otherwise nothing afterward can run anyway.
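
A minimal sketch of that wrapping (assuming java.net.MalformedURLException is in scope; not this PR's final code):

try {
  nodeLoadClient = new NodeLoadClient(mapAddress);
} catch (MalformedURLException e) {
  // Rethrow instead of printing: the partitioner cannot work without the client,
  // so surface the failure to the caller immediately.
  throw new RuntimeException("failed to create NodeLoadClient", e);
}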

@wycccccc (Collaborator, Author):

Resolved.


@BeforeEach
void setUp() throws IOException {
  JMXServiceURL serviceURL = new JMXServiceURL("service:jmx:rmi://127.0.0.1");
@chia7712 (Contributor):

We have introduced new infra for creating a local JMX server; please switch to it here, thanks.

@chia7712 (Contributor) left a comment:

@wycccccc I think this PR is nearly done; please revise the tests a bit more, thanks.

for (int i = 0; i < 10; i++) {
  ProducerRecord<String, String> record1 =
      new ProducerRecord<String, String>(topicName, "shanghai", Integer.toString(i));
  RecordMetadata recordMetadata = producer.send(record1).get();
@chia7712 (Contributor):

Check the contents of recordMetadata to verify it matches expectations (e.g. the partition each record landed on).

@wycccccc (Collaborator, Author):

I handled this earlier. Since every run produces different results, I check whether every partition received data; if they all did, it counts as expected (everything got assigned).
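
A sketch of that kind of coverage check, assuming the send results were collected into a hypothetical metadataList (with java.util.stream.Collectors imported):

// Collect the partition of every acknowledged record, then assert that
// every partition of the topic received at least one record.
var usedPartitions =
    metadataList.stream().map(RecordMetadata::partition).collect(Collectors.toSet());
Assertions.assertEquals(
    producer.partitionsFor(topicName).size(),
    usedPartitions.size(),
    "every partition should receive at least one record");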

for (var p : map.get(name).replicaInfos().keySet()) {
  if (!p.topic().equals("__consumer_offsets")) {
    var size = (float) map.get(name).replicaInfos().get(p).size();
    Assertions.assertNotEquals(size, 0);
@chia7712 (Contributor):

Here you should use a consumer to fetch the data back and check the keys and values.

@wycccccc (Collaborator, Author):

Done.

@chia7712 (Contributor) left a comment:

@wycccccc a few thoughts on the tests; please take a look.

for (int i = 0; i < 20; i++) {
  ProducerRecord<String, String> record2 =
      new ProducerRecord<String, String>(topicName, "tainan-" + i, Integer.toString(i));
  RecordMetadata recordMetadata2 = producer.send(record2).get();
@chia7712 (Contributor):

Unused variable.

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
  for (int i = 0; i < 20; i++) {
    ProducerRecord<String, String> record2 =
@chia7712 (Contributor):

Why is it called record2?

e.printStackTrace();
}

KafkaConsumer<String, String> consumer = new KafkaConsumer(initConConfig());
@chia7712 (Contributor):

var consumer = new KafkaConsumer<String, String>(initConConfig());

}

KafkaConsumer<String, String> consumer = new KafkaConsumer(initConConfig());
consumer.subscribe(Arrays.asList(topicName));
@chia7712 (Contributor):

consumer.subscribe(Set.of(topicName));

var count = 0;
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  if (Pattern.compile("^tainan").matcher(consumerRecord.key()).find()) count++;
  // System.out.println("TopicName: " + consumerRecord.topic() + " Partition:" +
@chia7712 (Contributor):

Please remove unused code.


KafkaConsumer<String, String> consumer = new KafkaConsumer(initConConfig());
consumer.subscribe(Arrays.asList(topicName));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
@chia7712 (Contributor):

This test is a bit fragile: you cannot guarantee that a single poll will bring back all the data.
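
A safer pattern is to poll in a loop until the expected count arrives or a deadline passes; a sketch with an assumed expectedCount:

// Keep polling until all expected records arrive or time runs out,
// instead of relying on a single poll() returning everything.
var records = new ArrayList<ConsumerRecord<String, String>>();
var deadline = System.currentTimeMillis() + Duration.ofSeconds(30).toMillis();
while (records.size() < expectedCount && System.currentTimeMillis() < deadline) {
  consumer.poll(Duration.ofMillis(500)).forEach(records::add);
}
Assertions.assertEquals(expectedCount, records.size());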

@wycccccc (Collaborator, Author) commented:

Finished switching to the producer interface and consumer interface.

@chia7712 (Contributor) left a comment:

@wycccccc some more thoughts on the tests; please take a look, thanks.

// create multiple partitioners
var producers =
    IntStream.range(0, 10)
        .mapToObj(i -> new KafkaProducer<byte[], byte[]>(props))
@chia7712 (Contributor):

Same here; please use the APIs we wrap ourselves.

    }
  }
} catch (InterruptedException | ExecutionException e) {
  System.out.println("取得資訊失敗" + e.getMessage());
@chia7712 (Contributor):

This error should not be possible, right? If it isn't, the test should fail outright instead of swallowing it.

Collection<Integer> brokerID = new ArrayList<>();
brokerID.add(0);

DescribeLogDirsResult result = client.describeLogDirs(brokerID);
@chia7712 (Contributor):

Since the consumer already verifies the data, there is no need to also check replica sizes.

@wycccccc (Collaborator, Author) commented:

All of the issues above have been resolved.

@chia7712 (Contributor) commented Nov 1, 2021:

That build failure is because I removed that public API in #84; I opened another PR (#85) to add the method back.

@chia7712 (Contributor) commented Nov 1, 2021:

@wycccccc could you please rebase the code?

@chia7712 (Contributor) commented Nov 1, 2021:

admin.createTopic(topicName, 10);

This method is definitely being removed, because partitions and replicas are too easy to confuse; please use the new APIs.

@chia7712 chia7712 merged commit f2f7db4 into opensource4you:main Nov 1, 2021