From a91c0115a45182d310e8c43a136f344d0c0bc1eb Mon Sep 17 00:00:00 2001
From: yangsanity <471419897@qq.com>
Date: Fri, 2 Jul 2021 19:42:30 +0800
Subject: [PATCH] fix ScriptHistoryYarnStateRefreshJob and
ScriptHistoryTimeoutJob: get correct queue name According to different
CapacityScheduler or FairScheduler
---
.../com/meiyou/bigwhale/common/Constant.java | 8 ++
.../bigwhale/common/pojo/SchedulerInfo.java | 124 ++++++++++++++++++
.../bigwhale/job/ScriptHistoryTimeoutJob.java | 10 +-
.../job/ScriptHistoryYarnStateRefreshJob.java | 19 ++-
.../meiyou/bigwhale/util/YarnApiUtils.java | 42 ++++--
.../com/meiyou/bigwhale/util/YarnUtil.java | 24 ++++
6 files changed, 209 insertions(+), 18 deletions(-)
create mode 100644 src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
create mode 100644 src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
diff --git a/src/main/java/com/meiyou/bigwhale/common/Constant.java b/src/main/java/com/meiyou/bigwhale/common/Constant.java
index c7046ea..180d683 100644
--- a/src/main/java/com/meiyou/bigwhale/common/Constant.java
+++ b/src/main/java/com/meiyou/bigwhale/common/Constant.java
@@ -95,4 +95,12 @@ interface ErrorType {
*/
String DINGDING_ROBOT_URL = "https://oapi.dingtalk.com/robot/send?access_token=";
+ /**
+ * YARN resource manager scheduler type
+ */
+ interface YarnResourcemanagerScheduler {
+
+ String CAPACITY_SCHEDULER = "capacityScheduler";
+ String FAIR_SCHEDULER = "fairScheduler";
+ }
}
diff --git a/src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java b/src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
new file mode 100644
index 0000000..dcd0676
--- /dev/null
+++ b/src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
@@ -0,0 +1,124 @@
+package com.meiyou.bigwhale.common.pojo;
+
+import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * YARN resource manager scheduler info
+ *
+ * schedulerInfo
+ *
+ * Elements of the schedulerInfo object
+ *
+ * @author yangjie
+ */
+@Data
+public class SchedulerInfo implements Serializable {
+
+ /**
+ * @see YarnResourcemanagerScheduler
+ */
+ private String type;
+ private Float capacity;
+ private Float usedCapacity;
+ private Float maxCapacity;
+ private String queueName;
+ private List queues;
+ private Health health;
+
+ /**
+ * Elements of the queues object for a Parent queue
+ *
+ * Elements of the queues object for a Leaf queue - contains all the elements in parent except ‘queues’ plus the following
+ */
+ @Data
+ public static class Queue {
+ /**
+ * Elements of the queues object for a Parent queue
+ */
+ private Float capacity;
+ private Float usedCapacity;
+ private Float maxCapacity;
+ private Float absoluteCapacity;
+ private Float absoluteMaxCapacity;
+ private Float absoluteUsedCapacity;
+ private Integer numApplications;
+ private String usedResources;
+ private String queueName;
+ private String state;
+ private List queues;
+ private Resource resourcesUsed;
+ /**
+ * Elements of the queues object for a Leaf queue
+ * contains all the elements in parent except ‘queues’ plus the following
+ */
+ private String type;
+ private Integer numActiveApplications;
+ private Integer numPendingApplications;
+ private Integer numContainers;
+ private Integer allocatedContainers;
+ private Integer reservedContainers;
+ private Integer pendingContainers;
+ private Integer maxApplications;
+ private Integer maxApplicationsPerUser;
+ private Integer maxActiveApplications;
+ private Integer maxActiveApplicationsPerUser;
+ private Integer userLimit;
+ private Float userLimitFactor;
+ private List users;
+ }
+
+ /**
+ * Elements of the user object for users
+ */
+ @Data
+ public static class User {
+ private String username;
+ private Resource resourcesUsed;
+ private Integer numActiveApplications;
+ private Integer numPendingApplications;
+ }
+
+ /**
+ * Elements of the resource object for resourcesUsed in user and queues
+ */
+ @Data
+ public static class Resource {
+ private Integer memory;
+ private Integer vCores;
+ }
+
+ /**
+ * Elements of the health object in schedulerInfo
+ */
+ @Data
+ public static class Health {
+ private Long lastrun;
+ private List operationsInfo;
+ private List lastRunDetails;
+ }
+
+ /**
+ * Elements of the operation object in health
+ */
+ @Data
+ public static class Operation {
+ private String operation;
+ private String nodeId;
+ private String containerId;
+ private String queue;
+ }
+
+ /**
+ * Elements of the lastRunDetail object in health
+ */
+ @Data
+ public static class LastRunDetail {
+ private String operation;
+ private Long count;
+ private Resource resources;
+ }
+}
diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java
index fae9ad0..974e0ad 100644
--- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java
+++ b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java
@@ -2,6 +2,7 @@
import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
+import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import com.meiyou.bigwhale.entity.Cluster;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.service.ClusterService;
@@ -49,8 +50,15 @@ public void execute(JobExecutionContext jobExecutionContext) {
// Yarn资源不够时,客户端会长时间处于提交请求状态,平台无法中断此请求,故在此处再判断一次状态
if (scriptHistory.getClusterId() != null && scriptHistory.getState().equals(Constant.JobState.SUBMITTING)) {
Cluster cluster = clusterService.findById(scriptHistory.getClusterId());
+ // request Cluster Scheduler API for schedulerType
+ SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
+ if (scheduler == null) {
+ // handle this next time.
+ continue;
+ }
+ String schedulerType = scheduler.getType();
String [] jobParams = scriptHistory.getJobParams().split(";");
- HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), jobParams[0], jobParams[1], jobParams[2], 3);
+ HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
if (httpYarnApp != null) {
retry = false;
scriptHistory.updateState(Constant.JobState.SUBMITTED);
diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java
index 4cadfa0..50f4eb5 100644
--- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java
+++ b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java
@@ -2,11 +2,13 @@
import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
+import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import com.meiyou.bigwhale.entity.Cluster;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.service.ClusterService;
import com.meiyou.bigwhale.service.ScriptHistoryService;
import com.meiyou.bigwhale.util.YarnApiUtils;
+import com.meiyou.bigwhale.util.YarnUtil;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
@@ -43,21 +45,24 @@ public void execute(JobExecutionContext jobExecutionContext) {
if (scriptHistories.isEmpty()) {
continue;
}
+ // request Cluster Scheduler API for schedulerType
+ SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
+ if (scheduler == null) {
+ continue;
+ }
// 请求yarn web url, 获取所有应用
List httpYarnApps = YarnApiUtils.getActiveApps(cluster.getYarnUrl());
// 请求出错,不清理数据
if (httpYarnApps == null) {
continue;
}
+ String schedulerType = scheduler.getType();
httpYarnApps.removeIf(httpYarnApp -> !httpYarnApp.getName().contains(".bw_instance_") && !httpYarnApp.getName().contains(".bw_test_instance_"));
Map yarnParamsToScriptHistoryMap = new HashMap<>();
scriptHistories.forEach(scriptHistory -> {
String [] jobParams = scriptHistory.getJobParams().split(";");
String user = jobParams[0];
- String queue = jobParams[1];
- if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
- queue = "root." + queue;
- }
+ String queue = YarnUtil.getQueueName(schedulerType, jobParams[1]);
String app = jobParams[2];
String key = user + ";" + queue + ";" + app;
yarnParamsToScriptHistoryMap.put(key, scriptHistory);
@@ -82,7 +87,7 @@ public void execute(JobExecutionContext jobExecutionContext) {
}
scriptHistories.forEach(scriptHistory -> {
if (!matchIds.contains(scriptHistory.getId())) {
- updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory);
+ updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory, schedulerType);
}
});
}
@@ -104,9 +109,9 @@ private void updateMatchScriptHistory(HttpYarnApp httpYarnApp, ScriptHistory scr
scriptHistoryService.save(scriptHistory);
}
- private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory) {
+ private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory, String schedulerType) {
String [] jobParams = scriptHistory.getJobParams().split(";");
- HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, jobParams[0], jobParams[1], jobParams[2], 3);
+ HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
if (httpYarnApp != null) {
if ("FINISHED".equals(httpYarnApp.getState())) {
scriptHistory.updateState(httpYarnApp.getFinalStatus());
diff --git a/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java b/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
index 7a3a867..d8d3887 100644
--- a/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
+++ b/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
@@ -1,13 +1,23 @@
package com.meiyou.bigwhale.util;
-import com.alibaba.fastjson.*;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.JSONObject;
import com.meiyou.bigwhale.common.pojo.BackpressureInfo;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
+import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class YarnApiUtils {
@@ -45,10 +55,8 @@ public static List getActiveApps(String yarnUrl) {
* @param retries
* @return
*/
- public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
- if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
- queue = "root." + queue;
- }
+ public static HttpYarnApp getActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
+ queue = YarnUtil.getQueueName(schedulerType, queue);
Map params = new HashMap<>();
params.put("user", user);
params.put("queue", queue);
@@ -84,10 +92,8 @@ public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue
* @param retries 重试次数
* @return
*/
- public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
- if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
- queue = "root." + queue;
- }
+ public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
+ queue = YarnUtil.getQueueName(schedulerType, queue);
Map params = new HashMap<>();
params.put("user", user);
params.put("states", "finished,killed,failed");
@@ -295,10 +301,26 @@ public static boolean killApp(String yarnUrl, String appId) {
return false;
}
+ /**
+ * cluster scheduler API
+ */
+ public static SchedulerInfo getYarnSchedulerInfo(String yarnUrl) {
+ OkHttpUtils.Result result = OkHttpUtils.doGet(getSchedulerUrl(yarnUrl), Collections.emptyMap(), HEADERS);
+ if (result.isSuccessful && StringUtils.isNotEmpty(result.content)) {
+ JSONObject jsonObject = JSON.parseObject(JSON.parseObject(result.content).getString("scheduler"));
+ return JSON.parseObject(jsonObject.getString("schedulerInfo"), SchedulerInfo.class);
+ }
+ return null;
+ }
+
private static String getAppsUrl(String yarnUrl) {
return appendUrl(yarnUrl) + "ws/v1/cluster/apps";
}
+ private static String getSchedulerUrl(String yarnUrl) {
+ return appendUrl(yarnUrl) + "ws/v1/cluster/scheduler";
+ }
+
private static String appendUrl(String url) {
if (!url.endsWith("/")) {
url += "/";
diff --git a/src/main/java/com/meiyou/bigwhale/util/YarnUtil.java b/src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
new file mode 100644
index 0000000..c7f56f0
--- /dev/null
+++ b/src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
@@ -0,0 +1,24 @@
+package com.meiyou.bigwhale.util;
+
+import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangjie
+ */
+public class YarnUtil {
+
+ private YarnUtil() {
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtil.class);
+
+ public static String getQueueName(String yarnSchedulerType, String queue) {
+ if (YarnResourcemanagerScheduler.FAIR_SCHEDULER.equals(yarnSchedulerType) &&
+ queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
+ return "root." + queue;
+ }
+ return queue;
+ }
+}