Skip to content

Commit

Permalink
fix ScriptHistoryYarnStateRefreshJob and ScriptHistoryTimeoutJob get …
Browse files Browse the repository at this point in the history
…incorrect queue name according to CapacityScheduler or FairScheduler
  • Loading branch information
jjiey committed Jul 2, 2021
1 parent 187d35a commit 48c7f61
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 18 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/meiyou/bigwhale/common/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,12 @@ interface ErrorType {
*/
String DINGDING_ROBOT_URL = "https://oapi.dingtalk.com/robot/send?access_token=";

/**
 * Identifiers of the YARN ResourceManager scheduler implementations that the
 * application distinguishes between. The values are compared against the
 * scheduler {@code type} string when deciding how a queue name must be written.
 */
interface YarnResourcemanagerScheduler {

    /** Type identifier for the Capacity Scheduler. */
    String CAPACITY_SCHEDULER = "capacityScheduler";

    /** Type identifier for the Fair Scheduler. */
    String FAIR_SCHEDULER = "fairScheduler";
}
}
124 changes: 124 additions & 0 deletions src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.meiyou.bigwhale.common.pojo;

import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * YARN resource manager scheduler info.
 *
 * <p>Mirrors the elements of the {@code schedulerInfo} object described in the
 * <a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API">Cluster Scheduler API</a>.
 *
 * <p>This class and every nested type implement {@link Serializable}; the nested
 * types must do so as well, otherwise serializing a populated instance fails with
 * {@link java.io.NotSerializableException}.
 *
 * @author yangjie
 */
@Data
public class SchedulerInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Scheduler type.
     *
     * @see YarnResourcemanagerScheduler
     */
    private String type;
    private Float capacity;
    private Float usedCapacity;
    private Float maxCapacity;
    private String queueName;
    /** Child queues of this scheduler entry. */
    private List<Queue> queues;
    private Health health;

    /**
     * Elements of the {@code queues} object.
     *
     * <p>A parent queue carries the first group of fields; a leaf queue carries
     * all parent elements except {@code queues}, plus the second group.
     */
    @Data
    public static class Queue implements Serializable {

        private static final long serialVersionUID = 1L;

        /*
         * Elements of the queues object for a parent queue.
         */
        private Float capacity;
        private Float usedCapacity;
        private Float maxCapacity;
        private Float absoluteCapacity;
        private Float absoluteMaxCapacity;
        private Float absoluteUsedCapacity;
        // Primitive on purpose left as declared: stays 0 when never assigned.
        private int numApplications;
        private String usedResources;
        private String queueName;
        private String state;
        private List<Queue> queues;
        private Resource resourcesUsed;

        /*
         * Elements of the queues object for a leaf queue:
         * all the elements of a parent except 'queues', plus the following.
         */
        private String type;
        private Integer numActiveApplications;
        private Integer numPendingApplications;
        private Integer numContainers;
        private Integer allocatedContainers;
        private Integer reservedContainers;
        private Integer pendingContainers;
        private Integer maxApplications;
        private Integer maxApplicationsPerUser;
        private Integer maxActiveApplications;
        private Integer maxActiveApplicationsPerUser;
        private Integer userLimit;
        private Float userLimitFactor;
        private List<User> users;
    }

    /**
     * Elements of the {@code user} object for {@code users}.
     */
    @Data
    public static class User implements Serializable {

        private static final long serialVersionUID = 1L;

        private String username;
        private Resource resourcesUsed;
        private Integer numActiveApplications;
        private Integer numPendingApplications;
    }

    /**
     * Elements of the {@code resource} object used for {@code resourcesUsed}
     * in users and queues.
     */
    @Data
    public static class Resource implements Serializable {

        private static final long serialVersionUID = 1L;

        private Integer memory;
        private Integer vCores;
    }

    /**
     * Elements of the {@code health} object in {@code schedulerInfo}.
     */
    @Data
    public static class Health implements Serializable {

        private static final long serialVersionUID = 1L;

        private Long lastrun;
        private List<Operation> operationsInfo;
        private List<LastRunDetail> lastRunDetails;
    }

    /**
     * Elements of the {@code operation} object in {@code health}.
     */
    @Data
    public static class Operation implements Serializable {

        private static final long serialVersionUID = 1L;

        private String operation;
        private String nodeId;
        private String containerId;
        private String queue;
    }

    /**
     * Elements of the {@code lastRunDetail} object in {@code health}.
     */
    @Data
    public static class LastRunDetail implements Serializable {

        private static final long serialVersionUID = 1L;

        private String operation;
        private Long count;
        private Resource resources;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import com.meiyou.bigwhale.entity.Cluster;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.service.ClusterService;
Expand Down Expand Up @@ -49,8 +50,15 @@ public void execute(JobExecutionContext jobExecutionContext) {
// Yarn资源不够时,客户端会长时间处于提交请求状态,平台无法中断此请求,故在此处再判断一次状态
if (scriptHistory.getClusterId() != null && scriptHistory.getState().equals(Constant.JobState.SUBMITTING)) {
Cluster cluster = clusterService.findById(scriptHistory.getClusterId());
// request Cluster Scheduler API for schedulerType
SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
if (scheduler == null) {
// handle this next time.
continue;
}
String schedulerType = scheduler.getType();
String [] jobParams = scriptHistory.getJobParams().split(";");
HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), jobParams[0], jobParams[1], jobParams[2], 3);
HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
if (httpYarnApp != null) {
retry = false;
scriptHistory.updateState(Constant.JobState.SUBMITTED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import com.meiyou.bigwhale.entity.Cluster;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.service.ClusterService;
import com.meiyou.bigwhale.service.ScriptHistoryService;
import com.meiyou.bigwhale.util.YarnApiUtils;
import com.meiyou.bigwhale.util.YarnUtil;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
Expand Down Expand Up @@ -43,21 +45,24 @@ public void execute(JobExecutionContext jobExecutionContext) {
if (scriptHistories.isEmpty()) {
continue;
}
// request Cluster Scheduler API for schedulerType
SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
if (scheduler == null) {
continue;
}
// 请求yarn web url, 获取所有应用
List<HttpYarnApp> httpYarnApps = YarnApiUtils.getActiveApps(cluster.getYarnUrl());
// 请求出错,不清理数据
if (httpYarnApps == null) {
continue;
}
String schedulerType = scheduler.getType();
httpYarnApps.removeIf(httpYarnApp -> !httpYarnApp.getName().contains(".bw_instance_") && !httpYarnApp.getName().contains(".bw_test_instance_"));
Map<String, ScriptHistory> yarnParamsToScriptHistoryMap = new HashMap<>();
scriptHistories.forEach(scriptHistory -> {
String [] jobParams = scriptHistory.getJobParams().split(";");
String user = jobParams[0];
String queue = jobParams[1];
if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
queue = "root." + queue;
}
String queue = YarnUtil.getQueueName(schedulerType, jobParams[1]);
String app = jobParams[2];
String key = user + ";" + queue + ";" + app;
yarnParamsToScriptHistoryMap.put(key, scriptHistory);
Expand All @@ -82,7 +87,7 @@ public void execute(JobExecutionContext jobExecutionContext) {
}
scriptHistories.forEach(scriptHistory -> {
if (!matchIds.contains(scriptHistory.getId())) {
updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory);
updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory, schedulerType);
}
});
}
Expand All @@ -104,9 +109,9 @@ private void updateMatchScriptHistory(HttpYarnApp httpYarnApp, ScriptHistory scr
scriptHistoryService.save(scriptHistory);
}

private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory) {
private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory, String schedulerType) {
String [] jobParams = scriptHistory.getJobParams().split(";");
HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, jobParams[0], jobParams[1], jobParams[2], 3);
HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
if (httpYarnApp != null) {
if ("FINISHED".equals(httpYarnApp.getState())) {
scriptHistory.updateState(httpYarnApp.getFinalStatus());
Expand Down
42 changes: 32 additions & 10 deletions src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package com.meiyou.bigwhale.util;

import com.alibaba.fastjson.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.meiyou.bigwhale.common.pojo.BackpressureInfo;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class YarnApiUtils {

Expand Down Expand Up @@ -45,10 +55,8 @@ public static List<HttpYarnApp> getActiveApps(String yarnUrl) {
* @param retries
* @return
*/
public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
queue = "root." + queue;
}
public static HttpYarnApp getActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
queue = YarnUtil.getQueueName(schedulerType, queue);
Map<String, Object> params = new HashMap<>();
params.put("user", user);
params.put("queue", queue);
Expand Down Expand Up @@ -84,10 +92,8 @@ public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue
* @param retries 重试次数
* @return
*/
public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
queue = "root." + queue;
}
public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
queue = YarnUtil.getQueueName(schedulerType, queue);
Map<String, Object> params = new HashMap<>();
params.put("user", user);
params.put("states", "finished,killed,failed");
Expand Down Expand Up @@ -295,10 +301,26 @@ public static boolean killApp(String yarnUrl, String appId) {
return false;
}

/**
 * Fetches the scheduler description of a YARN cluster through the
 * <a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API">Cluster Scheduler API</a>.
 *
 * @param yarnUrl base URL of the YARN ResourceManager web application
 * @return the parsed {@code scheduler.schedulerInfo} element, or {@code null} when the
 *         request is unsuccessful, the body is empty, or the expected elements are
 *         missing from the response
 */
public static SchedulerInfo getYarnSchedulerInfo(String yarnUrl) {
    OkHttpUtils.Result result = OkHttpUtils.doGet(getSchedulerUrl(yarnUrl), Collections.emptyMap(), HEADERS);
    if (!result.isSuccessful || StringUtils.isEmpty(result.content)) {
        return null;
    }
    JSONObject root = JSON.parseObject(result.content);
    if (root == null) {
        return null;
    }
    // A response without a "scheduler" element previously caused a NullPointerException here.
    JSONObject scheduler = root.getJSONObject("scheduler");
    if (scheduler == null) {
        return null;
    }
    // A missing "schedulerInfo" yields a null string, for which parseObject returns null.
    return JSON.parseObject(scheduler.getString("schedulerInfo"), SchedulerInfo.class);
}

/**
 * Builds the URL of the cluster applications REST endpoint.
 *
 * @param yarnUrl base URL of the YARN web application
 * @return {@code yarnUrl}, normalised by {@code appendUrl}, followed by {@code ws/v1/cluster/apps}
 */
private static String getAppsUrl(String yarnUrl) {
    String baseUrl = appendUrl(yarnUrl);
    return baseUrl + "ws/v1/cluster/apps";
}

/**
 * Builds the URL of the cluster scheduler REST endpoint.
 *
 * @param yarnUrl base URL of the YARN web application
 * @return {@code yarnUrl}, normalised by {@code appendUrl}, followed by {@code ws/v1/cluster/scheduler}
 */
private static String getSchedulerUrl(String yarnUrl) {
    String baseUrl = appendUrl(yarnUrl);
    return baseUrl + "ws/v1/cluster/scheduler";
}

private static String appendUrl(String url) {
if (!url.endsWith("/")) {
url += "/";
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.meiyou.bigwhale.util;

import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for working with YARN queue names.
 *
 * @author yangjie
 */
public class YarnUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtil.class);

    /** Not instantiable: this class only exposes static helpers. */
    private YarnUtil() {
    }

    /**
     * Returns the queue name in the form used for the given scheduler type.
     *
     * <p>For the Fair Scheduler a queue that is neither {@code root} nor already
     * prefixed with {@code root.} gets the {@code root.} prefix. For every other
     * scheduler type, and for a {@code null} queue, the input is returned unchanged.
     *
     * @param yarnSchedulerType scheduler type string, compared against
     *                          {@link YarnResourcemanagerScheduler#FAIR_SCHEDULER}; may be {@code null}
     * @param queue             queue name as configured; may be {@code null}
     * @return the queue name to use, or {@code null} when {@code queue} is {@code null}
     */
    public static String getQueueName(String yarnSchedulerType, String queue) {
        if (queue == null || !YarnResourcemanagerScheduler.FAIR_SCHEDULER.equals(yarnSchedulerType)) {
            return queue;
        }
        boolean alreadyQualified = "root".equals(queue) || queue.startsWith("root.");
        return alreadyQualified ? queue : "root." + queue;
    }
}

0 comments on commit 48c7f61

Please sign in to comment.