Patch summary (leading text before the first "diff --git" header is not part of any hunk).
NOTE(review): line breaks and indentation were reconstructed from the hunk headers and
Java convention; confirm whitespace against the target files before applying.

* LocalExecutor.recover(): log runningTaskAttemptList.size() instead of the list itself.
* LocalExecutor.handleTimeoutAttempt(): act only when the attempt is still in workerPool;
  shut the worker down, then remove it from the pool, release the token, mark the attempt
  ERROR and resubmit it.
* LocalWorker.bind(): read taskAttemptId from the heartbeat message; the removed line
  assigned taskAttemptId to this.taskAttemptId with no such parameter in the signature.

diff --git a/kun-workflow/kun-workflow-executor/src/main/java/com/miotech/kun/workflow/executor/local/LocalExecutor.java b/kun-workflow/kun-workflow-executor/src/main/java/com/miotech/kun/workflow/executor/local/LocalExecutor.java
index 5e1284f39..9fb89b287 100644
--- a/kun-workflow/kun-workflow-executor/src/main/java/com/miotech/kun/workflow/executor/local/LocalExecutor.java
+++ b/kun-workflow/kun-workflow-executor/src/main/java/com/miotech/kun/workflow/executor/local/LocalExecutor.java
@@ -275,7 +275,7 @@ public boolean recover() {
         List taskAttemptList = taskRunDao.fetchUnStartedTaskAttemptList();
         logger.debug("recover taskAttempt to queue count = {}", taskAttemptList.size());
         List runningTaskAttemptList = taskRunDao.fetchRunningTaskAttemptList();
-        logger.debug("recover taskAttempt to workerPool count = {}", runningTaskAttemptList);
+        logger.debug("recover taskAttempt to workerPool count = {}", runningTaskAttemptList.size());
         for (TaskAttempt taskAttempt : runningTaskAttemptList) {
             workerStarterThreadPool.submit(new WorkerStartRunner(taskAttempt));
         }
@@ -384,12 +384,19 @@ public void run() {
     }
 
     private void handleTimeoutAttempt(Long taskAttemptId) {
-        workerToken.release();
-        logger.debug("taskAttemptId = {} release worker token, current size = {}", taskAttemptId, workerToken.availablePermits());
-        workerPool.remove(taskAttemptId);
-        miscService.changeTaskAttemptStatus(taskAttemptId, TaskRunStatus.ERROR);
-        TaskAttempt taskAttempt = taskRunDao.fetchAttemptById(taskAttemptId).get();
-        submit(taskAttempt, true);
+        //kill worker when worker time out
+        if (workerPool.containsKey(taskAttemptId)) {
+            Worker worker = workerFactory.getWorker(workerPool.get(taskAttemptId));
+            worker.shutdown();
+            logger.debug("worker is shutdown,taskAttemptId = {}", taskAttemptId);
+            workerPool.remove(taskAttemptId);
+            logger.debug("taskAttempt is removed from worker pool,taskAttemptId = {}", taskAttemptId);
+            workerToken.release();
+            logger.debug("taskAttemptId = {} release worker token, current size = {}", taskAttemptId, workerToken.availablePermits());
+            miscService.changeTaskAttemptStatus(taskAttemptId, TaskRunStatus.ERROR);
+            TaskAttempt taskAttempt = taskRunDao.fetchAttemptById(taskAttemptId).get();
+            submit(taskAttempt, true);
+        }
     }
 
     class WaitAbort implements Runnable {
diff --git a/kun-workflow/kun-workflow-worker/src/main/java/com/miotech/kun/workflow/worker/local/LocalWorker.java b/kun-workflow/kun-workflow-worker/src/main/java/com/miotech/kun/workflow/worker/local/LocalWorker.java
index fa0240681..e1eebe643 100644
--- a/kun-workflow/kun-workflow-worker/src/main/java/com/miotech/kun/workflow/worker/local/LocalWorker.java
+++ b/kun-workflow/kun-workflow-worker/src/main/java/com/miotech/kun/workflow/worker/local/LocalWorker.java
@@ -106,7 +106,7 @@ public void start(ExecCommand command) {
     public void bind(HeartBeatMessage heartBeatMessage) {
         this.processId = heartBeatMessage.getWorkerId();
         this.port = heartBeatMessage.getPort();
-        this.taskAttemptId = taskAttemptId;
+        this.taskAttemptId = heartBeatMessage.getTaskAttemptId();
     }
 
     private List buildCommand(String inputFile, String configFile, String outputFile, Long taskAttemptId) {