Skip to content

Commit

Permalink
fix: ai和hpc在提交作业和应用前检查一下是否重名 (#1375)
Browse files Browse the repository at this point in the history
Co-authored-by: Chen Junda <[email protected]>
  • Loading branch information
OYX-1 and ddadaal authored Aug 13, 2024
1 parent eec12d8 commit e776999
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 25 deletions.
7 changes: 7 additions & 0 deletions .changeset/tasty-files-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@scow/portal-server": patch
"@scow/portal-web": patch
"@scow/ai": patch
---

ai 和 hpc 在提交作业和应用前检查一下是否重名
22 changes: 20 additions & 2 deletions apps/ai/src/server/trpc/route/jobs/apps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,24 @@ export const createAppSession = procedure
model, mountPoints = [], account, partition, coreCount, nodeCount, gpuCount, memory,
maxTime, workingDirectory, customAttributes, gpuType } = input;

const userId = user.identityId;
const client = getAdapterClient(clusterId);

// 检查是否存在同名的作业
const existingJobName = await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [],states: [],jobName:appJobName,
},
}).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new TRPCError({
code: "CONFLICT",
message: `appJobName ${appJobName} is already existed`,
});
}

const apps = getClusterAppConfigs(clusterId);
const app = checkAppExist(apps, appId);

Expand Down Expand Up @@ -381,7 +399,7 @@ export const createAppSession = procedure
throw clusterNotFound(clusterId);
}

const userId = user.identityId;

return await sshConnect(host, userId, logger, async (ssh) => {
const homeDir = await getUserHomedir(ssh, userId, logger);

Expand Down Expand Up @@ -470,7 +488,7 @@ export const createAppSession = procedure

// 将entry.sh写入后将路径传给适配器后启动容器
await sftpWriteFile(sftp)(remoteEntryPath, entryScript);
const client = getAdapterClient(clusterId);

const reply = await asyncClientCall(client.job, "submitJob", {
userId,
jobName: appJobName,
Expand Down
18 changes: 17 additions & 1 deletion apps/ai/src/server/trpc/route/jobs/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ procedure
throw clusterNotFound(clusterId);
}

const client = getAdapterClient(clusterId);

// 检查是否存在同名的作业
const existingJobName = await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [],states: [],jobName:trainJobName,
},
}).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new TRPCError({
code: "CONFLICT",
message: `trainJobName ${trainJobName} is already existed`,
});
}

const em = await forkEntityManager();
const {
datasetVersion,
Expand Down Expand Up @@ -189,7 +206,6 @@ procedure
const entryScript = command;
await sftpWriteFile(sftp)(remoteEntryPath, entryScript);

const client = getAdapterClient(clusterId);
const reply = await asyncClientCall(client.job, "submitJob", {
userId,
jobName: trainJobName,
Expand Down
39 changes: 31 additions & 8 deletions apps/portal-server/src/clusterops/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import { Status } from "@grpc/grpc-js/build/src/constants";
import { AppType } from "@scow/config/build/app";
import { getPlaceholderKeys } from "@scow/lib-config/build/parse";
import { formatTime } from "@scow/lib-scheduler-adapter";
import { getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server";
import { checkJobNameExisting, errorInfo, getAppConnectionInfoFromAdapter,getEnvVariables } from "@scow/lib-server";
import { getUserHomedir,
sftpChmod, sftpExists, sftpReaddir, sftpReadFile, sftpRealPath, sftpWriteFile } from "@scow/lib-ssh";
import { DetailedError, ErrorInfo, parseErrorDetails } from "@scow/rich-error-model";
import { DetailedError, parseErrorDetails } from "@scow/rich-error-model";
import { JobInfo, SubmitJobRequest } from "@scow/scheduler-adapter-protos/build/protos/job";
import fs from "fs";
import { join } from "path";
Expand Down Expand Up @@ -63,9 +63,6 @@ const VNC_SESSION_INFO = "VNC_SESSION_INFO";
const APP_LAST_SUBMISSION_INFO = "last_submission.json";
const BIN_BASH_SCRIPT_HEADER = "#!/bin/bash -l\n";

const errorInfo = (reason: string) =>
ErrorInfo.create({ domain: "", reason: reason, metadata: {} });

export const appOps = (cluster: string): AppOps => {

const host = getClusterLoginNode(cluster);
Expand All @@ -79,8 +76,36 @@ export const appOps = (cluster: string): AppOps => {
const { appId, userId, account, coreCount, nodeCount, gpuCount, memory, maxTime, proxyBasePath,
partition, qos, customAttributes, appJobName } = request;

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;
const jobName = appJobName;

// 检查作业名是否重复
await callOnOne(
cluster,
logger,
async (client) => {
await checkJobNameExisting(client,userId,jobName,logger);
},
).catch((e) => {
const ex = e as ServiceError;
const errors = parseErrorDetails(ex.metadata);
if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo"
&& errors[0].reason === "ALREADY_EXISTS") {
throw new DetailedError({
code: Status.ALREADY_EXISTS,
message: ex.details,
details: [errorInfo("ALREADY_EXISTS")],
});
}
else {
throw new DetailedError({
code: ex.code,
message: ex.details,
details: [errorInfo("SBATCH_FAILED")],
});
}
});

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;

const userSbatchOptions = customAttributes.sbatchOptions
? splitSbatchArgs(customAttributes.sbatchOptions)
Expand All @@ -97,8 +122,6 @@ export const appOps = (cluster: string): AppOps => {
});
}

const jobName = appJobName;

const workingDirectory = join(portalConfig.appJobsDir, jobName);

const lastSubmissionDirectory = join(portalConfig.appLastSubmissionDir, appId);
Expand Down
11 changes: 10 additions & 1 deletion apps/portal-server/src/services/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { ServiceError } from "@ddadaal/tsgrpc-common";
import { plugin } from "@ddadaal/tsgrpc-server";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { jobInfoToPortalJobInfo, jobInfoToRunningjob } from "@scow/lib-scheduler-adapter";
import { checkSchedulerApiVersion } from "@scow/lib-server";
import { checkJobNameExisting, checkSchedulerApiVersion } from "@scow/lib-server";
import { createDirectoriesRecursively, sftpReadFile, sftpStat, sftpWriteFile } from "@scow/lib-ssh";
import { AccountStatusFilter, JobServiceServer, JobServiceService, TimeUnit } from "@scow/protos/build/portal/job";
import { parseErrorDetails } from "@scow/rich-error-model";
Expand Down Expand Up @@ -233,6 +233,15 @@ export const jobServiceServer = plugin((server) => {
, errorOutput, memory, scriptOutput } = request;
await checkActivatedClusters({ clusterIds: cluster });

// 检查作业名是否重复
await callOnOne(
cluster,
logger,
async (client) => {
await checkJobNameExisting(client,userId,jobName,logger);
},
);

// make sure working directory exists
const host = getClusterLoginNode(cluster);
if (!host) { throw clusterNotFound(cluster); }
Expand Down
9 changes: 8 additions & 1 deletion apps/portal-web/src/pageComponents/app/LaunchAppForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export const LaunchAppForm: React.FC<Props> = ({ clusterId, appId, attributes, a
maxTime,
customAttributes: customFormKeyValue,
} })
.httpError(409, (e) => {
.httpError(500, (e) => {
if (e.code === "SBATCH_FAILED") {
createErrorModal(e.message);
} else {
Expand All @@ -137,6 +137,13 @@ export const LaunchAppForm: React.FC<Props> = ({ clusterId, appId, attributes, a
throw e;
}
})
.httpError(409, (e) => {
if (e.code === "ALREADY_EXISTS") {
createErrorModal(e.message);
} else {
throw e;
}
})
.then(() => {
message.success(t(p("successMessage")));
Router.push(`/apps/${clusterId}/sessions`);
Expand Down
10 changes: 10 additions & 0 deletions apps/portal-web/src/pageComponents/job/SubmitJobForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ export const SubmitJobForm: React.FC<Props> = ({ initial = initialValues, submit
throw e;
}
})
.httpError(409, (e) => {
if (e.code === "ALREADY_EXISTS") {
modal.error({
title: t(p("errorMessage")),
content: e.message,
});
} else {
throw e;
}
})
.then(({ jobId }) => {
message.success(t(p("successMessage")) + jobId);
Router.push("/jobs/runningJobs");
Expand Down
11 changes: 9 additions & 2 deletions apps/portal-web/src/pages/api/app/createAppSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ export const CreateAppSessionSchema = typeboxRouteSchema({
message: Type.String(),
}),

409: Type.Object({
code: Type.Literal("ALREADY_EXISTS"),
message: Type.String(),
}),

404: Type.Object({
code: Type.Literal("APP_NOT_FOUND"),
message: Type.String(),
}),

409: Type.Object({
500: Type.Object({
code: Type.Literal("SBATCH_FAILED"),
message: Type.String(),
}),
Expand Down Expand Up @@ -126,11 +131,13 @@ export default /* #__PURE__*/route(CreateAppSessionSchema, async (req, res) => {
if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo") {
// Translate the ErrorInfo reason carried by the gRPC error into one of the HTTP responses
// declared in CreateAppSessionSchema. Unrecognised reasons hand the original error back.
switch (errors[0].reason) {
  case "SBATCH_FAILED":
    return { 500: { code: "SBATCH_FAILED" as const, message: ex.details } };
  case "NOT FOUND":
    return { 404: { code: "APP_NOT_FOUND" as const, message: ex.details } };
  case "INVALID ARGUMENT":
    return { 400: { code: "INVALID_INPUT" as const, message: ex.details } };
  // The duplicate-job-name error is raised with the reason "ALREADY_EXISTS" (underscore).
  // The space-separated spelling is still accepted so either form maps to 409.
  case "ALREADY_EXISTS":
  case "ALREADY EXISTS":
    return { 409: { code: "ALREADY_EXISTS" as const, message: ex.details } };
  default:
    return e;
}
Expand Down
6 changes: 6 additions & 0 deletions apps/portal-web/src/pages/api/job/submitJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export const SubmitJobSchema = typeboxRouteSchema({
message: Type.String(),
}),

409: Type.Object({
code: Type.Literal("ALREADY_EXISTS"),
message: Type.String(),
}),

500: Type.Object({
code: Type.Literal("SCHEDULER_FAILED"),
message: Type.String(),
Expand Down Expand Up @@ -137,6 +142,7 @@ export default route(SubmitJobSchema, async (req, res) => {
.catch(handlegRPCError({
[status.INTERNAL]: (err) => ({ 500: { code: "SCHEDULER_FAILED", message: err.details } } as const),
[status.NOT_FOUND]: (err) => ({ 404: { code: "NOT_FOUND", message: err.details } } as const),
[status.ALREADY_EXISTS]: (err) => ({ 409: { code: "ALREADY_EXISTS", message: err.details } } as const),
},
async () => await callLog(
{ ...logInfo,
Expand Down
1 change: 1 addition & 0 deletions libs/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
export * from "./apiAuthPlugin";
export * from "./app";
export * from "./date";
export * from "./job";
export * from "./misCommon/clustersActivation";
export * from "./scheduleAdapter";
export * from "./systemLanguage";
Expand Down
59 changes: 59 additions & 0 deletions libs/server/src/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy
* SCOW is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/

import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { SchedulerAdapterClient } from "@scow/lib-scheduler-adapter";
import { DetailedError, ErrorInfo } from "@scow/rich-error-model";
import { ApiVersion } from "@scow/utils/build/version";
import { Logger } from "ts-log";

import { compareSchedulerApiVersion, getSchedulerApiVersion } from "./scheduleAdapter";

/**
 * Builds an `ErrorInfo` detail that carries only a reason.
 * The domain is left as an empty string and no metadata is attached.
 *
 * @param reason value stored in the `reason` field of the detail
 * @returns the object produced by `ErrorInfo.create`
 */
export const errorInfo = (reason: string) => {
  return ErrorInfo.create({ domain: "", reason, metadata: {} });
};

/**
 * Checks, before an HPC job or app is submitted, whether the user already has a job
 * with the same name.
 *
 * The lookup only runs when `compareSchedulerApiVersion` accepts the adapter's API version
 * against 1.6.0; otherwise an info message is logged and the check is skipped.
 *
 * @param client scheduler adapter client used to query the API version and the jobs
 * @param userId user whose jobs are searched
 * @param jobName job name to look for
 * @param logger logger passed to the version query and used for the skip message
 * @throws DetailedError with status `ALREADY_EXISTS` and an `ALREADY_EXISTS` ErrorInfo detail
 *   when the adapter returns at least one matching job
 */
export const checkJobNameExisting = async (
  client: SchedulerAdapterClient,
  userId: string,
  jobName: string,
  logger: Logger,
) => {
  // Lowest scheduler adapter API version for which the duplicate-name check is performed.
  const requiredVersion: ApiVersion = { major: 1, minor: 6, patch: 0 };

  const adapterVersion = await getSchedulerApiVersion(client, logger);

  // Guard clause: adapters that do not pass the version comparison skip the check entirely.
  if (!compareSchedulerApiVersion(adapterVersion, requiredVersion)) {
    logger.info("Adapter version lower than 1.6.0, do not perform check for duplicate job names");
    return;
  }

  // Only the job id is requested; whether any job came back is all that matters here.
  const { jobs: sameNameJobs } = await asyncClientCall(client.job, "getJobs", {
    fields: ["job_id"],
    filter: {
      users: [userId],
      accounts: [],
      states: [],
      jobName,
    },
  });

  if (sameNameJobs.length === 0) {
    return;
  }

  throw new DetailedError({
    code: Status.ALREADY_EXISTS,
    message: `jobName ${jobName} is already existed`,
    details: [errorInfo("ALREADY_EXISTS")],
  });
};
Loading

0 comments on commit e776999

Please sign in to comment.