Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev 1.9.0 bug fix #629

Merged
merged 5 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ class StarrocksTimeExceedAlterSender extends Observer with Logging {
} else {
val jobHistory = a.asInstanceOf[JobHistory]
val timeValue =
HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, "linkis.jdbc.task.timeout.alter.time")
HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, "linkis.jdbc.task.timeout.alert.time")
val userValue =
HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, "linkis.jdbc.task.timeout.alter.user")
HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, "linkis.jdbc.task.timeout.alert.user")
var levelValue =
HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, "linkis.jdbc.task.timeout.alter.level")
HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, "linkis.jdbc.task.timeout.alert.level")
if (StringUtils.isNotBlank(timeValue) && StringUtils.isNotBlank(userValue)) {
val replaceParm: util.HashMap[String, String] = new util.HashMap[String, String]
replaceParm.put("$id", String.valueOf(jobHistory.getId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class StarrocksTimeExceedRule(hitObserver: Observer)
MapUtils.getString(datasourceTypeMap, "name", "").toLowerCase()
// 获取管理台配置需要使用的数据源类型(默认starrocks)
var datasourceType = HttpsUntils
.getJDBCConf(job.getSubmitUser, "linkis.jdbc.task.timeout.alter.datasource.type")
.getJDBCConf(job.getSubmitUser, "linkis.jdbc.task.timeout.alert.datasource.type")
.toLowerCase()
if (StringUtils.isBlank(datasourceType)) datasourceType = "starrocks"
if (datasourceType.contains(jobDatasourceType)) {
Expand All @@ -79,7 +79,7 @@ class StarrocksTimeExceedRule(hitObserver: Observer)
val timeValue =
HttpsUntils.getJDBCConf(
job.getSubmitUser,
"linkis.jdbc.task.timeout.alter.time"
"linkis.jdbc.task.timeout.alert.time"
)
if (StringUtils.isNotBlank(timeValue)) {
val timeoutInSeconds = timeValue.toDouble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class WorkspaceExceptionManager {
"80038",
"The name directory {0} specified by PKG-INFO does not exist. Please confirm that the {0} specified by PKG-INFO in the package matches the actual folder name (PKG-INFO指定Name目录{0}不存在,请确认包中PKG-INFO指定{0}和实际文件夹名称一致)");
put("80039", "File upload failed, error message: {0} (文件上传失败,错误信息:{0})");
put("80040", "{0} file not found in the archive ({0}文件不存在,请确认包中包含{0}文件)");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,8 +1444,7 @@ public Message pythonUpload(
return Message.error("文件名称不能为空");
}
// 获取文件名称
String fileNameSuffix = fileName.substring(0, fileName.lastIndexOf("."));
if (!fileNameSuffix.matches("^[a-zA-Z][a-zA-Z0-9_.-]{0,49}$")) {
if (!fileName.matches("^[a-zA-Z][a-zA-Z0-9_.-]{0,49}$")) {
return Message.error("模块名称错误,仅支持数字字母下划线,且以字母开头,长度最大50");
}

Expand Down Expand Up @@ -1485,7 +1484,7 @@ public Message pythonUpload(
}

// 构建新的文件路径
String newPath = fsPath.getPath() + "/" + file.getOriginalFilename();
String newPath = fsPath.getPath() + FsPath.SEPARATOR + file.getOriginalFilename();
// 上传文件,tar包需要单独解压处理
if (!file.getOriginalFilename().endsWith(".tar.gz")) {
FsPath fsPathNew = new FsPath(newPath);
Expand All @@ -1504,15 +1503,19 @@ public Message pythonUpload(
return Message.error("文件上传失败:PKG-INFO 文件不存在");
}
is = FilesystemUtils.getZipInputStreamByTarInputStream(file, packageName);
newPath = fsPath.getPath() + FsPath.SEPARATOR + packageName + FsPath.CUR_DIR + "zip";
newPath = fsPath.getPath() + FsPath.SEPARATOR + fileName.replace(".tar.gz", ".zip");
FsPath fsPathNew = new FsPath(newPath);
outputStream = fileSystem.write(fsPathNew, true);
IOUtils.copy(is, outputStream);
} catch (IOException e) {
} catch (Exception e) {
return Message.error("文件上传失败:" + e.getMessage());
} finally {
outputStream.close();
is.close();
if (outputStream != null) {
IOUtils.closeQuietly(outputStream);
}
if (is != null) {
IOUtils.closeQuietly(is);
}
}
}
// 返回成功消息并包含文件地址
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.web.multipart.MultipartFile;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
Expand Down Expand Up @@ -84,36 +85,24 @@ public static void traverseFolder(FsPath fsPath, FileSystem fileSystem, Stack<Fs
* @throws IOException 如果文件读取失败。
*/
public static String findPackageName(InputStream inputStream) throws IOException {
int findPkgInfo = 0;
try (TarArchiveInputStream tarInput =
new TarArchiveInputStream(new GzipCompressorInputStream(inputStream))) {
TarArchiveEntry entry;
while ((entry = tarInput.getNextTarEntry()) != null) {
if (entry.getName().endsWith("PKG-INFO")) {
return readPackageName(tarInput);
findPkgInfo = 1;
String pkgInfoContent = IOUtils.toString(tarInput, StandardCharsets.UTF_8);
return pkgInfoContent.split("Name: ")[1].split("\n")[0].trim();
}
}
} catch (Exception e) {
throw WorkspaceExceptionManager.createException(80039, e.getMessage());
}
return null;
}

/**
* 从 tar.gz 文件中读取包名。
*
* @param tarInputStream tar.gz 文件的输入流。
* @return 包名。
* @throws IOException 如果文件读取失败。
*/
private static String readPackageName(TarArchiveInputStream tarInputStream) throws IOException {
StringBuilder content = new StringBuilder();
byte[] buffer = new byte[1024];
int length;
while ((length = tarInputStream.read(buffer)) != -1) {
content.append(new String(buffer, 0, length));
if (findPkgInfo == 0) {
throw WorkspaceExceptionManager.createException(80040, "PKG-INFO");
}
String pkgInfoContent = content.toString();
return pkgInfoContent.split("Name: ")[1].split("\n")[0].trim();
return null;
}

/**
Expand All @@ -129,9 +118,15 @@ private static String getRootPath(InputStream inputStream, String folder) throws
new TarArchiveInputStream(new GzipCompressorInputStream(inputStream))) {
TarArchiveEntry entry;
while ((entry = tarInput.getNextTarEntry()) != null) {
if (entry.isDirectory() && entry.getName().endsWith("/" + folder + "/")) {
if (entry.isDirectory()
&& entry.getName().endsWith(FsPath.SEPARATOR + folder + FsPath.SEPARATOR)) {
return entry.getName();
}
if (entry.getName().contains(FsPath.SEPARATOR + folder + FsPath.SEPARATOR)) {
String delimiter = FsPath.SEPARATOR + folder + FsPath.SEPARATOR;
int delimiterIndex = entry.getName().indexOf(delimiter);
return entry.getName().substring(0, delimiterIndex + delimiter.length());
}
}
} catch (Exception e) {
throw WorkspaceExceptionManager.createException(80039, e.getMessage());
Expand All @@ -157,7 +152,8 @@ private static InputStream createZipFile(InputStream inputStream, String folder,
try {
TarArchiveEntry entry;
while ((entry = tarInput.getNextTarEntry()) != null) {
if (!entry.isDirectory() && entry.getName().contains("/" + folder + "/")) {
if (!entry.isDirectory()
&& entry.getName().contains(FsPath.SEPARATOR + folder + FsPath.SEPARATOR)) {
// \dist\py_mysql-1.0.tar\py_mysql-1.0\py_mysql\lib\__init__.py
ZipEntry zipEntry = new ZipEntry(entry.getName().substring(rootPath.length()));
zos.putNextEntry(zipEntry);
Expand Down Expand Up @@ -232,26 +228,26 @@ public static List<String> getInstallRequestPythonModules(MultipartFile file) th
List<String> modules = new ArrayList<>();
String originalFilename = file.getOriginalFilename();
if (StringUtils.isNotBlank(originalFilename) && originalFilename.endsWith(".tar.gz")) {
int findSetup = 0;
// 读取 setup.py 文件的内容,并使用正则表达式提取 install_requires 字段。
// 解析 install_requires 字段中的依赖包信息
try (TarArchiveInputStream tarInput =
new TarArchiveInputStream(new GzipCompressorInputStream(file.getInputStream()))) {
TarArchiveEntry entry;
while ((entry = tarInput.getNextTarEntry()) != null) {
if (entry.getName().endsWith("setup.py")) {
StringBuilder content = new StringBuilder();
byte[] buffer = new byte[1024];
int length;
while ((length = tarInput.read(buffer)) != -1) {
content.append(new String(buffer, 0, length));
}
modules = extractDependencies(content.toString());
findSetup = 1;
String content = IOUtils.toString(tarInput, StandardCharsets.UTF_8);
modules = extractDependencies(content);
break;
}
}
} catch (Exception e) {
throw WorkspaceExceptionManager.createException(80039, e.getMessage());
}
if (findSetup == 0) {
throw WorkspaceExceptionManager.createException(80040, "setup.py");
}
}
return modules;
}
Expand Down
Loading