Skip to content

Commit

Permalink
feat: add check for taskmanager config (#1262)
Browse files Browse the repository at this point in the history
* Add check for taskmanager config

* Add config exception

* Add check for spark.default.conf

* Add config of hadoop.conf.dir

* Ignore the taskmanager client which starts local taskmanager server
  • Loading branch information
tobegit3hub authored Feb 21, 2022
1 parent 37e4a6f commit d4a07c5
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com._4paradigm.openmldb.taskmanager.config;

/**
 * Thrown when a TaskManager configuration value is missing or invalid.
 *
 * The message always has the form: Error of config '&lt;config&gt;': &lt;message&gt;
 */
public class ConfigException extends Exception {

    /**
     * @param config  the name of the offending configuration key
     * @param message a description of what is wrong with its value
     */
    public ConfigException(String config, String message) {
        super("Error of config '" + config + "': " + message);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

/**
* The global configuration of TaskManager.
*
* Need to call TaskManagerConfig.parser() before accessing configuration.
*/
public class TaskManagerConfig {
private static Logger logger = LoggerFactory.getLogger(TaskManagerConfig.class);

public static String HOST = "127.0.0.1";
public static int PORT = 9902;
public static int WORKER_THREAD = 4;
public static int IO_THREAD = 4;
public static String HOST;
public static int PORT;
public static int WORKER_THREAD;
public static int IO_THREAD;
public static String ZK_CLUSTER;
public static String ZK_ROOT_PATH;
public static String ZK_TASKMANAGER_PATH;
Expand All @@ -38,68 +45,169 @@ public class TaskManagerConfig {
public static int ZK_BASE_SLEEP_TIME;
public static int ZK_MAX_CONNECT_WAIT_TIME;
public static int ZK_MAX_RETRIES;
public static String OFFLINE_DATA_PREFIX;
public static String SPARK_MASTER;
public static String BATCHJOB_JAR_PATH;
public static String SPARK_YARN_JARS;
public static String SPARK_HOME;
public static int PREFETCH_JOBID_NUM;
public static String NAMENODE_URI;
public static String JOB_LOG_PATH;
public static String SPARK_DEFAULT_CONF;
public static String SPARK_EVENTLOG_DIR;
public static int SPARK_YARN_MAXAPPATTEMPTS;
public static String OFFLINE_DATA_PREFIX;
public static String NAMENODE_URI;
public static String BATCHJOB_JAR_PATH;
public static String HADOOP_CONF_DIR;

static {
try {
Properties prop = new Properties();
prop.load(TaskManagerConfig.class.getClassLoader().getResourceAsStream("taskmanager.properties"));
HOST = prop.getProperty("server.host", "127.0.0.1");
PORT = Integer.parseInt(prop.getProperty("server.port", "9902"));
WORKER_THREAD = Integer.parseInt(prop.getProperty("server.worker_threads", "4"));
IO_THREAD = Integer.parseInt(prop.getProperty("server.io_threads", "4"));
ZK_SESSION_TIMEOUT = Integer.parseInt(prop.getProperty("zookeeper.session_timeout", "5000"));
ZK_CLUSTER = prop.getProperty("zookeeper.cluster", "");
ZK_ROOT_PATH = prop.getProperty("zookeeper.root_path", "");
ZK_TASKMANAGER_PATH = ZK_ROOT_PATH + "/taskmanager";
ZK_MAX_JOB_ID_PATH = ZK_TASKMANAGER_PATH + "/max_job_id";
ZK_CONNECTION_TIMEOUT = Integer.parseInt(prop.getProperty("zookeeper.connection_timeout", "5000"));
ZK_BASE_SLEEP_TIME = Integer.parseInt(prop.getProperty("zookeeper.base_sleep_time", "1000"));
ZK_MAX_RETRIES = Integer.parseInt(prop.getProperty("zookeeper.max_retries", "10"));
ZK_MAX_CONNECT_WAIT_TIME = Integer.parseInt(prop.getProperty("zookeeper.max_connect_waitTime", "30000"));
OFFLINE_DATA_PREFIX = prop.getProperty("offline.data.prefix");
SPARK_MASTER = prop.getProperty("spark.master", "yarn");
BATCHJOB_JAR_PATH = prop.getProperty("batchjob.jar.path", "");
if (BATCHJOB_JAR_PATH.isEmpty()) {
BATCHJOB_JAR_PATH = BatchJobUtil.findLocalBatchJobJar();
public static void parse() throws IOException, NumberFormatException, ConfigException {
Properties prop = new Properties();
prop.load(TaskManagerConfig.class.getClassLoader().getResourceAsStream("taskmanager.properties"));

HOST = prop.getProperty("server.host", "0.0.0.0");
PORT = Integer.parseInt(prop.getProperty("server.port", "9902"));
if (PORT < 1 || PORT > 65535) {
throw new ConfigException("server.port", "invalid port, should be in range of 1 through 65535");
}
WORKER_THREAD = Integer.parseInt(prop.getProperty("server.worker_threads", "4"));
IO_THREAD = Integer.parseInt(prop.getProperty("server.io_threads", "4"));
ZK_SESSION_TIMEOUT = Integer.parseInt(prop.getProperty("zookeeper.session_timeout", "5000"));

ZK_CLUSTER = prop.getProperty("zookeeper.cluster", "");
if (ZK_CLUSTER.isEmpty()) {
throw new ConfigException("zookeeper.cluster", "should not be empty");
}

ZK_ROOT_PATH = prop.getProperty("zookeeper.root_path", "");
if (ZK_ROOT_PATH.isEmpty()) {
throw new ConfigException("zookeeper.root_path", "should not be empty");
}

ZK_TASKMANAGER_PATH = ZK_ROOT_PATH + "/taskmanager";
ZK_MAX_JOB_ID_PATH = ZK_TASKMANAGER_PATH + "/max_job_id";
ZK_CONNECTION_TIMEOUT = Integer.parseInt(prop.getProperty("zookeeper.connection_timeout", "5000"));
ZK_BASE_SLEEP_TIME = Integer.parseInt(prop.getProperty("zookeeper.base_sleep_time", "1000"));
ZK_MAX_RETRIES = Integer.parseInt(prop.getProperty("zookeeper.max_retries", "10"));
ZK_MAX_CONNECT_WAIT_TIME = Integer.parseInt(prop.getProperty("zookeeper.max_connect_waitTime", "30000"));

SPARK_MASTER = prop.getProperty("spark.master", "local").toLowerCase();
if (!Arrays.asList("local", "yarn", "yarn-cluster", "yarn-client").contains(SPARK_MASTER) ) {
throw new ConfigException("spark.master", "should be one of local, yarn, yarn-cluster or yarn-client");
}
boolean isLocal = SPARK_MASTER.equals("local");
boolean isYarn = SPARK_MASTER.startsWith("yarn");
boolean isYarnCluster = SPARK_MASTER.equals("yarn") || SPARK_MASTER.equals("yarn-cluster");

SPARK_YARN_JARS = prop.getProperty("spark.yarn.jars", "");
if (isLocal && !SPARK_YARN_JARS.isEmpty()) {
logger.warn("Ignore the config of spark.yarn.jars which is invalid for local mode");
}
if (isYarn) {
if (!SPARK_YARN_JARS.isEmpty() && SPARK_YARN_JARS.startsWith("file://")) {
throw new ConfigException("spark.yarn.jars", "should not use local filesystem for yarn mode");
}
SPARK_YARN_JARS = prop.getProperty("spark.yarn.jars");
SPARK_HOME = prop.getProperty("spark.home");
PREFETCH_JOBID_NUM = Integer.parseInt(prop.getProperty("prefetch.jobid.num", "10"));
NAMENODE_URI = prop.getProperty("namenode.uri", "");
JOB_LOG_PATH = prop.getProperty("job.log.path", "../log/");
SPARK_DEFAULT_CONF = prop.getProperty("spark.default.conf", "");
SPARK_EVENTLOG_DIR = prop.getProperty("spark.eventLog.dir", "");
SPARK_YARN_MAXAPPATTEMPTS = Integer.parseInt(prop.getProperty("spark.yarn.maxAppAttempts", "1"));

if (!JOB_LOG_PATH.isEmpty()) {
createJobLogPath(JOB_LOG_PATH);
}

SPARK_HOME = prop.getProperty("spark.home", "");
if (SPARK_HOME.isEmpty()) {
try {
if(System.getenv("SPARK_HOME") == null) {
throw new ConfigException("spark.home", "should set config 'spark.home' or environment variable 'SPARK_HOME'");
} else {
SPARK_HOME = System.getenv("SPARK_HOME");
}
} catch (Exception e) {
throw new ConfigException("spark.home", "should set environment variable 'SPARK_HOME' if 'spark.home' is null");
}
} catch (Exception e) {
e.printStackTrace();
}
}
// TODO: Check if we can get spark-submit

static void createJobLogPath(String logPath) throws Exception {
File directory = new File(String.valueOf(logPath));
if (!directory.exists()) {
logger.info("The log path of job does not exist: " + logPath);
boolean created = directory.mkdirs();
logger.info("Try to create log path and get result: " + created);
PREFETCH_JOBID_NUM = Integer.parseInt(prop.getProperty("prefetch.jobid.num", "1"));
if (PREFETCH_JOBID_NUM < 1) {
throw new ConfigException("prefetch.jobid.num", "should be larger or equal to 1");
}

NAMENODE_URI = prop.getProperty("namenode.uri", "");
if (!NAMENODE_URI.isEmpty()) {
logger.warn("Config of 'namenode.uri' will be deprecated later");
}

JOB_LOG_PATH = prop.getProperty("job.log.path", "../log/");
if (JOB_LOG_PATH.isEmpty()) {
throw new ConfigException("job.log.path", "should not be null");
} else {
logger.debug("The log path of job already exists: " + logPath);
if (JOB_LOG_PATH.startsWith("hdfs") || JOB_LOG_PATH.startsWith("s3")) {
throw new ConfigException("job.log.path", "only support local filesystem");
}

File directory = new File(JOB_LOG_PATH);
if (!directory.exists()) {
logger.info("The log path does not exist, try to create directory: " + JOB_LOG_PATH);
boolean created = directory.mkdirs();
if (created) {
throw new ConfigException("job.log.path", "fail to create log path");
}
}
}

SPARK_DEFAULT_CONF = prop.getProperty("spark.default.conf", "");
if (!SPARK_DEFAULT_CONF.isEmpty()) {
String[] defaultSparkConfs = TaskManagerConfig.SPARK_DEFAULT_CONF.split(",");
for (String sparkConfMap: defaultSparkConfs) {
if (!sparkConfMap.isEmpty()) {
String[] kv = sparkConfMap.split("=");
if (kv.length != 2) {
throw new ConfigException("spark.default.conf", String.format("error format of %s", sparkConfMap));
} else if (!kv[0].startsWith("spark")) {
throw new ConfigException("spark.default.conf", String.format("config key should start with 'spark' but get %s", kv[0]));
}
}
}
}

SPARK_EVENTLOG_DIR = prop.getProperty("spark.eventLog.dir", "");
if (!SPARK_EVENTLOG_DIR.isEmpty() && isYarn) {
// TODO: Check if we can use local filesystem with yarn-client mode
if (SPARK_EVENTLOG_DIR.startsWith("file://")) {
throw new ConfigException("spark.eventLog.dir", "should not use local filesystem for yarn mode");
}
}

SPARK_YARN_MAXAPPATTEMPTS = Integer.parseInt(prop.getProperty("spark.yarn.maxAppAttempts", "1"));
if (SPARK_YARN_MAXAPPATTEMPTS < 1) {
throw new ConfigException("spark.yarn.maxAppAttempts", "should be larger or equal to 1");
}
}

OFFLINE_DATA_PREFIX = prop.getProperty("offline.data.prefix", "file:///tmp/openmldb_offline_storage/");
if (OFFLINE_DATA_PREFIX.isEmpty()) {
throw new ConfigException("offline.data.prefix", "should not be null");
} else {
if (isYarnCluster && OFFLINE_DATA_PREFIX.startsWith("file://") ) {
throw new ConfigException("offline.data.prefix", "should not use local filesystem for yarn mode");
}
}

BATCHJOB_JAR_PATH = prop.getProperty("batchjob.jar.path", "");
if (BATCHJOB_JAR_PATH.isEmpty()) {
try {
BATCHJOB_JAR_PATH = BatchJobUtil.findLocalBatchJobJar();
} catch (Exception e) {
throw new ConfigException("batchjob.jar.path", "config is null and fail to load default openmldb-batchjob jar");
}
}

HADOOP_CONF_DIR = prop.getProperty("hadoop.conf.dir", "");
if (HADOOP_CONF_DIR.isEmpty()) {
try {
if(System.getenv("HADOOP_CONF_DIR") == null) {
throw new ConfigException("hadoop.conf.dir", "should set config 'hadoop.conf.dir' or environment variable 'HADOOP_CONF_DIR'");
} else {
HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
}
} catch (Exception e) {
throw new ConfigException("hadoop.conf.dir", "should set environment variable 'HADOOP_CONF_DIR' if 'hadoop.conf.dir' is null");
}
}
// TODO: Check if we can get core-site.xml

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.baidu.brpc.server.RpcServerOptions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;

@Slf4j
Expand Down Expand Up @@ -82,6 +81,13 @@ public void shutdown() {
}

/**
 * Entry point: parse and validate the TaskManager configuration, then start
 * the RPC server. Exits early (without starting the server) if the
 * configuration is invalid.
 */
public static void main(String[] args) {
    try {
        TaskManagerConfig.parse();
    } catch (Exception e) {
        // Pass the throwable to the logger so the stack trace is preserved;
        // logging only e.getMessage() hides where the failure happened.
        logger.error("Fail to parse config file or validate configurations, exit now", e);
        return;
    }
    TaskManagerServer server = new TaskManagerServer();
    server.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ zookeeper.max_retries=10
zookeeper.base_sleep_time=1000
zookeeper.max_connect_waitTime=30000

# BatchJob Config
batchjob.jar.path=

# Hadoop Config
namenode.uri=
offline.data.prefix=file:///tmp/openmldb_offline_storage/

# Spark Config
spark.home=
spark.master=local
spark.yarn.jars=
spark.default.conf=
spark.eventLog.dir=
spark.yarn.maxAppAttempts=1
batchjob.jar.path=
namenode.uri=
offline.data.prefix=file:///tmp/openmldb_offline_storage/
hadoop.conf.dir=
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com._4paradigm.openmldb.taskmanager.yarn

import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, YarnApplicationState}
Expand All @@ -24,13 +25,10 @@ object YarnClientUtil {
}

def createYarnClient(): YarnClient = {
// TODO: Read yarn config from environment
val confPath = Thread.currentThread.getContextClassLoader.getResource("").getPath + File.separator + "conf"
val configuration = new Configuration()
System.out.println(confPath + File.separator + "core-site.xml");
configuration.addResource(new Path(confPath + File.separator + "core-site.xml"));
configuration.addResource(new Path(confPath + File.separator + "hdfs-site.xml"));
configuration.addResource(new Path(confPath + File.separator + "yarn-site.xml"));
configuration.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR + File.separator + "core-site.xml"));
configuration.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR + File.separator + "hdfs-site.xml"));
configuration.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR + File.separator + "yarn-site.xml"));

// Create yarn client
val yarnClient = YarnClient.createYarnClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.testng.annotations.BeforeClass;
import java.util.List;

@Ignore
public class TestTaskManagerClient {
TaskManagerServer server;
TaskManagerClient client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ batchjob.jar.path=

# Hadoop Config
namenode.uri=
offline.data.prefix=file:///tmp/openmldb_offline_storage/
offline.data.prefix=

# Spark Config
spark.master=local
Expand Down

0 comments on commit d4a07c5

Please sign in to comment.