forked from apache/linkis
Commit
* Dev 1.7.1 add python module load hook (#584)
* AI-generated code init
* Code manually revised
* Manual revision commit
* [1.7.1] Python material management (#583)
* Code auto-generated: interfaces generated by AI and an AI plugin
* Code auto-generated: interfaces generated by AI and an AI plugin
* Code manually revised
---------
Co-authored-by: “v_kkhuang” <“[email protected]”>
* Manual revision: code format and add rpc func
* fix bug
* Dev 1.7.0 python udf manager (#586)
* Code auto-generated: interfaces generated by AI and an AI plugin
* Code auto-generated: interfaces generated by AI and an AI plugin
* Code manually revised
* bug fix
* bug fix
---------
Co-authored-by: “v_kkhuang” <“[email protected]”>
* fix bug
* code format
* Unit test cases generated with AI
* Unit tests manually revised
* Unit tests manually revised
* Dev 1.7.1 webank test (#587)
* Unit test cases generated with AI
* Unit tests manually revised
* Unit tests manually revised
* add instance info
* update file permission (#590)
Co-authored-by: “v_kkhuang” <“[email protected]”>
* chore: 1.7.1 (#591)
* chore: 1.7.1
* upd: config file
---------
Co-authored-by: v-kkhuang <[email protected]>
Co-authored-by: “v_kkhuang” <“[email protected]”>
Co-authored-by: Yonghao Mei <[email protected]>
1 parent 722af32 · commit d221ce5
Showing 89 changed files with 4,853 additions and 139 deletions.
.../main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala (161 additions, 0 deletions)
@@ -0,0 +1,161 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.computation.executor.execute.{
  ComputationExecutor,
  EngineExecutionContext
}
import org.apache.linkis.engineconn.core.engineconn.EngineConnManager
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.rpc.Sender
import org.apache.linkis.udf.UDFClientConfiguration
import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol}
import org.apache.linkis.udf.entity.PythonModuleInfoVO

import org.apache.commons.lang3.StringUtils

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * The PythonModuleLoad class is designed to load Python modules into the execution environment
 * dynamically. This class is not an extension of UDFLoad, but shares a similar philosophy of
 * handling dynamic module loading based on user preferences and system configurations.
 */
abstract class PythonModuleLoad extends Logging {

  /** Abstract properties to be defined by the subclass */
  protected val engineType: String
  protected val runType: RunType

  protected def getEngineType(): String = engineType

  protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String

  private def queryPythonModuleRpc(
      userName: String,
      engineType: String
  ): java.util.List[PythonModuleInfoVO] = {
    val infoList = Sender
      .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
      .ask(RequestPythonModuleProtocol(userName, engineType))
      .asInstanceOf[ResponsePythonModuleProtocol]
      .getModulesInfo()
    infoList
  }

  protected def getLoadPythonModuleCode: Array[String] = {
    val engineCreationContext =
      EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
    val user = engineCreationContext.getUser

    var infoList: util.List[PythonModuleInfoVO] =
      Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType()))
    if (infoList == null) {
      logger.info("rpc get info is empty.")
      infoList = new util.ArrayList[PythonModuleInfoVO]()
    }

    // Rewrite module paths to the ViewFS scheme when the ViewFS environment is enabled
    if (IS_VIEW_FS_ENV.getValue) {
      infoList.asScala.foreach { info =>
        val path = info.getPath
        logger.info(s"python path: ${path}")
        if (path.startsWith("hdfs") || path.startsWith("viewfs")) {
          info.setPath(path.replace("hdfs://", "viewfs://"))
        } else {
          info.setPath("viewfs://" + path)
        }
      }
    } else {
      infoList.asScala.foreach { info =>
        val path = info.getPath
        logger.info(s"hdfs python path: ${path}")
        if (!path.startsWith("hdfs")) {
          info.setPath("hdfs://" + path)
        }
      }
    }

    logger.info(s"${user} load python modules: ")
    infoList.asScala.foreach(l => logger.info(s"module name:${l.getName}, path:${l.getPath}\n"))

    // Build the module-loading code
    val codes: mutable.Buffer[String] = infoList.asScala
      .filter { info => StringUtils.isNotEmpty(info.getPath) }
      .map(constructCode)
    // Log the generated code
    val str: String = codes.mkString("\n")
    logger.info(s"python codes: $str")
    codes.toArray
  }

  private def executeFunctionCode(codes: Array[String], executor: ComputationExecutor): Unit = {
    if (null == codes || null == executor) {
      return
    }
    codes.foreach { code =>
      logger.info("Submit function registration to engine, code: " + code)
      Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) {
        t: Throwable =>
          logger.error("Failed to load python module", t)
          null
      }
    }
  }

  /**
   * Generate and execute the code necessary for loading Python modules.
   *
   * @param labels
   *   The labels used to locate an executor capable of executing code in the current engine
   *   context.
   */
  protected def loadPythonModules(labels: Array[Label[_]]): Unit = {

    val codes = getLoadPythonModuleCode
    logger.info(s"codes length: ${codes.length}")
    if (null != codes && codes.nonEmpty) {
      val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
      if (executor != null) {
        val className = executor.getClass.getName
        logger.info(s"executor class: ${className}")
      } else {
        logger.error(s"Failed to load python, executor is null")
      }

      executor match {
        case computationExecutor: ComputationExecutor =>
          executeFunctionCode(codes, computationExecutor)
        case _ =>
      }
    }
    logger.info(s"Successfully loaded python modules, engineType: ${engineType}")
  }

}

// Note: The actual implementation of methods like `executeFunctionCode` and `construct
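The path normalization performed in getLoadPythonModuleCode can be summarized by the small standalone sketch below. It simply restates the rewrite rules from the method above; the object name and the example paths are hypothetical and not part of this commit.

object PythonModulePathRewriteExample {

  // Illustrative sketch only: the path rewrite rules used by getLoadPythonModuleCode,
  // extracted as a standalone function for readability.
  def rewritePath(path: String, viewFsEnabled: Boolean): String =
    if (viewFsEnabled) {
      // ViewFS environment: force a viewfs:// scheme
      if (path.startsWith("hdfs") || path.startsWith("viewfs")) path.replace("hdfs://", "viewfs://")
      else "viewfs://" + path
    } else {
      // Plain HDFS environment: force an hdfs:// scheme
      if (!path.startsWith("hdfs")) "hdfs://" + path else path
    }

  // rewritePath("hdfs:///tmp/python/my_module.zip", viewFsEnabled = true)  returns "viewfs:///tmp/python/my_module.zip"
  // rewritePath("/tmp/python/my_module.zip", viewFsEnabled = false)        returns "hdfs:///tmp/python/my_module.zip"
}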
...g/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala (64 additions, 0 deletions)
@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel

abstract class PythonModuleLoadEngineConnHook
    extends PythonModuleLoad
    with EngineConnHook
    with Logging {

  override def afterExecutionExecute(
      engineCreationContext: EngineCreationContext,
      engineConn: EngineConn
  ): Unit = {
    Utils.tryAndWarnMsg {
      val codeLanguageLabel = new CodeLanguageLabel
      codeLanguageLabel.setCodeType(runType.toString)
      logger.info(s"engineType: ${engineType}")
      val labels = Array[Label[_]](codeLanguageLabel)
      loadPythonModules(labels)
    }(s"Failed to load Python Modules: ${engineType}")

  }

  override def afterEngineServerStartFailed(
      engineCreationContext: EngineCreationContext,
      throwable: Throwable
  ): Unit = {
    logger.error(s"Failed to start Engine Server: ${throwable.getMessage}", throwable)
  }

  override def beforeCreateEngineConn(engineCreationContext: EngineCreationContext): Unit = {
    logger.info("Preparing to load Python Module...")
  }

  override def beforeExecutionExecute(
      engineCreationContext: EngineCreationContext,
      engineConn: EngineConn
  ): Unit = {
    logger.info(s"Before executing command on load Python Module.")
  }

}
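As an illustration of how this abstract hook is meant to be extended, a hypothetical hook for a plain Python engine might look like the sketch below. It is not part of this commit; the class name, the "python" engine type, RunType.PYTHON, and the sys.path-based loading strategy are assumptions for illustration only.

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.udf.entity.PythonModuleInfoVO

// Illustrative sketch only (not part of this commit): a hypothetical hook for a plain
// Python engine. Engine type, run type and loading strategy are assumptions.
class PythonEngineModuleLoadHook extends PythonModuleLoadEngineConnHook {

  // Assumed engine type for a standalone Python engine
  override val engineType: String = "python"

  // Assumed run type: plain Python rather than PySpark
  override protected val runType: RunType = RunType.PYTHON

  // Make the module importable by appending its storage path to sys.path
  override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String =
    s"import sys; sys.path.append('${pythonModuleInfo.getPath}')"

}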
.../scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala (45 additions, 0 deletions)
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.udf.entity.PythonModuleInfoVO

/**
 * A hook that loads and executes Python modules for the Spark engine.
 */
class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook {

  // Set engineType to "spark": this hook applies to the Spark data-processing engine
  override val engineType: String = "spark"

  // Set runType to RunType.PYSPARK: this hook executes PySpark code
  override protected val runType: RunType = RunType.PYSPARK

  // Override constructCode to build the module-loading code from the Python module info
  override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = {
    // Use the module path to construct a SparkContext.addPyFile command. In the PySpark
    // environment this command distributes the module file to all workers so it can be
    // imported in user code.
    val path: String = pythonModuleInfo.getPath
    val loadCode = s"sc.addPyFile('${path}')"
    logger.info(s"pythonLoadCode: ${loadCode}")
    loadCode
  }

}
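For a concrete sense of what this hook emits, the sketch below shows the string constructCode would return for a hypothetical module path; the object name and path are illustrative only and not part of this commit.

object PythonSparkEngineHookExample {
  // Illustrative only: constructCode output for a hypothetical module path.
  // With IS_VIEW_FS_ENV enabled, the path would first have been rewritten to viewfs://.
  val exampleLoadCode: String = "sc.addPyFile('hdfs:///apps-data/hadoop/python/my_udfs.zip')"
  // The ComputationExecutor runs this line as PySpark code, shipping the archive to every
  // Spark worker so that "import my_udfs" works in subsequent user code.
}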
...ache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHookTest.scala (82 additions, 0 deletions)
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.engineconn.common.creation.{DefaultEngineCreationContext, EngineCreationContext}
import org.apache.linkis.engineconn.common.engineconn.DefaultEngineConn
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock

// Unit test cases
class PythonModuleLoadEngineConnHookTest {

  @Test
  def testAfterExecutionExecute(): Unit = {
    // Create mock objects
    val mockEngineCreationContext = new DefaultEngineCreationContext
    val mockEngineConn = mock(classOf[DefaultEngineConn])
    val hook = new PythonSparkEngineHook

    // Prepare a label matching the hook's run type (not asserted here)
    val labels = new CodeLanguageLabel
    labels.setCodeType("spark")

    // Invoke the method under test
    hook.afterExecutionExecute(mockEngineCreationContext, mockEngineConn)

  }

  @Test
  def testAfterEngineServerStartFailed(): Unit = {
    // Create mock objects
    val mockEngineCreationContext = mock(classOf[EngineCreationContext])
    val mockThrowable = mock(classOf[Throwable])
    val hook = new PythonSparkEngineHook

    // Prepare a label matching the hook's run type (not asserted here)
    val labels = new CodeLanguageLabel
    labels.setCodeType("spark")

    // Invoke the method under test
    hook.afterEngineServerStartFailed(mockEngineCreationContext, mockThrowable)

  }

  @Test
  def testBeforeCreateEngineConn(): Unit = {
    // Create mock objects
    val mockEngineCreationContext = mock(classOf[EngineCreationContext])
    val hook = new PythonSparkEngineHook

    // Invoke the method under test; it only logs and must not throw
    hook.beforeCreateEngineConn(mockEngineCreationContext)

  }

  @Test
  def testBeforeExecutionExecute(): Unit = {
    // Create mock objects
    val mockEngineCreationContext = mock(classOf[EngineCreationContext])
    val mockEngineConn = mock(classOf[DefaultEngineConn])
    val hook = new PythonSparkEngineHook

    // Invoke the method under test
    hook.beforeExecutionExecute(mockEngineCreationContext, mockEngineConn)

  }

}