From 94bafb4f0bba1ce1ba106feb383c9cc69776a9c2 Mon Sep 17 00:00:00 2001 From: tobe Date: Tue, 12 Jul 2022 16:04:01 +0800 Subject: [PATCH 1/2] Pass sql file absolute path and fix for yarn-client mode --- .../openmldb/batchjob/util/OpenmldbJobUtil.scala | 8 ++++++-- .../openmldb/taskmanager/OpenmldbBatchjobManager.scala | 10 +++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala index f6a080097b7..aeba3a394a9 100644 --- a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala +++ b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala @@ -31,13 +31,17 @@ object OpenmldbJobUtil { def getSqlFromFile(spark: SparkSession, sqlFilePath: String): String = { val sparkMaster = spark.conf.get("spark.master") + val sparkDeployMode = spark.conf.get("spark.submit.deployMode") - val actualSqlFilePath = if (sparkMaster.equals("local")) { - SparkFiles.get(sqlFilePath) + val actualSqlFilePath = if (sparkMaster.equalsIgnoreCase("yarn") && + sparkDeployMode.equalsIgnoreCase("cluster")) { + sqlFilePath.split("/").last } else { sqlFilePath } + println("tobe1: actualSqlFilePath: " + actualSqlFilePath) + if (!File(actualSqlFilePath).exists) { throw new Exception("SQL file does not exist in " + actualSqlFilePath) } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala index da1b77a0cd2..52396332b69 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala @@ -49,7 +49,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.RunBatchSql" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) val jobInfo = SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb, blocking=true) @@ -62,7 +62,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.RunBatchAndShow" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) @@ -73,7 +73,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.ImportOnlineData" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) @@ -84,7 +84,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.ImportOfflineData" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) @@ -95,7 +95,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.ExportOfflineData" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) From c58a67a99f12f95001fc0d0869051b60d48af011 Mon Sep 17 00:00:00 2001 From: tobe Date: Thu, 14 Jul 2022 11:41:29 +0800 Subject: [PATCH 2/2] Remove the debug code --- .../com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala index aeba3a394a9..aa2a21153cf 100644 --- a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala +++ b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala @@ -40,8 +40,6 @@ object OpenmldbJobUtil { sqlFilePath } - println("tobe1: actualSqlFilePath: " + actualSqlFilePath) - if (!File(actualSqlFilePath).exists) { throw new Exception("SQL file does not exist in " + actualSqlFilePath) }