[Bug] Fix condition task will cause workflow instance failed #16152

Merged 2 commits on Jun 19, 2024
@@ -31,11 +31,15 @@
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@@ -192,6 +196,9 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private DependentParameters dependency;

@TableField(exist = false)
private ConditionsParameters conditionsParameters;

/**
* switch dependency
*/
@@ -318,6 +325,43 @@ public void setDependency(DependentParameters dependency) {
this.dependency = dependency;
}

public ConditionsParameters getConditionsParameters() {
if (this.conditionsParameters == null) {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.conditionsParameters =
JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), ConditionsParameters.class);
}
return conditionsParameters;
}

public ConditionsParameters.ConditionResult getConditionResult() {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
String conditionResult = (String) taskParamsMap.getOrDefault(Constants.CONDITION_RESULT, "");
if (StringUtils.isNotEmpty(conditionResult)) {
return JSONUtils.parseObject(conditionResult, new TypeReference<ConditionsParameters.ConditionResult>() {
});
}
return null;
}

public void setConditionResult(ConditionsParameters conditionsParameters) {
if (conditionsParameters == null) {
return;
}
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
if (taskParamsMap == null) {
taskParamsMap = new HashMap<>();
}
taskParamsMap.put(Constants.CONDITION_RESULT, JSONUtils.toJsonString(conditionsParameters));
this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
}

public SwitchParameters getSwitchDependency() {
// todo: We need to directly use Jackson to deserialize the taskParam, rather than parse the map and get from
// field.
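The new TaskInstance accessors above persist the condition result as a JSON string nested inside the task's taskParams map. Below is a minimal round-trip sketch of that pattern, using plain Jackson in place of the project's JSONUtils wrapper and a flat map with an assumed "conditionResult" key standing in for Constants.CONDITION_RESULT and ConditionsParameters.ConditionResult; it is an illustration, not the project code.

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

final class ConditionResultStorageSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Write side, mirroring setConditionResult(...): the result object is serialized
        // on its own and stored as a string value inside taskParams.
        Map<String, Object> conditionResult = new HashMap<>();
        conditionResult.put("conditionSuccess", true);

        Map<String, Object> taskParams = new HashMap<>();
        taskParams.put("conditionResult", mapper.writeValueAsString(conditionResult));
        String taskParamsJson = mapper.writeValueAsString(taskParams);

        // Read side, mirroring getConditionResult(): parse taskParams, then parse the
        // nested JSON string back into an object.
        Map<String, Object> parsedParams =
                mapper.readValue(taskParamsJson, new TypeReference<Map<String, Object>>() {
                });
        Map<String, Object> restored =
                mapper.readValue((String) parsedParams.get("conditionResult"),
                        new TypeReference<Map<String, Object>>() {
                        });

        System.out.println(restored); // {conditionSuccess=true}
    }
}
```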
@@ -385,18 +385,19 @@ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleExcep
// retry task
log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
} else if (taskInstance.getState().isFailure() || taskInstance.getState().isKill()
|| taskInstance.getState().isStop()) {
completeTaskSet.add(taskInstance.getTaskCode());
ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, taskInstance, projectUser);
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, taskInstance);
if (isTaskNeedPutIntoErrorMap(taskInstance)) {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
// There are child nodes and the failure policy is: CONTINUE
if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
taskInstance.getTaskCode(),
workflowExecuteContext.getWorkflowGraph().getDag())) {
submitPostNode(taskInstance.getTaskCode());
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (workflowInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@@ -805,10 +806,7 @@ private void initTaskQueue() throws StateEventHandleException, CronParseExceptio
completeTaskSet.add(task.getTaskCode());
continue;
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getTaskCode(),
workflowExecuteContext.getWorkflowGraph().getDag())) {
continue;
}

if (task.taskCanRetry()) {
if (task.getState().isNeedFaultTolerance()) {
log.info("TaskInstance needs fault tolerance, will be added to standby list.");
@@ -824,7 +822,7 @@ }
}
continue;
}
if (task.getState().isFailure()) {
if (isTaskNeedPutIntoErrorMap(task)) {
errorTaskMap.put(task.getTaskCode(), task.getId());
}
} finally {
@@ -2015,6 +2013,24 @@ private void saveCacheTaskInstance(TaskInstance taskInstance) {
}
}

/**
 * Whether the task instance needs to be put into {@link #errorTaskMap}.
 * Only when the task instance is failed, stopped or killed, and it is not the parent of a condition task,
 * should it be put into {@link #errorTaskMap}.
 * <p> Once a task instance is put into {@link #errorTaskMap}, it is treated as failed and causes the workflow to fail.
 */
Comment on lines +2016 to +2021 (Contributor): 👍

private boolean isTaskNeedPutIntoErrorMap(TaskInstance taskInstance) {
if (!taskInstance.getState().isFailure() && !taskInstance.getState().isStop()
&& !taskInstance.getState().isKill()) {
return false;
}
TaskNode taskNode = workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode());
if (DagHelper.haveConditionsAfterNode(taskNode.getCode(), workflowExecuteContext.getWorkflowGraph().getDag())) {
return false;
}
return true;
}

private enum WorkflowRunnableStatus {
CREATED, INITIALIZE_QUEUE, STARTED,
;
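The new isTaskNeedPutIntoErrorMap predicate above decides which finished tasks actually fail the workflow. A self-contained sketch of that rule, with a simplified State enum and a boolean standing in for TaskExecutionStatus and DagHelper.haveConditionsAfterNode (these are stand-ins, not the project types):

```java
final class ErrorMapRuleSketch {

    // Stand-in for the relevant TaskExecutionStatus values.
    enum State { SUCCESS, FAILURE, KILL, STOP }

    // A finished task is only recorded as a workflow-level error when it ended in
    // FAILURE/KILL/STOP and no condition task consumes its result downstream
    // (the boolean stands in for DagHelper.haveConditionsAfterNode(...)).
    static boolean needPutIntoErrorMap(State state, boolean hasConditionTaskDownstream) {
        boolean terminalNonSuccess =
                state == State.FAILURE || state == State.KILL || state == State.STOP;
        return terminalNonSuccess && !hasConditionTaskDownstream;
    }

    public static void main(String[] args) {
        // A failed task that feeds a condition task: the condition branch decides, the workflow keeps running.
        System.out.println(needPutIntoErrorMap(State.FAILURE, true));  // false
        // A failed task with no condition task behind it: the workflow is marked failed.
        System.out.println(needPutIntoErrorMap(State.FAILURE, false)); // true
    }
}
```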
@@ -25,10 +25,8 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException;
import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;

import java.util.List;
@@ -40,37 +38,34 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConditionLogicTask extends BaseSyncLogicTask<DependentParameters> {
public class ConditionLogicTask extends BaseSyncLogicTask<ConditionsParameters> {

public static final String TASK_TYPE = "CONDITIONS";

private final TaskInstanceDao taskInstanceDao;
private final ProcessInstanceDao workflowInstanceDao;

private final TaskInstance taskInstance;

public ConditionLogicTask(TaskExecutionContext taskExecutionContext,
ProcessInstanceExecCacheManager processInstanceExecCacheManager,
TaskInstance taskInstance,
TaskInstanceDao taskInstanceDao,
ProcessInstanceDao workflowInstanceDao) throws LogicTaskInitializeException {
ProcessInstanceDao workflowInstanceDao) {
// todo: we need to change the parameter in front-end, so that we can directly use json to parse
super(taskExecutionContext,
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute runnable"))
.getDependency());
// todo: check the parameters, why don't we use conditionTask? taskInstance.getDependency();
super(taskExecutionContext, taskInstance.getConditionsParameters());
this.taskInstanceDao = taskInstanceDao;
this.workflowInstanceDao = workflowInstanceDao;
this.taskInstance = taskInstance;
}

@Override
public void handle() {
// calculate the conditionResult
DependResult conditionResult = calculateConditionResult();
TaskExecutionStatus taskExecutionStatus =
(conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
log.info("The condition result is {}, task instance statue will be: {}", conditionResult, taskExecutionStatus);
taskExecutionContext.setCurrentExecutionStatus(taskExecutionStatus);
log.info("The condition result is {}", conditionResult);
taskParameters.setConditionSuccess(conditionResult == DependResult.SUCCESS);
taskInstance.setConditionsParameters(taskParameters);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
Member Author: I find the doc is correct, we don't need to update it.

}

private DependResult calculateConditionResult() {
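The ConditionLogicTask change above is the core of the fix: when the dependency check evaluates to false, the condition task no longer turns itself into FAILURE (which previously failed the whole workflow instance); it always completes SUCCESS and records the branch decision on its parameters. A sketch of that contract with hypothetical stand-in types (DependResult and ConditionsParams below are simplified local copies, not the plugin classes):

```java
final class ConditionTaskSemanticsSketch {

    // Stand-in for the plugin's DependResult.
    enum DependResult { SUCCESS, FAILED }

    // Stand-in for ConditionsParameters; only the flag read by downstream routing is modeled.
    static final class ConditionsParams {
        private boolean conditionSuccess;
        void setConditionSuccess(boolean v) { this.conditionSuccess = v; }
        boolean isConditionSuccess() { return conditionSuccess; }
    }

    public static void main(String[] args) {
        DependResult conditionResult = DependResult.FAILED; // pretend the dependency check evaluated to false

        ConditionsParams params = new ConditionsParams();
        params.setConditionSuccess(conditionResult == DependResult.SUCCESS);

        // Old behavior: a FAILED result turned the condition task itself into FAILURE,
        // which made the workflow instance fail.
        // New behavior: the task always completes SUCCESS; the branch decision travels
        // on the parameters object instead.
        System.out.println("task state = SUCCESS, conditionSuccess = " + params.isConditionSuccess());
    }
}
```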
@@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.runner.task.condition;

import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -43,7 +44,15 @@ public class ConditionLogicTaskPluginFactory implements ILogicTaskPluginFactory<

@Override
public ConditionLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) throws LogicTaskInitializeException {
return new ConditionLogicTask(taskExecutionContext, processInstanceExecCacheManager, taskInstanceDao,
TaskInstance taskInstance =
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute runnable"));
return new ConditionLogicTask(
taskExecutionContext,
taskInstance,
taskInstanceDao,
processInstanceDao);
}

@@ -43,6 +43,7 @@
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;
@@ -71,6 +72,9 @@ public class ListenerEventAlertManager {
@Autowired
private AlertPluginInstanceMapper alertPluginInstanceMapper;

@Autowired
private ProcessService processService;

public void publishServerDownListenerEvent(String host, String type) {
ServerDownListenerEvent event = new ServerDownListenerEvent();
event.setEventTime(new Date());
@@ -214,8 +218,9 @@ public void publishTaskEndListenerEvent(ProcessInstance processInstance,
}

public void publishTaskFailListenerEvent(ProcessInstance processInstance,
TaskInstance taskInstance,
ProjectUser projectUser) {
TaskInstance taskInstance) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());

TaskFailListenerEvent event = new TaskFailListenerEvent();
event.setProjectCode(projectUser.getProjectCode());
event.setProjectName(projectUser.getProjectName());
@@ -76,10 +76,10 @@ public static List<TaskNodeRelation> generateRelationListByFlowNodes(List<TaskNo
/**
* generate task nodes needed by dag
*
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskDependType taskDependType
* @param taskDependType taskDependType
* @return task node list
*/
public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList,
@@ -139,7 +139,7 @@ public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> task
/**
* find all the nodes that depended on the start node
*
* @param startNode startNode
* @param startNode startNode
* @param taskNodeList taskNodeList
* @return task node list
*/
@@ -166,9 +166,9 @@ private static List<TaskNode> getFlowNodeListPost(TaskNode startNode,
/**
* find all nodes that start nodes depend on.
*
* @param startNode startNode
* @param startNode startNode
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskNodeList taskNodeList
* @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPre(TaskNode startNode,
@@ -204,10 +204,10 @@ private static List<TaskNode> getFlowNodeListPre(TaskNode startNode,
/**
* generate dag by start nodes and recovery nodes
*
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param depNodeType depNodeType
* @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
@@ -232,7 +232,7 @@ public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
* find node by node name
*
* @param nodeDetails nodeDetails
* @param nodeName nodeName
* @param nodeName nodeName
* @return task node
*/
public static TaskNode findNodeByName(List<TaskNode> nodeDetails, String nodeName) {
@@ -248,7 +248,7 @@ public static TaskNode findNodeByName(List<TaskNode> nodeDetails, String nodeNam
* find node by node code
*
* @param nodeDetails nodeDetails
* @param nodeCode nodeCode
* @param nodeCode nodeCode
* @return task node
*/
public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, Long nodeCode) {
@@ -263,8 +263,8 @@ public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, Long nodeCode)
/**
* the task can be submit when all the depends nodes are forbidden or complete
*
* @param taskNode taskNode
* @param dag dag
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
*/
@@ -369,22 +369,20 @@ public static List<Long> parseConditionTask(Long nodeCode,
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeCode);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
ConditionsParameters conditionsParameters = taskInstance.getConditionsParameters();
ConditionsParameters.ConditionResult conditionResult = taskInstance.getConditionResult();

List<Long> skipNodeList = new ArrayList<>();
if (taskInstance.getState().isSuccess()) {
conditionTaskList = conditionsParameters.getSuccessNode();
skipNodeList = conditionsParameters.getFailedNode();
} else if (taskInstance.getState().isFailure()) {
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
if (conditionsParameters.isConditionSuccess()) {
conditionTaskList = conditionResult.getSuccessNode();
skipNodeList = conditionResult.getFailedNode();
} else {
conditionTaskList.add(nodeCode);
conditionTaskList = conditionResult.getFailedNode();
skipNodeList = conditionResult.getSuccessNode();
}
// the skipNodeList maybe null if no next task
skipNodeList = Optional.ofNullable(skipNodeList).orElse(new ArrayList<>());
for (Long failedNode : skipNodeList) {
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);

if (CollectionUtils.isNotEmpty(skipNodeList)) {
skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag, completeTaskList, skipTaskNodeList));
}
// the conditionTaskList maybe null if no next task
conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new ArrayList<>());
@@ -447,6 +445,7 @@ public static List<Long> skipTaskNode4Switch(TaskNode taskNode,

/**
* get all downstream nodes of the branch that the switch node needs to execute
*
* @param taskCode
* @param dag
* @param switchNeedWorkCodes
@@ -480,6 +479,7 @@ private static void setSwitchTaskNodeSkip(Long skipNodeCode,
}
}
}

/**
* set task node and the post nodes skip flag
*/
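With the change above, DagHelper.parseConditionTask routes to the success or failed branch based on the recorded conditionSuccess flag rather than the condition task's own state, and it tolerates a missing branch list. A minimal sketch of that routing decision, with plain lists instead of ConditionsParameters.ConditionResult and made-up node codes:

```java
import java.util.Collections;
import java.util.List;

final class ConditionBranchRoutingSketch {

    // Choose the success branch when the recorded flag is true, otherwise the failed
    // branch; a missing branch list simply means "no next task" rather than an error.
    static List<Long> chooseBranch(boolean conditionSuccess,
                                   List<Long> successNodes,
                                   List<Long> failedNodes) {
        List<Long> chosen = conditionSuccess ? successNodes : failedNodes;
        return chosen == null ? Collections.emptyList() : chosen;
    }

    public static void main(String[] args) {
        // Condition evaluated to true: run the success branch, skip the failed branch.
        System.out.println(chooseBranch(true, List.of(101L, 102L), List.of(200L))); // [101, 102]
        // Condition evaluated to false with no failed branch configured: nothing to run next.
        System.out.println(chooseBranch(false, List.of(101L), null)); // []
    }
}
```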