diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala index 417c377038..163d7aa4db 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala @@ -70,6 +70,8 @@ object Configuration extends Logging { val VARIABLE_OPERATION: Boolean = CommonVars("wds.linkis.variable.operation", false).getValue + val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true) + val ERROR_MSG_TIP = CommonVars( "linkis.jobhistory.error.msg.tip", diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala new file mode 100644 index 0000000000..c1f6bff1ed --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconn.computation.executor.hook + +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf +import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext} +import org.apache.linkis.engineconn.core.engineconn.EngineConnManager +import org.apache.linkis.engineconn.core.executor.ExecutorManager +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.engine.RunType.RunType +import org.apache.linkis.rpc.Sender +import org.apache.linkis.udf.UDFClientConfiguration +import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol} +import org.apache.linkis.udf.entity.PythonModuleInfoVO +import org.apache.commons.lang3.StringUtils +import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV + +import java.util +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The PythonModuleLoad class is designed to load Python modules into the execution environment + * dynamically. This class is not an extension of UDFLoad, but shares a similar philosophy of + * handling dynamic module loading based on user preferences and system configurations. 
/**
 * Base class for hooks that load user Python modules into an engine.
 *
 * The modules registered for the engine's user are fetched over RPC from the UDF service. Every
 * module with a non-empty path is turned into a code snippet by the subclass-provided
 * `constructCode`, and the snippets are executed one by one on the executor looked up by label.
 */
abstract class PythonModuleLoad extends Logging {

  /** Engine type sent with the module query; defined by the subclass. */
  protected val engineType: String

  /** Run type defined by the subclass; it is not read inside this class. */
  protected val runType: RunType

  /** @return the engine type used when querying modules */
  protected def getEngineType(): String = engineType

  /**
   * Builds the engine-side code that loads a single module.
   *
   * @param pythonModuleInfo
   *   module description; `getLoadPythonModuleCode` only passes entries with a non-empty path
   * @return
   *   one snippet that is later handed to `ComputationExecutor.executeLine`
   */
  protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String

  /**
   * Asks the UDF service for the modules of `userName` on `engineType`.
   *
   * @return
   *   the list carried by the response (may be null), or an empty list when the reply is not a
   *   `ResponsePythonModuleProtocol`
   */
  private def queryPythonModuleRpc(
      userName: String,
      engineType: String
  ): java.util.List[PythonModuleInfoVO] =
    Sender
      .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
      .ask(RequestPythonModuleProtocol(userName, engineType)) match {
      case response: ResponsePythonModuleProtocol => response.getModulesInfo()
      case other =>
        // An unexpected reply is reported explicitly instead of surfacing as a ClassCastException.
        logger.warn(s"Unexpected response for python module query: $other")
        new util.ArrayList[PythonModuleInfoVO]()
    }

  /**
   * Collects the load snippets for every module of the current engine user.
   *
   * A failed or empty RPC answer yields an empty array; no placeholder modules are substituted.
   * When `wds.linkis.env.is.viewfs` is enabled, `hdfs://` in each non-null path is rewritten to
   * `viewfs://` on the returned module objects themselves.
   *
   * @return
   *   one snippet per module whose path is non-empty, in the order returned by the service
   */
  protected def getLoadPythonModuleCode: Array[String] = {
    val engineCreationContext =
      EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
    val user = engineCreationContext.getUser

    // tryAndWarn hands back null when the RPC throws, so null and "no modules" are treated alike.
    val modules: util.List[PythonModuleInfoVO] =
      Option(Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType()))).getOrElse {
        logger.info("rpc get info is empty.")
        new util.ArrayList[PythonModuleInfoVO]()
      }

    // Rewrite hdfs:// to viewfs://; entries without a path are left alone and filtered out below.
    if (IS_VIEW_FS_ENV.getValue) {
      modules.asScala.foreach { info =>
        Option(info.getPath).foreach(path => info.setPath(path.replace("hdfs://", "viewfs://")))
      }
    }

    logger.info(s"${user} load python modules: ")
    modules.asScala.foreach(l => logger.info(s"module name:${l.getName}, path:${l.getPath}\n"))

    // Build the load code for every module that actually has a path.
    val codes = modules.asScala
      .filter(info => StringUtils.isNotEmpty(info.getPath))
      .map(constructCode)
      .toArray
    logger.info(s"python codes: ${codes.mkString("\n")}")
    codes
  }

  /**
   * Runs each snippet on the executor. A failing snippet is logged and does not stop the
   * remaining ones; nothing happens when either argument is null.
   */
  private def executeFunctionCode(codes: Array[String], executor: ComputationExecutor): Unit =
    if (null != codes && null != executor) {
      codes.foreach { code =>
        logger.info("Submit function registration to engine, code: " + code)
        Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) {
          t: Throwable =>
            logger.error("Failed to load python module", t)
            null
        }
      }
    }

  /**
   * Generates and executes the code necessary for loading Python modules.
   *
   * @param labels
   *   labels used to look up the executor through `ExecutorManager.getExecutorByLabels`
   */
  protected def loadPythonModules(labels: Array[Label[_]]): Unit = {
    val codes = getLoadPythonModuleCode
    logger.info(s"codes length: ${codes.length}")
    if (codes.nonEmpty) {
      ExecutorManager.getInstance.getExecutorByLabels(labels) match {
        case computationExecutor: ComputationExecutor =>
          logger.info(s"executor class: ${computationExecutor.getClass.getName}")
          executeFunctionCode(codes, computationExecutor)
          // Only reported once the snippets were really submitted to an executor.
          logger.info(s"Successful to load python, engineType : ${engineType}")
        case null =>
          logger.error(s"Failed to load python, executor is null")
        case other =>
          logger.info(s"executor class: ${other.getClass.getName}")
          logger.warn(s"Skip loading python modules, executor is not a ComputationExecutor")
      }
    }
  }

}
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Engine-connection hook that starts Python module loading from `afterExecutionExecute`.
 *
 * The remaining hook callbacks only write a log line.
 */
abstract class PythonModuleLoadEngineConnHook
    extends PythonModuleLoad
    with EngineConnHook
    with Logging {

  /** Logs that module loading is being prepared. */
  override def beforeCreateEngineConn(engineCreationContext: EngineCreationContext): Unit =
    logger.info("Preparing to load Python Module...")

  /** Logs that the hook is about to run. */
  override def beforeExecutionExecute(
      engineCreationContext: EngineCreationContext,
      engineConn: EngineConn
  ): Unit =
    logger.info("Before executing command on load Python Module.")

  /**
   * Builds a `CodeLanguageLabel` whose code type is `runType.toString` and loads the modules with
   * it. Any failure is downgraded to a warning by `Utils.tryAndWarnMsg`.
   */
  override def afterExecutionExecute(
      engineCreationContext: EngineCreationContext,
      engineConn: EngineConn
  ): Unit = {
    val failureMessage = s"Failed to load Python Modules: ${engineType}"
    Utils.tryAndWarnMsg {
      val languageLabel = new CodeLanguageLabel
      languageLabel.setCodeType(runType.toString)
      logger.info(s"engineType: ${engineType}")
      loadPythonModules(Array[Label[_]](languageLabel))
    }(failureMessage)
  }

  /** Logs the start-up failure together with its cause. */
  override def afterEngineServerStartFailed(
      engineCreationContext: EngineCreationContext,
      throwable: Throwable
  ): Unit =
    logger.error(s"Failed to start Engine Server: ${throwable.getMessage}", throwable)

}
/dev/null +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Python module loading hook for the Spark engine: every module is registered through
 * `sc.addPyFile`.
 */
class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook {

  /** This hook targets the "spark" engine type. */
  override val engineType: String = "spark"

  /** The generated code is executed as PySpark. */
  override protected val runType: RunType = RunType.PYSPARK

  /**
   * Builds the `sc.addPyFile('<path>')` statement for one module.
   *
   * The path is embedded in a Python single-quoted string literal, so backslashes, single quotes
   * and line breaks are escaped; paths without such characters produce exactly the same code as
   * before.
   *
   * @param pythonModuleInfo
   *   module whose `getPath` value is passed to `addPyFile`
   * @return
   *   the PySpark statement to execute
   */
  override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = {
    // String.valueOf keeps the previous rendering of a null path ("null") instead of throwing.
    val path: String = String.valueOf(pythonModuleInfo.getPath)
    // Backslashes must be escaped first, otherwise the escapes added afterwards get doubled.
    val escapedPath = path
      .replace("\\", "\\\\")
      .replace("'", "\\'")
      .replace("\n", "\\n")
      .replace("\r", "\\r")
    val loadCode = s"sc.addPyFile('${escapedPath}')"
    logger.info(s"pythonLoadCode: ${loadCode}")
    loadCode
  }

}
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088 diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala index 762f008abc..e82517eeeb 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala @@ -17,23 +17,21 @@ package org.apache.linkis.jobhistory.util +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.time.DateFormatUtils import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV import org.apache.linkis.common.io.FsPath import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.governance.common.entity.job.{JobRequest, SubJobDetail} -import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration import org.apache.linkis.jobhistory.entity.JobHistory import org.apache.linkis.storage.FSFactory import org.apache.linkis.storage.fs.FileSystem import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils} -import org.apache.commons.io.IOUtils -import org.apache.commons.lang3.time.DateFormatUtils - import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream} import java.text.SimpleDateFormat -import java.util 
-import java.util.{Arrays, Date} +import java.util.Date import java.util.regex.Pattern object QueryUtils extends Logging { @@ -44,7 +42,6 @@ object QueryUtils extends Logging { private val CODE_STORE_PREFIX_VIEW_FS = CommonVars("wds.linkis.query.store.prefix.viewfs", "hdfs:///apps-data/") - private val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true) private val CODE_STORE_SUFFIX = CommonVars("wds.linkis.query.store.suffix", "") private val CODE_STORE_LENGTH = CommonVars("wds.linkis.query.code.store.length", 50000) private val CHARSET = "utf-8" diff --git a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/udf/entity/PythonModuleInfoVO.java b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/udf/entity/PythonModuleInfoVO.java new file mode 100644 index 0000000000..1c6a2af99a --- /dev/null +++ b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/udf/entity/PythonModuleInfoVO.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Value object describing a Python module package: name, description, path, engine type,
 * load/expire flags, and who created or updated it and when.
 */
public class PythonModuleInfoVO {

  /** Auto-increment id that uniquely identifies the module. */
  private Long id;

  /** Python module name. */
  private String name;

  /** Python module description. */
  private String description;

  /** HDFS path where the module is physically stored. */
  private String path;

  /** Engine type, for example: python, spark or all. */
  private String engineType;

  /** User who created the module. */
  private String createUser;

  /** User who last modified the module. */
  private String updateUser;

  /** Load flag (originally documented as 0 = not loaded, 1 = loaded). */
  private boolean isLoad;

  /** Expire flag (originally documented as 0 = not expired, 1 = expired); may be null. */
  private Boolean isExpire;

  /** Time the module was created. */
  private Timestamp createTime;

  /** Time the module was last modified. */
  private Timestamp updateTime;

  /** Creates an instance with every field at its Java default. */
  public PythonModuleInfoVO() {}

  /** Creates an instance with every field supplied explicitly. */
  public PythonModuleInfoVO(
      Long id,
      String name,
      String description,
      String path,
      String engineType,
      String createUser,
      String updateUser,
      boolean isLoad,
      Boolean isExpire,
      Timestamp createTime,
      Timestamp updateTime) {
    this.id = id;
    this.name = name;
    this.description = description;
    this.path = path;
    this.engineType = engineType;
    this.createUser = createUser;
    this.updateUser = updateUser;
    this.isLoad = isLoad;
    this.isExpire = isExpire;
    this.createTime = createTime;
    this.updateTime = updateTime;
  }

  public Long getId() {
    return this.id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public String getName() {
    return this.name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public String getDescription() {
    return this.description;
  }

  public void setDescription(String description) {
    this.description = description;
  }

  public String getPath() {
    return this.path;
  }

  public void setPath(String path) {
    this.path = path;
  }

  public String getEngineType() {
    return this.engineType;
  }

  public void setEngineType(String engineType) {
    this.engineType = engineType;
  }

  public String getCreateUser() {
    return this.createUser;
  }

  public void setCreateUser(String createUser) {
    this.createUser = createUser;
  }

  public String getUpdateUser() {
    return this.updateUser;
  }

  public void setUpdateUser(String updateUser) {
    this.updateUser = updateUser;
  }

  public boolean isLoad() {
    return this.isLoad;
  }

  public void setLoad(boolean isLoad) {
    this.isLoad = isLoad;
  }

  public Boolean isExpire() {
    return this.isExpire;
  }

  public void setExpire(Boolean isExpire) {
    this.isExpire = isExpire;
  }

  public Timestamp getCreateTime() {
    return this.createTime;
  }

  public void setCreateTime(Timestamp createTime) {
    this.createTime = createTime;
  }

  public Timestamp getUpdateTime() {
    return this.updateTime;
  }

  public void setUpdateTime(Timestamp updateTime) {
    this.updateTime = updateTime;
  }

  /** Renders every field for debugging and logging; string fields are wrapped in single quotes. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("PythonModuleInfo{");
    text.append("id=").append(id);
    text.append(", name='").append(name).append('\'');
    text.append(", description='").append(description).append('\'');
    text.append(", path='").append(path).append('\'');
    text.append(", engineType='").append(engineType).append('\'');
    text.append(", createUser='").append(createUser).append('\'');
    text.append(", updateUser='").append(updateUser).append('\'');
    text.append(", isLoad=").append(isLoad);
    text.append(", isExpire=").append(isExpire);
    text.append(", createTime=").append(createTime);
    text.append(", updateTime=").append(updateTime);
    return text.append('}').toString();
  }
}
/** Marker trait mixed into the Python-module RPC messages. */
trait PythonModuleProtocol

/**
 * RPC request for Python modules, keyed by user and engine type.
 *
 * @param userName
 *   user the modules are requested for
 * @param engineType
 *   engine type the modules are requested for
 */
case class RequestPythonModuleProtocol(userName: String, engineType: String)
    extends RetryableProtocol
    with CacheableProtocol
    with PythonModuleProtocol
/**
 * RPC response that carries a list of Python module descriptions.
 *
 * @param pythonModules
 *   the modules contained in the response; stored as given
 */
class ResponsePythonModuleProtocol(val pythonModules: java.util.List[PythonModuleInfoVO])
    extends PythonModuleProtocol {

  /** @return the list passed to the constructor, without copying or null handling */
  def getModulesInfo(): java.util.List[PythonModuleInfoVO] = pythonModules

}