From 2b604c0162b6c653dc6074232a11a44f10dce903 Mon Sep 17 00:00:00 2001 From: davidhua Date: Mon, 4 Jul 2022 14:01:52 +0800 Subject: [PATCH 1/2] Enable the streamis to support different version of Linkis --- .../conf/JobLauncherConfiguration.scala | 5 ++ .../job/manager/FlinkJobLaunchManager.scala | 2 +- .../job/manager/LinkisJobLaunchManager.scala | 90 ++++++++++++++++++- .../manager/SimpleFlinkJobLaunchManager.scala | 1 - .../src/main/resources/linkis.properties | 5 +- 5 files changed, 99 insertions(+), 4 deletions(-) diff --git a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/conf/JobLauncherConfiguration.scala b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/conf/JobLauncherConfiguration.scala index 79d3d0ae1..812475016 100644 --- a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/conf/JobLauncherConfiguration.scala +++ b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/conf/JobLauncherConfiguration.scala @@ -35,6 +35,10 @@ object JobLauncherConfiguration { */ val FLINK_CHECKPOINT_PATH: CommonVars[String] = CommonVars("wds.streamis.launch.flink.checkpoint.dir", "/flink/flink-checkpoints") + /** + * Linkis release version + */ + val FLINK_LINKIS_RELEASE_VERSION: CommonVars[String] = CommonVars("wds.streamis.launch.flink.linkis.release.version", "") /** * Variable: savepoint path */ @@ -44,4 +48,5 @@ object JobLauncherConfiguration { * Variable: flink app */ val VAR_FLINK_APP_NAME: CommonVars[String] = CommonVars("wds.streamis.launch.variable.flink.app.name", "flink.app.name") + } diff --git a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala index 99374099c..a00edd8e8 100644 --- a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala +++ b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala @@ -48,7 +48,7 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging { * @param jobState job state used to launch * @return the job id. */ - override def launch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = { + override def innerLaunch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = { // Transform the JobState into the params in LaunchJob Option(jobState).foreach(state => { val startUpParams = TaskUtils.getStartupMap(job.getParams) diff --git a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/LinkisJobLaunchManager.scala b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/LinkisJobLaunchManager.scala index 978d4a682..26d76cfdf 100644 --- a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/LinkisJobLaunchManager.scala +++ b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/LinkisJobLaunchManager.scala @@ -1,9 +1,97 @@ package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager +import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.JobState +import com.webank.wedatasphere.streamis.jobmanager.launcher.job.{JobClient, LaunchJob} +import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLauncherConfiguration import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.LinkisJobInfo +import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.LinkisJobLaunchManager.LINKIS_JAR_VERSION_PATTERN +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.computation.client.LinkisJob +import org.apache.linkis.protocol.utils.TaskUtils -trait LinkisJobLaunchManager extends JobLaunchManager[LinkisJobInfo] { +import java.util +import scala.collection.JavaConverters._ +import scala.util.matching.Regex +trait LinkisJobLaunchManager extends JobLaunchManager[LinkisJobInfo] with Logging{ + /** + * This method is used to launch a new job. + * + * @param job a StreamisJob wanted to be launched. + * @param jobState job state used to launch + * @return the job id. + */ + override def launch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = { + // Support different version of Linkis + var linkisVersion = JobLauncherConfiguration.FLINK_LINKIS_RELEASE_VERSION.getValue + if (StringUtils.isBlank(linkisVersion)) { + val linkisJarPath = classOf[LinkisJob].getProtectionDomain.getCodeSource.getLocation.getPath; + val lastSplit = linkisJarPath.lastIndexOf(IOUtils.DIR_SEPARATOR); + if (lastSplit >= 0) { + linkisVersion = linkisJarPath.substring(lastSplit + 1) + } + } + if (StringUtils.isNotBlank(linkisVersion)) { + Utils.tryAndWarn { + val LINKIS_JAR_VERSION_PATTERN(version) = linkisVersion + linkisVersion = version + } + } + if (StringUtils.isNotBlank(linkisVersion)){ + val versionSplitter: Array[String] = linkisVersion.split("\\.") + val major = Integer.valueOf(versionSplitter(0)) + val sub = Integer.valueOf(versionSplitter(1)) + val fix = Integer.valueOf(versionSplitter(2)) + val versionNum = major * 10000 + sub * 100 + fix + info(s"Recognized the linkis release version: [${linkisVersion}, version number: [${versionNum}]") + if (versionNum <= 10101){ + warn("Linkis version number is less than [10101], should compatible the startup params in launcher.") + val startupParams = TaskUtils.getStartupMap(job.getParams) + // Change the unit of memory params for linkis older version + changeUnitOfMemoryToG(startupParams, "flink.taskmanager.memory") + changeUnitOfMemoryToG(startupParams, "flink.jobmanager.memory") + // Avoid the _FLINK_CONFIG_. prefix for linkis older version + val newParams = avoidParamsPrefix(startupParams, "_FLINK_CONFIG_.") + startupParams.clear(); + startupParams.putAll(newParams) + } + } + innerLaunch(job, jobState) + } + private def changeUnitOfMemoryToG(params: util.Map[String, Any], name: String): Unit = { + params.get(name) match { + case memory: String => + var actualMem = Integer.valueOf(memory) / 1024 + actualMem = if (actualMem <= 0) 1 else actualMem + info(s"Change the unit of startup param: [${name}], value [${memory}] => [${actualMem}]") + params.put(name, actualMem) + case _ => // Ignores + } + } + + /** + * Avoid params prefix + * @param params params + * @param prefix prefix + */ + private def avoidParamsPrefix(params: util.Map[String, Any], prefix: String): util.Map[String, Any] = { + params.asScala.map{ + case (key, value) => + if (key.startsWith(prefix)){ + info(s"Avoid the prefix of startup param: [${key}] => [${key.substring(prefix.length)}]") + (key.substring(prefix.length), value) + } else { + (key, value) + } + }.toMap.asJava + } + def innerLaunch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] +} + +object LinkisJobLaunchManager{ + val LINKIS_JAR_VERSION_PATTERN: Regex = "^[\\s\\S]*([\\d]+\\.[\\d]+\\.[\\d]+)[\\s\\S]*$".r } diff --git a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala index 420bbda52..6ad2e4f88 100644 --- a/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala +++ b/streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala @@ -130,5 +130,4 @@ class SimpleFlinkJobLaunchManager extends FlinkJobLaunchManager { object SimpleFlinkJobLaunchManager{ val INSTANCE_NAME = "simpleFlink"; - } diff --git a/streamis-server/src/main/resources/linkis.properties b/streamis-server/src/main/resources/linkis.properties index b330b69dc..161378438 100644 --- a/streamis-server/src/main/resources/linkis.properties +++ b/streamis-server/src/main/resources/linkis.properties @@ -59,4 +59,7 @@ wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasourc com.webank.wedatasphere.streamis.projectmanager.dao # Make sure that can fetch the application info finally -wds.streamis.application.info.fetch.max=20 \ No newline at end of file +wds.streamis.application.info.fetch.max=20 + +# To use the complete features of streamis in linkis 1.1.2 +#wds.streamis.launch.flink.linkis.release.version=1.1.2 \ No newline at end of file From ac9f82426780fe72eeb936438114fcbfe3770a80 Mon Sep 17 00:00:00 2001 From: davidhua Date: Mon, 4 Jul 2022 14:08:02 +0800 Subject: [PATCH 2/2] Change the format of linkis.properties --- .../src/main/resources/linkis.properties | 128 +++++++++--------- 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/streamis-server/src/main/resources/linkis.properties b/streamis-server/src/main/resources/linkis.properties index 161378438..b80410c10 100644 --- a/streamis-server/src/main/resources/linkis.properties +++ b/streamis-server/src/main/resources/linkis.properties @@ -1,65 +1,65 @@ -# -# Copyright 2021 WeBank -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -#wds.linkis.test.mode=true -wds.linkis.server.mybatis.datasource.url=jdbc:mysql://localhost:3306/streamis?characterEncoding=UTF-8 -wds.linkis.server.mybatis.datasource.username=user1 - -wds.linkis.server.mybatis.datasource.password=pwd1 -wds.linkis.gateway.ip= -wds.linkis.gateway.port= -wds.linkis.gateway.url=http://localhost:9001 - -wds.linkis.mysql.is.encrypt=false -##restful -wds.linkis.log.clear=true -wds.linkis.server.version=v1 -#wds.linkis.test.user=user1 - - - -##restful -wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.streamis.datasource.server.restful.api,\ - com.webank.wedatasphere.streamis.project.server.restful,\ - com.webank.wedatasphere.streamis.jobmanager.restful.api,\ - com.webank.wedatasphere.streamis.datasource.execute.rest,\ - com.webank.wedatasphere.streamis.projectmanager.restful.api -##mybatis -wds.linkis.server.mybatis.mapperLocations=\ - classpath*:com/webank/wedatasphere/streamis/datasource/manager/dao/impl/*.xml,\ - classpath*:com/webank/wedatasphere/streamis/project/server/dao/impl/*.xml,\ - classpath*:com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/*.xml,\ - classpath*:com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/*.xml,\ - classpath*:com/webank/wedatasphere/streamis/projectmanager/dao/impl/*.xml - -wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.streamis.datasource.manager.domain,\ - com.webank.wedatasphere.streamis.jobmanager.launcher.entity,\ - com.webank.wedatasphere.streamis.jobmanager.manager.entity,\ - com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo,\ - com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo,\ - com.webank.wedatasphere.streamis.projectmanager.entity - - -wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasource.manager.dao,\ - org.apache.linkis.bml.dao,\ - com.webank.wedatasphere.streamis.project.server.dao,\ - com.webank.wedatasphere.streamis.jobmanager.launcher.dao,\ - com.webank.wedatasphere.streamis.jobmanager.manager.dao,\ - com.webank.wedatasphere.streamis.projectmanager.dao - -# Make sure that can fetch the application info finally -wds.streamis.application.info.fetch.max=20 - -# To use the complete features of streamis in linkis 1.1.2 +# +# Copyright 2021 WeBank +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +#wds.linkis.test.mode=true +wds.linkis.server.mybatis.datasource.url=jdbc:mysql://localhost:3306/streamis?characterEncoding=UTF-8 +wds.linkis.server.mybatis.datasource.username=user1 + +wds.linkis.server.mybatis.datasource.password=pwd1 +wds.linkis.gateway.ip= +wds.linkis.gateway.port= +wds.linkis.gateway.url=http://localhost:9001 + +wds.linkis.mysql.is.encrypt=false +##restful +wds.linkis.log.clear=true +wds.linkis.server.version=v1 +#wds.linkis.test.user=user1 + + + +##restful +wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.streamis.datasource.server.restful.api,\ + com.webank.wedatasphere.streamis.project.server.restful,\ + com.webank.wedatasphere.streamis.jobmanager.restful.api,\ + com.webank.wedatasphere.streamis.datasource.execute.rest,\ + com.webank.wedatasphere.streamis.projectmanager.restful.api +##mybatis +wds.linkis.server.mybatis.mapperLocations=\ + classpath*:com/webank/wedatasphere/streamis/datasource/manager/dao/impl/*.xml,\ + classpath*:com/webank/wedatasphere/streamis/project/server/dao/impl/*.xml,\ + classpath*:com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/*.xml,\ + classpath*:com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/*.xml,\ + classpath*:com/webank/wedatasphere/streamis/projectmanager/dao/impl/*.xml + +wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.streamis.datasource.manager.domain,\ + com.webank.wedatasphere.streamis.jobmanager.launcher.entity,\ + com.webank.wedatasphere.streamis.jobmanager.manager.entity,\ + com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo,\ + com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo,\ + com.webank.wedatasphere.streamis.projectmanager.entity + + +wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasource.manager.dao,\ + org.apache.linkis.bml.dao,\ + com.webank.wedatasphere.streamis.project.server.dao,\ + com.webank.wedatasphere.streamis.jobmanager.launcher.dao,\ + com.webank.wedatasphere.streamis.jobmanager.manager.dao,\ + com.webank.wedatasphere.streamis.projectmanager.dao + +# Make sure that can fetch the application info finally +wds.streamis.application.info.fetch.max=20 + +# To use the complete features of streamis in linkis 1.1.2 #wds.streamis.launch.flink.linkis.release.version=1.1.2 \ No newline at end of file