Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tracker service to track unfinished jobs #1474

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class TaskManagerConfig {
public static String SPARK_HOME;
public static int PREFETCH_JOBID_NUM;
public static String JOB_LOG_PATH;
public static boolean TRACK_UNFINISHED_JOBS;
public static int JOB_TRACKER_INTERVAL;
public static String SPARK_DEFAULT_CONF;
public static String SPARK_EVENTLOG_DIR;
public static int SPARK_YARN_MAXAPPATTEMPTS;
Expand Down Expand Up @@ -148,6 +150,13 @@ public static void parse() throws IOException, NumberFormatException, ConfigExce
}
}

TRACK_UNFINISHED_JOBS = Boolean.parseBoolean(prop.getProperty("track.unfinished.jobs", "true"));

JOB_TRACKER_INTERVAL = Integer.parseInt(prop.getProperty("job.tracker.interval", "30"));
if (JOB_TRACKER_INTERVAL <= 0) {
throw new ConfigException("job.tracker.interval", "interval should be larger than 0");
}

SPARK_DEFAULT_CONF = prop.getProperty("spark.default.conf", "");
if (!SPARK_DEFAULT_CONF.isEmpty()) {
String[] defaultSparkConfs = TaskManagerConfig.SPARK_DEFAULT_CONF.split(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ public boolean isFinished() {
}

/**
 * Whether this job was submitted to YARN in any deploy mode.
 * Matches any cluster spec whose lowercase form begins with "yarn"
 * (e.g. "yarn", "yarn-cluster", "yarn-client").
 */
public boolean isYarnJob() {
    final String normalizedCluster = cluster.toLowerCase();
    return normalizedCluster.startsWith("yarn");
}

/**
 * Whether this job runs in YARN cluster deploy mode: a bare "yarn" defaults
 * to cluster mode, and "yarn-cluster" names it explicitly (case-insensitive).
 */
public boolean isYarnClusterJob() {
    switch (cluster.toLowerCase()) {
        case "yarn":
        case "yarn-cluster":
            return true;
        default:
            return false;
    }
}

public void sync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com._4paradigm.openmldb.taskmanager.server;

import com._4paradigm.openmldb.taskmanager.tracker.JobTrackerService;
import com._4paradigm.openmldb.taskmanager.zk.FailoverWatcher;
import lombok.extern.slf4j.Slf4j;
import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig;
Expand All @@ -39,6 +40,12 @@ public void start() {
logger.info("The server runs and prepares for leader election");
if (failoverWatcher.blockUntilActive()) {
logger.info("The server becomes active master and prepare to do business logic");
if (TaskManagerConfig.TRACK_UNFINISHED_JOBS) {
// Start threads to track unfinished jobs
JobTrackerService.startTrackerThreads();
}

// Start brpc server
startBrpcServer();
}
failoverWatcher.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ server.worker_threads=4
server.io_threads=4
prefetch.jobid.num=1
job.log.path=./logs/
track.unfinished.jobs=true
job.tracker.interval=30

# OpenMLDB Config
zookeeper.cluster=0.0.0.0:2181
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com._4paradigm.openmldb.taskmanager.tracker

import com._4paradigm.openmldb.taskmanager.JobInfoManager
import org.apache.spark.launcher.SparkAppHandle.State
import org.slf4j.LoggerFactory

object JobTrackerService {
  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Resume tracking of jobs that were still unfinished when the TaskManager
   * (re)started.
   *
   * Only yarn-cluster jobs that recorded an application id can be re-attached
   * to; each of those gets a dedicated [[YarnJobTrackerThread]]. Everything
   * else — local jobs, yarn-client jobs, or jobs with no application id —
   * cannot be observed any more and is marked LOST and persisted.
   */
  def startTrackerThreads(): Unit = {
    JobInfoManager.getUnfinishedJobs().foreach { job =>
      val canTrack = job.isYarnClusterJob && job.getApplicationId.nonEmpty

      if (canTrack) {
        logger.info("Start tracker thread to track job: " + job)
        // One polling thread per resumable job.
        new YarnJobTrackerThread(job).start()
      } else {
        logger.info("Unable to track job state: " + job)
        // No way to observe this job's progress — record it as LOST.
        job.setState(State.LOST.toString)
        job.sync()
      }
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com._4paradigm.openmldb.taskmanager.tracker


import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig
import com._4paradigm.openmldb.taskmanager.dao.JobInfo
import com._4paradigm.openmldb.taskmanager.yarn.YarnClientUtil
import com._4paradigm.openmldb.taskmanager.yarn.YarnClientUtil.parseAppIdStr
import org.apache.spark.launcher.SparkAppHandle.State

/**
 * Background thread that polls the YARN ResourceManager for the state of one
 * submitted application and persists every observed state change on the job,
 * stopping once the application reaches a final state.
 *
 * @param job the job record to track; must carry a YARN application id
 */
class YarnJobTrackerThread(job: JobInfo) extends Thread {

  override def run() {
    val yarnClient = YarnClientUtil.createYarnClient()

    // Without an application id there is nothing to poll; record the job as LOST.
    if (job.getApplicationId.isEmpty) {
      job.setState(State.LOST.toString)
      job.sync()
      return
    }

    val appId = parseAppIdStr(job.getApplicationId)
    var lastYarnState =
      yarnClient.getApplicationReport(appId).getYarnApplicationState.toString.toLowerCase()

    while (true) {
      // Already in a final state: persist it and stop tracking.
      if (JobInfo.FINAL_STATE.contains(lastYarnState)) {
        job.setState(lastYarnState)
        job.sync()
        return
      }

      // Wait one polling interval. Use a Long multiplier so a large configured
      // interval (seconds) cannot overflow Int arithmetic when converted to ms.
      Thread.sleep(TaskManagerConfig.JOB_TRACKER_INTERVAL * 1000L)

      // BUG FIX: fetch a FRESH application report every iteration. The previous
      // code reused the report obtained before the loop, so the observed state
      // could never change and the thread would spin forever on a running job.
      val currentYarnState =
        yarnClient.getApplicationReport(appId).getYarnApplicationState.toString.toLowerCase()

      // Persist only actual transitions to avoid redundant syncs.
      if (!currentYarnState.equals(lastYarnState)) {
        job.setState(currentYarnState)
        job.sync()
        if (JobInfo.FINAL_STATE.contains(currentYarnState)) {
          return
        }

        lastYarnState = currentYarnState
      }

      // NOTE(review): a transient RM failure in getApplicationReport will kill
      // this thread with an uncaught exception, leaving the job state stale —
      // consider retry/LOST handling; confirm desired policy with maintainers.
    }

  }

}