Merge branch 'master' into fix-workDirectory-mountPoints
OYX-1 authored Aug 14, 2024
2 parents abb16dd + e776999 commit 2491ed0
Showing 57 changed files with 1,136 additions and 175 deletions.
6 changes: 6 additions & 0 deletions .changeset/bright-cameras-matter.md
@@ -0,0 +1,6 @@
---
"@scow/lib-web": patch
"@scow/docs": patch
---

Add automatic refresh of navigation bar links in UI extensions
8 changes: 8 additions & 0 deletions .changeset/slow-falcons-give.md
@@ -0,0 +1,8 @@
---
"@scow/portal-server": patch
"@scow/scowd-protos": patch
"@scow/lib-scowd": patch
---

Add an app service and a GetAppLastSubmission API to scowd
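
For orientation, here is a minimal sketch of how portal-server consumes the new interface, mirroring the apps/portal-server diff further down; `getScowdClient`, `portalConfig`, and the `fileData`/`submitTime` shapes are taken from that diff rather than from the proto definitions, so treat the exact signatures as assumptions:

```ts
import { join } from "path";

// Sketch only: getScowdClient and portalConfig come from portal-server
// (src/utils/scowd, src/config/portal); submitTime is assumed to be a
// protobuf Timestamp ({ seconds: bigint, nanos: number }).
async function fetchLastSubmission(cluster: string, userId: string, appId: string) {
  const client = getScowdClient(cluster);
  const filePath = join(portalConfig.appLastSubmissionDir, appId, "last_submission.json");

  const data = await client.app.getAppLastSubmission({ userId, filePath });
  if (!data.fileData) { return undefined; }

  // Convert the protobuf Timestamp to an ISO string.
  const ts = data.fileData.submitTime;
  const submitTime = ts
    ? new Date(Number(ts.seconds) * 1000 + Math.floor(ts.nanos / 1e6))
    : undefined;

  return { ...data.fileData, submitTime: submitTime?.toISOString() };
}
```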

7 changes: 7 additions & 0 deletions .changeset/tasty-files-end.md
@@ -0,0 +1,7 @@
---
"@scow/portal-server": patch
"@scow/portal-web": patch
"@scow/ai": patch
---

AI and HPC now check whether a job or app with the same name already exists before submission
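
The apps/ai diffs below implement this by asking the scheduler adapter for jobs with the same name before submitting; condensed into a standalone helper (the import paths and the SchedulerAdapterClient type are assumptions based on the SCOW codebase):

```ts
import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { SchedulerAdapterClient } from "@scow/lib-scheduler-adapter";
import { TRPCError } from "@trpc/server";

// Reject the submission early if this user already has a job with the same name.
async function ensureJobNameUnique(client: SchedulerAdapterClient, userId: string, jobName: string) {
  const { jobs } = await asyncClientCall(client.job, "getJobs", {
    fields: ["job_id"],
    filter: { users: [userId], accounts: [], states: [], jobName },
  });

  if (jobs.length > 0) {
    throw new TRPCError({ code: "CONFLICT", message: `jobName ${jobName} already exists` });
  }
}
```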
6 changes: 6 additions & 0 deletions .changeset/tender-spoons-doubt.md
@@ -0,0 +1,6 @@
---
"@scow/lib-web": patch
"@scow/docs": patch
---

Navigation items returned by UI extensions can now specify the navs[].hideIfNotActive property
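
For illustration, a navigation item in an extension's response might look like the following; everything except navs[].hideIfNotActive is a simplified, hypothetical shape, not the documented schema:

```ts
// Hypothetical rewriteNavigations-style response from a UI extension.
const response = {
  navs: [
    {
      path: "/extensions/my-ext/dashboard",
      text: "Dashboard",
      // New: hide this item unless it is the currently active page.
      hideIfNotActive: true,
    },
  ],
};
```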
11 changes: 11 additions & 0 deletions .changeset/warm-gifts-know.md
@@ -0,0 +1,11 @@
---
"@scow/portal-server": patch
"@scow/lib-operation-log": patch
"@scow/scowd-protos": patch
"@scow/portal-web": patch
"@scow/mis-web": patch
"@scow/lib-server": patch
"@scow/grpc-api": minor
---

Integrate scowd chunked (multipart) file upload
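
The operation types added below (initMultipartUpload, mergeFileChunks) suggest a three-step flow; this sketch uses hypothetical method names, since the real RPC signatures live in @scow/scowd-protos:

```ts
// Hypothetical client-side flow: init -> upload chunks -> merge.
interface FileApi {
  initMultipartUpload(req: { path: string }): Promise<{ tempDir: string }>;
  uploadChunk(req: { tempDir: string, index: number, data: Blob }): Promise<void>;
  mergeFileChunks(req: { tempDir: string, target: string }): Promise<void>;
}

async function uploadInChunks(api: FileApi, path: string, blob: Blob, chunkSizeByte: number) {
  // 1. Initialize the multipart upload and get a temporary directory for the blocks.
  const { tempDir } = await api.initMultipartUpload({ path });

  // 2. Upload each chunk as a temporary file block.
  for (let offset = 0, i = 0; offset < blob.size; offset += chunkSizeByte, i++) {
    await api.uploadChunk({ tempDir, index: i, data: blob.slice(offset, offset + chunkSizeByte) });
  }

  // 3. Ask the server to merge the blocks into the target file.
  await api.mergeFileChunks({ tempDir, target: path });
}
```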
22 changes: 20 additions & 2 deletions apps/ai/src/server/trpc/route/jobs/apps.ts
@@ -309,6 +309,24 @@ export const createAppSession = procedure
model, mountPoints = [], account, partition, coreCount, nodeCount, gpuCount, memory,
maxTime, workingDirectory, customAttributes, gpuType } = input;

const userId = user.identityId;
const client = getAdapterClient(clusterId);

// Check whether a job with the same name already exists
const existingJobName = await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [], states: [], jobName: appJobName,
},
}).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new TRPCError({
code: "CONFLICT",
message: `appJobName ${appJobName} already exists`,
});
}

const apps = getClusterAppConfigs(clusterId);
const app = checkAppExist(apps, appId);

@@ -381,7 +399,7 @@ export const createAppSession = procedure
throw clusterNotFound(clusterId);
}

const userId = user.identityId;

return await sshConnect(host, userId, logger, async (ssh) => {
const homeDir = await getUserHomedir(ssh, userId, logger);

@@ -470,7 +488,7 @@ export const createAppSession = procedure

// Write entry.sh, then pass its path to the adapter to start the container
await sftpWriteFile(sftp)(remoteEntryPath, entryScript);
const client = getAdapterClient(clusterId);

const reply = await asyncClientCall(client.job, "submitJob", {
userId,
jobName: appJobName,
18 changes: 17 additions & 1 deletion apps/ai/src/server/trpc/route/jobs/jobs.ts
@@ -142,6 +142,23 @@ procedure
throw clusterNotFound(clusterId);
}

const client = getAdapterClient(clusterId);

// Check whether a job with the same name already exists
const existingJobName = await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [], states: [], jobName: trainJobName,
},
}).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new TRPCError({
code: "CONFLICT",
message: `trainJobName ${trainJobName} already exists`,
});
}

const em = await forkEntityManager();
const {
datasetVersion,
@@ -189,7 +206,6 @@ procedure
const entryScript = command;
await sftpWriteFile(sftp)(remoteEntryPath, entryScript);

const client = getAdapterClient(clusterId);
const reply = await asyncClientCall(client.job, "submitJob", {
userId,
jobName: trainJobName,
2 changes: 2 additions & 0 deletions apps/mis-web/src/i18n/en.ts
@@ -1244,6 +1244,8 @@ export default {
customEvent: "Custom Operation Event",
activateCluster: "Activate Cluster",
deactivateCluster: "Deactivate Cluster",
mergeFileChunks: "Merge temporary file chunks",
initMultipartUpload: "Initialize multipart upload",
},
operationDetails: {
login: "User Login",
2 changes: 2 additions & 0 deletions apps/mis-web/src/i18n/zh_cn.ts
@@ -1242,6 +1242,8 @@ export default {
customEvent: "自定义操作行为",
activateCluster: "启用集群",
deactivateCluster: "停用集群",
mergeFileChunks: "合并临时文件块",
initMultipartUpload: "初始化分片上传",
},
operationDetails: {
login: "用户登录",
4 changes: 4 additions & 0 deletions apps/mis-web/src/models/operationLog.ts
@@ -175,6 +175,8 @@ export const getOperationTypeTexts = (t: OperationTextsTransType): {[key in LibO
activateCluster: t(pTypes("activateCluster")),
deactivateCluster: t(pTypes("deactivateCluster")),
customEvent: t(pTypes("customEvent")),
mergeFileChunks: t(pTypes("mergeFileChunks")),
initMultipartUpload: t(pTypes("initMultipartUpload")),
};

};
@@ -202,6 +204,8 @@ export const OperationCodeMap: {[key in LibOperationType]: string } = {
moveFileItem: "010506",
copyFileItem: "010507",
submitFileItemAsJob: "010508",
mergeFileChunks: "010509",
initMultipartUpload: "010510",
setJobTimeLimit: "010601",
createImage:"010701",
updateImage:"010702",
75 changes: 59 additions & 16 deletions apps/portal-server/src/clusterops/app.ts
@@ -16,10 +16,10 @@ import { Status } from "@grpc/grpc-js/build/src/constants";
import { AppType } from "@scow/config/build/app";
import { getPlaceholderKeys } from "@scow/lib-config/build/parse";
import { formatTime } from "@scow/lib-scheduler-adapter";
import { getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server";
import { checkJobNameExisting, errorInfo, getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server";
import { getUserHomedir,
sftpChmod, sftpExists, sftpReaddir, sftpReadFile, sftpRealPath, sftpWriteFile } from "@scow/lib-ssh";
import { DetailedError, ErrorInfo, parseErrorDetails } from "@scow/rich-error-model";
import { DetailedError, parseErrorDetails } from "@scow/rich-error-model";
import { JobInfo, SubmitJobRequest } from "@scow/scheduler-adapter-protos/build/protos/job";
import fs from "fs";
import { join } from "path";
@@ -30,6 +30,7 @@ import { portalConfig } from "src/config/portal";
import { getClusterAppConfigs, splitSbatchArgs } from "src/utils/app";
import { callOnOne } from "src/utils/clusters";
import { getIpFromProxyGateway } from "src/utils/proxy";
import { getScowdClient } from "src/utils/scowd";
import { getClusterLoginNode, sshConnect } from "src/utils/ssh";
import { displayIdToPort, getTurboVNCBinPath, parseDisplayId,
refreshPassword, refreshPasswordByProxyGateway } from "src/utils/turbovnc";
@@ -62,9 +63,6 @@ const VNC_SESSION_INFO = "VNC_SESSION_INFO";
const APP_LAST_SUBMISSION_INFO = "last_submission.json";
const BIN_BASH_SCRIPT_HEADER = "#!/bin/bash -l\n";

const errorInfo = (reason: string) =>
ErrorInfo.create({ domain: "", reason: reason, metadata: {} });

export const appOps = (cluster: string): AppOps => {

const host = getClusterLoginNode(cluster);
@@ -78,8 +76,36 @@ export const appOps = (cluster: string): AppOps => {
const { appId, userId, account, coreCount, nodeCount, gpuCount, memory, maxTime, proxyBasePath,
partition, qos, customAttributes, appJobName } = request;

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;
const jobName = appJobName;

// Check whether the job name is already taken
await callOnOne(
cluster,
logger,
async (client) => {
await checkJobNameExisting(client, userId, jobName, logger);
},
).catch((e) => {
const ex = e as ServiceError;
const errors = parseErrorDetails(ex.metadata);
if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo"
&& errors[0].reason === "ALREADY_EXISTS") {
throw new DetailedError({
code: Status.ALREADY_EXISTS,
message: ex.details,
details: [errorInfo("ALREADY_EXISTS")],
});
}
else {
throw new DetailedError({
code: ex.code,
message: ex.details,
details: [errorInfo("SBATCH_FAILED")],
});
}
});

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;

const userSbatchOptions = customAttributes.sbatchOptions
? splitSbatchArgs(customAttributes.sbatchOptions)
@@ -96,8 +122,6 @@ export const appOps = (cluster: string): AppOps => {
});
}

const jobName = appJobName;

const workingDirectory = join(portalConfig.appJobsDir, jobName);

const lastSubmissionDirectory = join(portalConfig.appLastSubmissionDir, appId);
@@ -247,17 +271,36 @@ export const appOps = (cluster: string): AppOps => {
getAppLastSubmission: async (request, logger) => {
const { userId, appId } = request;

return await sshConnect(host, userId, logger, async (ssh) => {
const file = join(portalConfig.appLastSubmissionDir, appId, APP_LAST_SUBMISSION_INFO);

const sftp = await ssh.requestSFTP();
const file = join(portalConfig.appLastSubmissionDir, appId, APP_LAST_SUBMISSION_INFO);
const clusterInfo = configClusters[cluster];
if (clusterInfo.scowd?.enabled) {
const client = getScowdClient(cluster);

if (!await sftpExists(sftp, file)) { return { lastSubmissionInfo: undefined }; }
const content = await sftpReadFile(sftp)(file);
const data = JSON.parse(content.toString()) as SubmissionInfo;
const data = await client.app.getAppLastSubmission({ userId, filePath: file });

const submitTime = !data.fileData?.submitTime ? undefined
: new Date(Number(data.fileData.submitTime.seconds * BigInt(1000))
+ Math.floor(data.fileData.submitTime.nanos / 1000000));

return { lastSubmissionInfo: data.fileData ? {
...data.fileData,
submitTime: submitTime?.toISOString(),
} : undefined };

} else {
return await sshConnect(host, userId, logger, async (ssh) => {

const sftp = await ssh.requestSFTP();

if (!await sftpExists(sftp, file)) { return { lastSubmissionInfo: undefined }; }
const content = await sftpReadFile(sftp)(file);
const data = JSON.parse(content.toString()) as SubmissionInfo;

return { lastSubmissionInfo: data };
});
}

return { lastSubmissionInfo: data };
});
},

listAppSessions: async (request, logger) => {
4 changes: 2 additions & 2 deletions apps/portal-server/src/clusterops/file/index.ts
@@ -25,15 +25,15 @@ export const fileOps = (cluster: string): FileOps => {
const clusterInfo = configClusters[cluster];
if (clusterInfo.scowd?.enabled) {
const client = getScowdClient(cluster);

return {
...scowdFileServices(client),
};
} else {
const host = getClusterLoginNode(cluster);

if (!host) { throw clusterNotFound(cluster); }

return {
...sshFileServices(host),
};
6 changes: 3 additions & 3 deletions apps/portal-server/src/clusterops/file/scowdFile.ts
@@ -110,7 +110,7 @@ export const scowdFileServices = (client: ScowdClient): FileOps => ({
type: fileInfo_FileTypeFromJSON(info.fileType),
mtime: info.modTime,
mode: info.mode,
size: Number(info.size),
size: Number(info.sizeByte),
};
});
return { results };
@@ -124,7 +124,7 @@ export const scowdFileServices = (client: ScowdClient): FileOps => ({

try {
const readStream = client.file.download({
userId, path, chunkSize: config.DOWNLOAD_CHUNK_SIZE,
userId, path, chunkSizeByte: config.DOWNLOAD_CHUNK_SIZE,
});

for await (const response of readStream) {
@@ -183,7 +183,7 @@ export const scowdFileServices = (client: ScowdClient): FileOps => ({
try {
const res = await client.file.getFileMetadata({ userId, filePath: path });

return { size: Number(res.size), type: res.type === FileType.DIR ? "dir" : "file" };
return { size: Number(res.sizeByte), type: res.type === FileType.DIR ? "dir" : "file" };

} catch (err) {
throw mapTRPCExceptionToGRPC(err);
