Dev 1.7.1 add python module load hook (#584)
* AI-generated code init

* Manually modified code

* Manual modification commit
aiceflower authored Aug 24, 2024
1 parent 780a2e3 commit 934b205
Showing 9 changed files with 537 additions and 8 deletions.
@@ -70,6 +70,8 @@ object Configuration extends Logging {

val VARIABLE_OPERATION: Boolean = CommonVars("wds.linkis.variable.operation", false).getValue

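// Whether the environment uses viewfs; when true, consumers rewrite hdfs:// paths to viewfs://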
val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true)

val ERROR_MSG_TIP =
CommonVars(
"linkis.jobhistory.error.msg.tip",
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.core.engineconn.EngineConnManager
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.rpc.Sender
import org.apache.linkis.udf.UDFClientConfiguration
import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol}
import org.apache.linkis.udf.entity.PythonModuleInfoVO
import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV

import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* The PythonModuleLoad class is designed to load Python modules into the execution environment
* dynamically. This class is not an extension of UDFLoad, but shares a similar philosophy of
* handling dynamic module loading based on user preferences and system configurations.
*/
abstract class PythonModuleLoad extends Logging {

/** Abstract properties to be defined by the subclass */
protected val engineType: String
protected val runType: RunType

protected def getEngineType(): String = engineType

protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String

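/**
 * Queries the UDF service over RPC for the Python modules registered for the
 * given user and engine type.
 */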
private def queryPythonModuleRpc(
userName: String,
engineType: String
): java.util.List[PythonModuleInfoVO] = {
val infoList = Sender
.getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
.ask(RequestPythonModuleProtocol(userName, engineType))
.asInstanceOf[ResponsePythonModuleProtocol]
.getModulesInfo()
infoList
}

protected def getLoadPythonModuleCode: Array[String] = {
val engineCreationContext =
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
val user = engineCreationContext.getUser

var infoList: util.List[PythonModuleInfoVO] =
Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType()))
if (infoList == null) {
logger.info("RPC returned null python module info; using an empty list.")
infoList = new util.ArrayList[PythonModuleInfoVO]()
}

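// Fallback sample modules used when the RPC returns no entries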
if (infoList.isEmpty) {
val pmi = new PythonModuleInfoVO()
pmi.setPath("viewfs:///apps-data/hadoop/hello_world.py")
infoList.add(pmi)

val pmi1 = new PythonModuleInfoVO()
pmi1.setPath("viewfs:///apps-data/hadoop/redis2.zip")
infoList.add(pmi1)
}

// Rewrite hdfs:// paths to viewfs:// when running in a viewfs environment
if (IS_VIEW_FS_ENV.getValue) {
infoList.asScala.foreach { info =>
val path = info.getPath
info.setPath(path.replace("hdfs://", "viewfs://"))
}
}

logger.info(s"User ${user} loads python modules:")
infoList.asScala.foreach(l => logger.info(s"module name: ${l.getName}, path: ${l.getPath}"))

// Build the module-loading code
val codes: mutable.Buffer[String] = infoList.asScala
.filter { info => StringUtils.isNotEmpty(info.getPath) }
.map(constructCode)
// Log the generated codes
val str: String = codes.mkString("\n")
logger.info(s"python codes: $str")
codes.toArray
}

private def executeFunctionCode(codes: Array[String], executor: ComputationExecutor): Unit = {
if (null == codes || null == executor) {
return
}
codes.foreach { code =>
logger.info("Submitting python module load code to engine, code: " + code)
Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) {
t: Throwable =>
logger.error("Failed to load python module", t)
null
}
}
}

/**
 * Generate and execute the code necessary for loading Python modules.
 *
 * @param labels
 *   Labels used to locate an executor capable of running code in the current engine context.
 */
protected def loadPythonModules(labels: Array[Label[_]]): Unit = {

val codes = getLoadPythonModuleCode
logger.info(s"codes length: ${codes.length}")
if (null != codes && codes.nonEmpty) {
val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
if (executor != null) {
val className = executor.getClass.getName
logger.info(s"executor class: ${className}")
} else {
logger.error(s"Failed to load python, executor is null")
}

executor match {
case computationExecutor: ComputationExecutor =>
executeFunctionCode(codes, computationExecutor)
case _ =>
}
}
logger.info(s"Successfully loaded python modules, engineType: ${engineType}")
}

}

// Note: The actual implementation of methods like `executeFunctionCode` and `constructCode` lives in this class and its subclasses, respectively.
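
For illustration, given the fallback sample modules above and the Spark hook added later in this commit, getLoadPythonModuleCode would emit the following two lines (the sample paths already use viewfs://, so the viewfs rewrite leaves them unchanged):

sc.addPyFile('viewfs:///apps-data/hadoop/hello_world.py')
sc.addPyFile('viewfs:///apps-data/hadoop/redis2.zip')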
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel

abstract class PythonModuleLoadEngineConnHook
extends PythonModuleLoad
with EngineConnHook
with Logging {

override def afterExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
Utils.tryAndWarnMsg {
val codeLanguageLabel = new CodeLanguageLabel
codeLanguageLabel.setCodeType(runType.toString)
logger.info(s"engineType: ${engineType}")
val labels = Array[Label[_]](codeLanguageLabel)
loadPythonModules(labels)
}(s"Failed to load Python Modules: ${engineType}")

}

override def afterEngineServerStartFailed(
engineCreationContext: EngineCreationContext,
throwable: Throwable
): Unit = {
logger.error(s"Failed to start Engine Server: ${throwable.getMessage}", throwable)
}

override def beforeCreateEngineConn(engineCreationContext: EngineCreationContext): Unit = {
logger.info("Preparing to load Python Module...")
}

override def beforeExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
logger.info("Python module load hook: nothing to do before execution.")
}

}
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.udf.entity.PythonModuleInfoVO

/**
* A hook that loads and executes Python modules for the Spark engine.
*/
class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook {

// engineType is "spark": this hook targets the Spark data-processing engine
override val engineType: String = "spark"

// runType is RunType.PYSPARK: the generated load code runs as PySpark
override protected val runType: RunType = RunType.PYSPARK

// Builds the module-loading code from the Python module info
override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = {
// Use the module path to build a SparkContext.addPyFile command string.
// In PySpark this ships the module file to every worker so user code can import it.
val path: String = pythonModuleInfo.getPath
val loadCode = s"sc.addPyFile('${path}')"
logger.info(s"pythonLoadCode: ${loadCode}")
loadCode
}

}
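
As a sketch of how another engine could reuse the same base class (hypothetical, not part of this commit; it assumes RunType.PYTHON exists in the label enum and that a plain-Python executor has no SparkContext):

class PythonPlainEngineHook extends PythonModuleLoadEngineConnHook {

  // Hypothetical: target a plain Python engine instead of Spark
  override val engineType: String = "python"

  override protected val runType: RunType = RunType.PYTHON

  // Without a SparkContext there is no addPyFile, so append the module
  // archive to sys.path instead (zip archives on sys.path are importable)
  override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = {
    s"import sys; sys.path.append('${pythonModuleInfo.getPath}')"
  }

}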
@@ -24,7 +24,7 @@ wds.linkis.engineconn.debug.enable=true

wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.spark.SparkEngineConnPlugin

wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook
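
(The first hooks line above shows the previous value of wds.linkis.engine.connector.hooks; the second appends PythonSparkEngineHook, which registers the new module-loading hook with the Spark engine connector.)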

linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088

@@ -17,23 +17,21 @@

package org.apache.linkis.jobhistory.util

import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.time.DateFormatUtils
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.entity.job.{JobRequest, SubJobDetail}
import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration
import org.apache.linkis.jobhistory.entity.JobHistory
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.fs.FileSystem
import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils}

import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.time.DateFormatUtils

import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}
import java.text.SimpleDateFormat
import java.util
import java.util.{Arrays, Date}
import java.util.Date
import java.util.regex.Pattern

object QueryUtils extends Logging {
@@ -44,7 +42,6 @@ object QueryUtils extends Logging {
private val CODE_STORE_PREFIX_VIEW_FS =
CommonVars("wds.linkis.query.store.prefix.viewfs", "hdfs:///apps-data/")

private val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true)
private val CODE_STORE_SUFFIX = CommonVars("wds.linkis.query.store.suffix", "")
private val CODE_STORE_LENGTH = CommonVars("wds.linkis.query.code.store.length", 50000)
private val CHARSET = "utf-8"