Skip to content

Commit

Permalink
Fix worker timeout/yide (#369)
Browse files Browse the repository at this point in the history
* fix kill worker

fix bug

* add log
  • Loading branch information
shikimoe authored Jan 18, 2021
1 parent 916b580 commit bffdf37
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public boolean recover() {
List<TaskAttempt> taskAttemptList = taskRunDao.fetchUnStartedTaskAttemptList();
logger.debug("recover taskAttempt to queue count = {}", taskAttemptList.size());
List<TaskAttempt> runningTaskAttemptList = taskRunDao.fetchRunningTaskAttemptList();
logger.debug("recover taskAttempt to workerPool count = {}", runningTaskAttemptList);
logger.debug("recover taskAttempt to workerPool count = {}", runningTaskAttemptList.size());
for (TaskAttempt taskAttempt : runningTaskAttemptList) {
workerStarterThreadPool.submit(new WorkerStartRunner(taskAttempt));
}
Expand Down Expand Up @@ -384,12 +384,19 @@ public void run() {
}

private void handleTimeoutAttempt(Long taskAttemptId) {
workerToken.release();
logger.debug("taskAttemptId = {} release worker token, current size = {}", taskAttemptId, workerToken.availablePermits());
workerPool.remove(taskAttemptId);
miscService.changeTaskAttemptStatus(taskAttemptId, TaskRunStatus.ERROR);
TaskAttempt taskAttempt = taskRunDao.fetchAttemptById(taskAttemptId).get();
submit(taskAttempt, true);
//kill worker when worker time out
if (workerPool.containsKey(taskAttemptId)) {
Worker worker = workerFactory.getWorker(workerPool.get(taskAttemptId));
worker.shutdown();
logger.debug("worker is shutdown,taskAttemptId = {}", taskAttemptId);
workerPool.remove(taskAttemptId);
logger.debug("taskAttempt is removed from worker pool,taskAttemptId = {}", taskAttemptId);
workerToken.release();
logger.debug("taskAttemptId = {} release worker token, current size = {}", taskAttemptId, workerToken.availablePermits());
miscService.changeTaskAttemptStatus(taskAttemptId, TaskRunStatus.ERROR);
TaskAttempt taskAttempt = taskRunDao.fetchAttemptById(taskAttemptId).get();
submit(taskAttempt, true);
}
}

class WaitAbort implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void start(ExecCommand command) {
public void bind(HeartBeatMessage heartBeatMessage) {
this.processId = heartBeatMessage.getWorkerId();
this.port = heartBeatMessage.getPort();
this.taskAttemptId = taskAttemptId;
this.taskAttemptId = heartBeatMessage.getTaskAttemptId();
}

private List<String> buildCommand(String inputFile, String configFile, String outputFile, Long taskAttemptId) {
Expand Down

0 comments on commit bffdf37

Please sign in to comment.