Skip to content

Commit

Permalink
Merge pull request #63 from MeetYouDevs/1.3
Browse files Browse the repository at this point in the history
1. 修复YARN任务状态更新可能出现错误的BUG
  • Loading branch information
progr1mmer authored Jun 16, 2021
2 parents b17076b + 652e0eb commit 187d35a
Show file tree
Hide file tree
Showing 34 changed files with 670 additions and 894 deletions.
23 changes: 23 additions & 0 deletions script/big-whale.sql
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,29 @@ CREATE TABLE IF NOT EXISTS `yarn_app` (
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;


ALTER TABLE `script`
CHANGE COLUMN `monitor_id` `monitor_id` INT(11) NULL DEFAULT NULL AFTER `type`,
CHANGE COLUMN `monitor_enabled` `monitor_enabled` BIT(1) NULL DEFAULT NULL AFTER `monitor_id`,
CHANGE COLUMN `cluster_id` `cluster_id` INT(11) NULL DEFAULT NULL AFTER `schedule_top_node_id`;


ALTER TABLE `script_history`
CHANGE COLUMN `monitor_id` `monitor_id` INT(11) NULL DEFAULT NULL AFTER `id`,
CHANGE COLUMN `schedule_retry_num` `schedule_failure_handle` VARCHAR(255) NULL DEFAULT NULL AFTER `schedule_instance_id`,
CHANGE COLUMN `schedule_history_mode` `schedule_supplement` BIT NULL DEFAULT NULL COLLATE 'utf8_general_ci' AFTER `schedule_failure_handle`,
CHANGE COLUMN `schedule_history_time` `schedule_operate_time` DATETIME NULL DEFAULT NULL AFTER `schedule_supplement`,
ADD COLUMN `previous_schedule_top_node_id` VARCHAR(255) NULL DEFAULT NULL AFTER `schedule_operate_time`,
ADD COLUMN `script_name` VARCHAR(255) NOT NULL AFTER `script_id`,
CHANGE COLUMN `cluster_id` `cluster_id` INT(11) NULL DEFAULT NULL AFTER `script_type`,
CHANGE COLUMN `state` `state` VARCHAR(255) NOT NULL COLLATE 'utf8_general_ci' AFTER `content`,
CHANGE COLUMN `steps` `steps` VARCHAR(255) NOT NULL COLLATE 'utf8_general_ci' AFTER `state`,
ADD COLUMN `job_params` VARCHAR(255) NULL DEFAULT NULL AFTER `finish_time`,
DROP COLUMN `schedule_snapshot_id`;


DROP TABLE `schedule_snapshot`;

/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
/*!40014 SET FOREIGN_KEY_CHECKS=IF(@OLD_FOREIGN_KEY_CHECKS IS NULL, 1, @OLD_FOREIGN_KEY_CHECKS) */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
13 changes: 0 additions & 13 deletions src/main/java/com/meiyou/bigwhale/common/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ interface ScriptType {
String PYTHON = "python";
String SPARK_BATCH = "sparkbatch";
String FLINK_BATCH = "flinkbatch";

// --- 监控 ---
String SPARK_STREAM = "sparkstream";
String FLINK_STREAM = "flinkstream";
}
Expand Down Expand Up @@ -42,8 +40,6 @@ interface JobState {
String KILLED = "KILLED";
String FAILED = "FAILED";
String TIMEOUT = "TIMEOUT";
String SUBMITTING_TIMEOUT = "SUBMITTING_TIMEOUT";
String SUBMITTING_FAILED = "SUBMITTING_FAILED";
/**
* 调度扩展执行状态
*/
Expand All @@ -52,15 +48,6 @@ interface JobState {
String PARENT_FAILED_ = "PARENT_FAILED";
}

/**
* 任务调度历史实例处理模式
*/
interface HistoryMode {
String RETRY = "retry";
String RERUN = "rerun";
String SUPPLEMENT = "supplement";
}

/**
* 任务告警
*/
Expand Down
155 changes: 76 additions & 79 deletions src/main/java/com/meiyou/bigwhale/controller/ScheduleController.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@
import com.meiyou.bigwhale.dto.DtoSchedule;
import com.meiyou.bigwhale.entity.Script;
import com.meiyou.bigwhale.entity.Schedule;
import com.meiyou.bigwhale.entity.ScheduleSnapshot;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.job.ScheduleJob;
import com.meiyou.bigwhale.job.ScriptHistoryShellRunnerJob;
import com.meiyou.bigwhale.service.ScheduleService;
import com.meiyou.bigwhale.security.LoginUser;
import com.meiyou.bigwhale.service.ScriptHistoryService;
import com.meiyou.bigwhale.service.ScriptService;
import com.meiyou.bigwhale.service.ScheduleSnapshotService;
import com.meiyou.bigwhale.util.SchedulerUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
Expand All @@ -40,8 +38,6 @@ public class ScheduleController extends BaseController {
@Autowired
private ScheduleService scheduleService;
@Autowired
private ScheduleSnapshotService scheduleSnapshotService;
@Autowired
private ScriptService scriptService;
@Autowired
private ScriptHistoryService scriptHistoryService;
Expand Down Expand Up @@ -122,9 +118,8 @@ public Msg one(@RequestParam Integer id) {
Collections.addAll(dingdingHooks, schedule.getDingdingHooks().split(","));
dtoSchedule.setDingdingHooks(dingdingHooks);
}
ScheduleSnapshot scheduleSnapshot = scheduleSnapshotService.findByScheduleIdAndSnapshotTime(schedule.getId(), new Date());
List<DtoScript> dtoScripts = new ArrayList<>();
generateScripts(scheduleSnapshot, dtoScripts, null);
generateScripts(schedule, null, dtoScripts);
dtoSchedule.setScripts(dtoScripts);
return success(dtoSchedule);
}
Expand Down Expand Up @@ -155,7 +150,7 @@ public Msg save(@RequestBody DtoSchedule req) {
Set<String> keys = new HashSet<>();
for (DtoScript dtoScript : req.getScripts()) {
if (dtoScript.isYarn()) {
String key = dtoScript.getClusterId() + "$" + dtoScript.getUser() + "$" + dtoScript.getQueue() + "$" + dtoScript.getApp();
String key = dtoScript.getClusterId() + ";" + dtoScript.getUser() + ";" + dtoScript.getQueue() + ";" + dtoScript.getApp();
if (keys.contains(key)) {
return failed("脚本【" + dtoScript.getName() + "】YARN应用重复");
}
Expand Down Expand Up @@ -221,33 +216,21 @@ public Msg instance(@RequestBody DtoSchedule req) {
return success(instance);
}

@RequestMapping(value = "/treeview.api", method = RequestMethod.GET)
public Msg treeView(@RequestParam Integer id,
@RequestParam String instance) {
Integer scheduleSnapshotId = scriptHistoryService.findOneByQuery("scheduleId=" + id + ";scheduleInstanceId=" + instance,
new Sort(Sort.Direction.DESC, "id")).getScheduleSnapshotId();
ScheduleSnapshot scheduleSnapshot = scheduleSnapshotService.findById(scheduleSnapshotId);
Map<String, Object> nodeTree = new HashMap<>();
generateNodeTree(scheduleSnapshot, nodeTree, null, instance);
return success(Collections.singletonList(nodeTree));
}

@RequestMapping(value = "/run.api", method = RequestMethod.POST)
public Msg run(@RequestBody DtoSchedule req) {
Schedule schedule = scheduleService.findById(req.getId());
if (schedule == null) {
return failed();
}
Date now = new Date();
ScheduleSnapshot scheduleSnapshot = scheduleSnapshotService.findByScheduleIdAndSnapshotTime(req.getId(), now);
String instanceId = new SimpleDateFormat("yyyyMMddHHmmss").format(now);
generateScriptHistories(scheduleSnapshot, null, instanceId, now, now, false);
String scheduleInstanceId = new SimpleDateFormat("yyyyMMddHHmmss").format(now);
generateScriptHistories(schedule, scheduleInstanceId, null, now, now, false);
schedule.setRealFireTime(now);
scheduleService.save(schedule);
DtoSchedule dtoSchedule = new DtoSchedule();
BeanUtils.copyProperties(schedule, dtoSchedule);
Map<String, Object> result = new HashMap<>();
result.put("instanceId", instanceId);
result.put("instanceId", scheduleInstanceId);
result.put("obj", dtoSchedule);
return success(result);
}
Expand All @@ -264,103 +247,116 @@ public Msg supplement(@RequestBody JSONObject params) throws ParseException {
Date startTime = dateFormat1.parse(params.getString("start"));
Date endTime = dateFormat1.parse(params.getString("end"));
Date now = new Date();
ScheduleSnapshot scheduleSnapshot = scheduleSnapshotService.findByScheduleIdAndSnapshotTime(id, now);
Date needFireTime = ScheduleJob.getNeedFireTime(scheduleSnapshot.generateCron(), startTime);
Date needFireTime = ScheduleJob.getNeedFireTime(schedule.generateCron(), startTime);
// 获取第一个触发时间
while (needFireTime.compareTo(startTime) < 0) {
needFireTime = ScheduleJob.getNextFireTime(scheduleSnapshot.generateCron(), needFireTime);
needFireTime = ScheduleJob.getNextFireTime(schedule.generateCron(), needFireTime);
}
if (needFireTime.compareTo(endTime) > 0) {
return failed("时间范围有误");
}
do {
generateScriptHistories(scheduleSnapshot, null, dateFormat.format(needFireTime), needFireTime, now, true);
needFireTime = ScheduleJob.getNextFireTime(scheduleSnapshot.generateCron(), needFireTime);
generateScriptHistories(schedule, dateFormat.format(needFireTime), null, needFireTime, now, true);
needFireTime = ScheduleJob.getNextFireTime(schedule.generateCron(), needFireTime);
} while (needFireTime.compareTo(endTime) < 0);
return success();
}

private void generateScripts(ScheduleSnapshot scheduleSnapshot, List<DtoScript> dtoScripts, String currentNodeId) {
Map<String, ScheduleSnapshot.Topology.Node> nodeIdToData = scheduleSnapshot.analyzeNextNode(currentNodeId);
for (Map.Entry<String, ScheduleSnapshot.Topology.Node> entry : nodeIdToData.entrySet()) {
String nodeId = entry.getKey();
Script script = scriptService.findOneByQuery("scheduleId=" + scheduleSnapshot.getScheduleId() + ";scheduleTopNodeId=" + nodeId);
@RequestMapping(value = "/treeview.api", method = RequestMethod.GET)
public Msg treeView(@RequestParam Integer id,
@RequestParam String instance) {
Schedule schedule = scheduleService.findById(id);
if (schedule == null) {
return failed();
}
Map<String, Object> nodeTree = new HashMap<>();
List<ScriptHistory> scriptHistories = scriptHistoryService.findByQuery(
"scheduleId=" + schedule.getId() +
";scheduleInstanceId=" + instance,
Sort.by(Sort.Direction.ASC, "id"));
generateNodeTree(null, scriptHistories, nodeTree);
return success(Collections.singletonList(nodeTree));
}

private void generateScripts(Schedule schedule, String previousScheduleTopNodeId, List<DtoScript> dtoScripts) {
Map<String, Schedule.Topology.Node> nodeIdToData = schedule.analyzeNextNode(previousScheduleTopNodeId);
for (String nodeId : nodeIdToData.keySet()) {
Script script = scriptService.findOneByQuery("scheduleId=" + schedule.getId() + ";scheduleTopNodeId=" + nodeId);
DtoScript dtoScript = new DtoScript();
BeanUtils.copyProperties(script, dtoScript);
dtoScripts.add(dtoScript);
generateScripts(scheduleSnapshot, dtoScripts, nodeId);
generateScripts(schedule, nodeId, dtoScripts);
}
}

@SuppressWarnings("unchecked")
private void generateNodeTree(ScheduleSnapshot scheduleSnapshot, Map<String, Object> nodeTree, String currentNodeId, String instanceId) {
Map<String, ScheduleSnapshot.Topology.Node> nodeIdToData = scheduleSnapshot.analyzeNextNode(currentNodeId);
for (Map.Entry<String, ScheduleSnapshot.Topology.Node> entry : nodeIdToData.entrySet()) {
String nodeId = entry.getKey();
Script script = scriptService.findOneByQuery("scheduleId=" + scheduleSnapshot.getScheduleId() + ";scheduleTopNodeId=" + nodeId);
String stateTag = null;
if (instanceId != null) {
stateTag = StringUtils.join(getStateTag(scheduleSnapshot, nodeId, instanceId), "");
private void generateScriptHistories(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId, Date instanceTime, Date now, boolean supplement) {
Map<String, Schedule.Topology.Node> nodeIdToData = schedule.analyzeNextNode(previousScheduleTopNodeId);
for (String nodeId : nodeIdToData.keySet()) {
Script script = scriptService.findOneByQuery("scheduleId=" + schedule.getId() + ";scheduleTopNodeId=" + nodeId);
ScriptHistory scriptHistory = scriptService.generateHistory(script, schedule, scheduleInstanceId, previousScheduleTopNodeId, supplement ? 2 : 0);
scriptHistory.updateState(Constant.JobState.WAITING_PARENT_);
if (supplement) {
scriptHistory.updateState(Constant.JobState.INITED);
}
scriptHistory = scriptHistoryService.save(scriptHistory);
if (supplement) {
if (instanceTime.compareTo(now) <= 0) {
ScriptHistoryShellRunnerJob.build(scriptHistory);
} else {
ScriptHistoryShellRunnerJob.build(scriptHistory, instanceTime);
}
}
if (currentNodeId == null) {
generateScriptHistories(schedule, scheduleInstanceId, nodeId, instanceTime, now, supplement);
}
}

private void generateNodeTree(String previousScheduleTopNodeId, List<ScriptHistory> scriptHistories, Map<String, Object> nodeTree) {
Map<String, List<ScriptHistory>> topScriptHistories = new HashMap<>();
for (ScriptHistory scriptHistory : scriptHistories) {
if (Objects.equals(scriptHistory.getPreviousScheduleTopNodeId(), previousScheduleTopNodeId)) {
List<ScriptHistory> topScriptHistory = topScriptHistories.computeIfAbsent(scriptHistory.getScheduleTopNodeId(), k -> new ArrayList<>());
topScriptHistory.add(scriptHistory);
}
}
for (List<ScriptHistory> histories : topScriptHistories.values()) {
String stateTag = StringUtils.join(getStateTag(histories), "");
ScriptHistory history = histories.get(0);
if (previousScheduleTopNodeId == null) {
if (stateTag != null) {
nodeTree.put("text", script.getName() + stateTag);
nodeTree.put("text", history.getScriptName() + stateTag);
nodeTree.put("rerunEnabled_", !stateTag.contains("label-default"));
} else {
nodeTree.put("text", script.getName());
nodeTree.put("text", history.getScriptName());
}
nodeTree.put("nodeId_", nodeId);
nodeTree.put("snapshotId_", scheduleSnapshot.getId());
nodeTree.put("icon", "iconfont " + scriptIconClass.get(script.getType()));
nodeTree.put("nodeId_", history.getScheduleTopNodeId());
nodeTree.put("scheduleId_", history.getScheduleId());
nodeTree.put("icon", "iconfont " + scriptIconClass.get(history.getScriptType()));
nodeTree.put("state", Collections.singletonMap("expanded", true));
generateNodeTree(scheduleSnapshot, nodeTree, nodeId, instanceId);
generateNodeTree(history.getScheduleTopNodeId(), scriptHistories, nodeTree);
} else {
Map<String, Object> childNode = new HashMap<>();
if (stateTag != null) {
childNode.put("text", script.getName() + stateTag);
childNode.put("text", history.getScriptName() + stateTag);
childNode.put("rerunEnabled_", !stateTag.contains("label-default"));
} else {
childNode.put("text", script.getName());
childNode.put("text", history.getScriptName());
}
childNode.put("nodeId_", nodeId);
childNode.put("snapshotId_", scheduleSnapshot.getId());
childNode.put("icon", "iconfont " + scriptIconClass.get(script.getType()));
childNode.put("nodeId_", history.getScheduleTopNodeId());
childNode.put("scheduleId_", history.getScheduleId());
childNode.put("icon", "iconfont " + scriptIconClass.get(history.getScriptType()));
childNode.put("state", Collections.singletonMap("expanded", true));
List<Map<String, Object>> childNodes = (List<Map<String, Object>>)nodeTree.get("nodes");
if (childNodes == null) {
childNodes = new ArrayList<>();
nodeTree.put("nodes", childNodes);
}
childNodes.add(childNode);
generateNodeTree(scheduleSnapshot, childNode, nodeId, instanceId);
generateNodeTree(history.getScheduleTopNodeId(), scriptHistories, childNode);
}
}
}

private void generateScriptHistories(ScheduleSnapshot scheduleSnapshot, String currentNodeId, String instanceId, Date instanceTime, Date now, boolean supplement) {
Map<String, ScheduleSnapshot.Topology.Node> nodeIdToData = scheduleSnapshot.analyzeNextNode(currentNodeId);
for (Map.Entry<String, ScheduleSnapshot.Topology.Node> entry : nodeIdToData.entrySet()) {
String nodeId = entry.getKey();
Script script = scriptService.findOneByQuery("scheduleId=" + scheduleSnapshot.getScheduleId() + ";scheduleTopNodeId=" + nodeId);
ScriptHistory scriptHistory = scriptService.generateHistory(script, scheduleSnapshot, instanceId, supplement ? 2 : 0);
scriptHistory.updateState(Constant.JobState.WAITING_PARENT_);
scriptHistory.updateState(Constant.JobState.INITED);
scriptHistory = scriptHistoryService.save(scriptHistory);
if (instanceTime.compareTo(now) <= 0) {
ScriptHistoryShellRunnerJob.build(scriptHistory);
} else {
ScriptHistoryShellRunnerJob.build(scriptHistory, instanceTime);
}
generateScriptHistories(scheduleSnapshot, nodeId, instanceId, instanceTime, now, supplement);
}
}

private List<String> getStateTag(ScheduleSnapshot scheduleSnapshot, String nodeId, String instanceId) {
List<ScriptHistory> scriptHistories = scriptHistoryService.findByQuery(
"scheduleId=" + scheduleSnapshot.getScheduleId() +
";scheduleTopNodeId=" + nodeId +
";scheduleInstanceId=" + instanceId,
Sort.by(Sort.Direction.ASC, "id"));
private List<String> getStateTag(List<ScriptHistory> scriptHistories) {
return scriptHistories.stream().map(scriptHistory -> {
if (Constant.JobState.UN_CONFIRMED_.equals(scriptHistory.getState())) {
return "<span class=\"cube label-default\"></span>";
Expand All @@ -377,4 +373,5 @@ private List<String> getStateTag(ScheduleSnapshot scheduleSnapshot, String nodeI
return "<span class=\"cube label-danger\"></span>";
}).collect(Collectors.toList());
}

}
Loading

0 comments on commit 187d35a

Please sign in to comment.