diff --git a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala index f6a080097b7..aa2a21153cf 100644 --- a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala +++ b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/util/OpenmldbJobUtil.scala @@ -31,9 +31,11 @@ object OpenmldbJobUtil { def getSqlFromFile(spark: SparkSession, sqlFilePath: String): String = { val sparkMaster = spark.conf.get("spark.master") + val sparkDeployMode = spark.conf.get("spark.submit.deployMode") - val actualSqlFilePath = if (sparkMaster.equals("local")) { - SparkFiles.get(sqlFilePath) + val actualSqlFilePath = if (sparkMaster.equalsIgnoreCase("yarn") && + sparkDeployMode.equalsIgnoreCase("cluster")) { + sqlFilePath.split("/").last } else { sqlFilePath } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala index da1b77a0cd2..52396332b69 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala @@ -49,7 +49,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.RunBatchSql" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) val jobInfo = SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb, blocking=true) @@ -62,7 +62,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.RunBatchAndShow" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) @@ -73,7 +73,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.ImportOnlineData" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) @@ -84,7 +84,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.ImportOfflineData" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) @@ -95,7 +95,7 @@ object OpenmldbBatchjobManager { val mainClass = "com._4paradigm.openmldb.batchjob.ExportOfflineData" val tempSqlFile = SqlFileUtil.createTempSqlFile(sql) - val args = List(tempSqlFile.getName) + val args = List(tempSqlFile.getAbsolutePath) SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb)