feat(ai): support the containerd runtime for k8s clusters (#1141)
The AI system adds support for the containerd runtime in k8s clusters; image-related commands now use nerdctl when the cluster runs containerd.
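
As a rough illustration of the change described above, the following sketch maps a runtime name to the CLI prefix used for image commands. The mapping mirrors `runtimeCommands` from the diff; `buildImageCommand` is a hypothetical helper introduced only for this example and is not part of the commit.

```typescript
// Runtime names as used by the k8sRuntime enum in this commit.
type Runtime = "docker" | "containerd";

// CLI prefix per runtime; containerd goes through nerdctl in the k8s.io namespace.
const runtimeCommands: Record<Runtime, string> = {
  docker: "docker",
  containerd: "nerdctl -n k8s.io",
};

// Hypothetical helper: joins the runtime prefix and the image sub-command.
// An unset runtime falls back to docker, as getRuntimeCommand does in the diff.
function buildImageCommand(runtime: Runtime | undefined, args: string[]): string {
  const prefix = runtimeCommands[runtime ?? "docker"];
  return [prefix, ...args].join(" ");
}

console.log(buildImageCommand("containerd", ["pull", "repo/img:1"]));
console.log(buildImageCommand(undefined, ["load", "-i", "/tmp/a.tar"]));
```

The same sub-commands (`load`, `pull`, `tag`, `push`, `rmi`, `commit`) are reused for both runtimes, so only the prefix changes.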
ZihanChen821 authored Feb 28, 2024
1 parent 63d1873 commit d822db7
Showing 7 changed files with 116 additions and 24 deletions.
6 changes: 6 additions & 0 deletions .changeset/six-wolves-pump.md
@@ -0,0 +1,6 @@
---
"@scow/ai": patch
"@scow/docs": patch
---

The AI system now supports the containerd runtime for k8s clusters
5 changes: 5 additions & 0 deletions .changeset/swift-geese-wait.md
@@ -0,0 +1,5 @@
---
"@scow/config": patch
---

The cluster configuration adds a k8s option that specifies the container runtime
26 changes: 22 additions & 4 deletions apps/ai/src/server/trpc/route/image/image.ts
@@ -221,13 +221,23 @@ export const createImage = procedure
}

// For a local image, load it with docker
localImageUrl = await getLoadedImage({ ssh, logger, sourcePath }).catch((e) => {
localImageUrl = await getLoadedImage({
ssh,
clusterId: processClusterId,
logger,
sourcePath,
}).catch((e) => {
const ex = e as ServiceError;
throw new Error(`createImage failed, ${ex.message}`);
});
} else {
// A remote image must first be pulled to the local node
localImageUrl = await getPulledImage({ ssh, logger, sourcePath }).catch((e) => {
localImageUrl = await getPulledImage({
ssh,
clusterId: processClusterId,
logger,
sourcePath,
}).catch((e) => {
const ex = e as ServiceError;
throw new Error(`createImage failed, ${ex.message}`);
});
@@ -240,6 +250,7 @@ export const createImage = procedure
// Build the image and upload it to harbor
await pushImageToHarbor({
ssh,
clusterId: processClusterId,
logger,
localImageUrl,
harborImageUrl,
@@ -339,7 +350,7 @@ export const deleteImage = procedure
});
}

// Get the reference in harrbor in order to delete the image
// Get the reference in harbor in order to delete the image
const getReferenceUrl = `${aiConfig.harborConfig.protocol}://${aiConfig.harborConfig.url}/api/v2.0/projects`
+ `/${aiConfig.harborConfig.project}/repositories/${user.identityId}%252F${image.name}/artifacts`;
const getReferenceRes = await fetch(getReferenceUrl, {
@@ -571,7 +582,13 @@ export const copyImage = procedure
if (sharedImage.path === undefined) {
throw new Error(`copyImage error: shared image ${id} do not have path`);
}
const localImageUrl = await getPulledImage({ ssh, logger, sourcePath: sharedImage.path })
const localImageUrl = await getPulledImage({
ssh,
clusterId: processClusterId,
logger,
sourcePath:
sharedImage.path,
})
.catch((e) => {
const ex = e as ServiceError;
throw new InternalServerError(ex.message, "Copy");
@@ -585,6 +602,7 @@ export const copyImage = procedure
// Build the image and upload it
await pushImageToHarbor({
ssh,
clusterId: processClusterId,
logger,
localImageUrl,
harborImageUrl,
20 changes: 17 additions & 3 deletions apps/ai/src/server/trpc/route/jobs/apps.ts
@@ -37,7 +37,12 @@ import { checkAppExist, checkCreateAppEntity, getClusterAppConfigs } from "src/s
import { getAdapterClient } from "src/server/utils/clusters";
import { clusterNotFound } from "src/server/utils/errors";
import { forkEntityManager } from "src/server/utils/getOrm";
import { commitContainerImage, createHarborImageUrl, pushImageToHarbor } from "src/server/utils/image";
import {
commitContainerImage,
createHarborImageUrl,
formatContainerId,
pushImageToHarbor,
} from "src/server/utils/image";
import { logger } from "src/server/utils/logger";
import { paginate, paginationSchema } from "src/server/utils/pagination";
import { getClusterLoginNode, sshConnect } from "src/server/utils/ssh";
@@ -467,7 +472,8 @@ export const saveImage =
const { node, containerId } = await asyncClientCall(client.app, "getRunningJobNodeInfo", {
jobId,
});
const formateContainerId = containerId.replace("docker://", "");

const formateContainerId = formatContainerId(clusterId, containerId);

// Connect to that node
return await sshConnect(node, "root", logger, async (ssh) => {
@@ -476,11 +482,19 @@ export const saveImage =
const localImageUrl = `${userId}/${imageName}:${imageTag}`;

// Commit the container to an image
await commitContainerImage({ node, ssh, logger, formateContainerId, localImageUrl });
await commitContainerImage({
node,
ssh,
clusterId,
logger,
formateContainerId,
localImageUrl,
});

// Save the image to harbor
await pushImageToHarbor({
ssh,
clusterId,
logger,
localImageUrl,
harborImageUrl,
72 changes: 56 additions & 16 deletions apps/ai/src/server/utils/image.ts
@@ -16,6 +16,7 @@ import { NodeSSH } from "node-ssh";
import { Logger } from "ts-log";

import { aiConfig } from "../config/ai";
import { clusters } from "../config/clusters";

const LOADED_IMAGE_REGEX = "Loaded image: ([\\w./-]+(?::[\\w.-]+)?)";

@@ -29,27 +30,48 @@ export function createHarborImageUrl(imageName: string, imageTag: string, userId
return `${harborUrl}/${project}/${userId}/${imageName}:${imageTag}`;
};

export enum k8sRuntime {
docker = "docker",
containerd = "containerd"
}

const runtimeCommands = {
[k8sRuntime.docker]: "docker",
// -n namespace: container commands for the k8s cluster must pass this flag, otherwise the cluster's images and containers are not visible
[k8sRuntime.containerd]: "nerdctl -n k8s.io",
};

export enum Container {
DOCKER = "DOCKER",
CONTAINER_D = "CONTAINER_D"
const runtimeContainerIdPrefix = {
[k8sRuntime.docker]: "docker",
[k8sRuntime.containerd]: "containerd",
};

export function getRuntimeCommand(clusterId: string): string {
const runtime = clusters[clusterId].k8s?.runtime;
return runtimeCommands[runtime ?? k8sRuntime.docker];
}

// The case where Container === Container.DOCKER
// TODO: handle other container runtimes
function getContainerIdPrefix(clusterId: string): string {
const runtime = clusters[clusterId].k8s?.runtime;
return runtimeContainerIdPrefix[runtime ?? k8sRuntime.docker];
}

// Load a local image
export async function getLoadedImage({
ssh,
logger,
sourcePath,
clusterId,
}: {
ssh: NodeSSH,
logger: Logger,
sourcePath: string,
clusterId: string,
}): Promise<string | undefined> {

const loadedResp = await loggedExec(ssh, logger, true, "docker", ["load", "-i", sourcePath]);
const command = getRuntimeCommand(clusterId);

const loadedResp = await loggedExec(ssh, logger, true, command, ["load", "-i", sourcePath]);
const match = loadedResp.stdout.match(loadedImageRegex);
return match && match.length > 1 ? match[1] : undefined;
}
@@ -59,13 +81,18 @@ export async function getPulledImage({
ssh,
logger,
sourcePath,
clusterId,
}: {
ssh: NodeSSH,
logger: Logger,
sourcePath: string,
clusterId: string,
}): Promise<string | undefined> {

const pulledResp = await loggedExec(ssh, logger, true, "docker", ["pull", sourcePath]);
const command = getRuntimeCommand(clusterId);

const pulledResp = await loggedExec(ssh, logger, true, command, ["pull", sourcePath]);

return pulledResp ? sourcePath : undefined;
}

@@ -75,25 +102,29 @@ export async function pushImageToHarbor({
logger,
localImageUrl,
harborImageUrl,
clusterId,
}: {
ssh: NodeSSH,
logger: Logger,
localImageUrl: string,
harborImageUrl: string,
clusterId: string,
}): Promise<void> {

// docker login harbor
await loggedExec(ssh, logger, true, "docker", ["login", harborUrl, "-u", harborUser, "-p", password]);
const command = getRuntimeCommand(clusterId);

// login harbor
await loggedExec(ssh, logger, true, command, ["login", harborUrl, "-u", harborUser, "-p", password]);

// docker tag
await loggedExec(ssh, logger, true, "docker", ["tag", localImageUrl, harborImageUrl]);
// tag
await loggedExec(ssh, logger, true, command, ["tag", localImageUrl, harborImageUrl]);

// Push the image to harbor
await loggedExec(ssh, logger, true, "docker", ["push", harborImageUrl]);
await loggedExec(ssh, logger, true, command, ["push", harborImageUrl]);

// Remove the local images
await loggedExec(ssh, logger, true, "docker", ["rmi", harborImageUrl]);
await loggedExec(ssh, logger, true, "docker", ["rmi", localImageUrl]);
await loggedExec(ssh, logger, true, command, ["rmi", harborImageUrl]);
await loggedExec(ssh, logger, true, command, ["rmi", localImageUrl]);
}

// Create a local image via commit
@@ -103,16 +134,19 @@ export async function commitContainerImage({
logger,
formateContainerId,
localImageUrl,
clusterId,
}: {
node: string,
ssh: NodeSSH,
logger: Logger,
formateContainerId: string,
localImageUrl: string,
clusterId: string,
}): Promise<void> {

const command = getRuntimeCommand(clusterId);
const resp = await loggedExec(ssh, logger, true, "sh",
["-c", `docker ps --no-trunc | grep ${formateContainerId}`]);
["-c", `${command} ps --no-trunc | grep ${formateContainerId}`]);
if (!resp.stdout) {
throw new TRPCError({
code: "NOT_FOUND",
@@ -121,6 +155,12 @@ export async function commitContainerImage({
}

// Commit the container to an image
await loggedExec(ssh, logger, true, "docker",
await loggedExec(ssh, logger, true, command,
["commit", formateContainerId, localImageUrl]);
}


export const formatContainerId = (clusterId: string, containerId: string) => {
const prefix = getContainerIdPrefix(clusterId);
return containerId.replace(`${prefix}://`, "");
};
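
The prefix handling in `formatContainerId` can be sketched in isolation. Kubernetes reports container IDs as `<runtime>://<id>`, and the commit strips that scheme before passing the ID to `ps` and `commit`. The function name `stripRuntimePrefix` below is hypothetical, and unlike the commit it takes the runtime directly instead of looking it up from a cluster ID.

```typescript
type Runtime = "docker" | "containerd";

// Removes a leading "<runtime>://" scheme from a container ID.
// IDs that do not start with the expected scheme are returned unchanged.
function stripRuntimePrefix(runtime: Runtime, containerId: string): string {
  const prefix = `${runtime}://`;
  return containerId.startsWith(prefix)
    ? containerId.slice(prefix.length)
    : containerId;
}

console.log(stripRuntimePrefix("containerd", "containerd://abc123"));
console.log(stripRuntimePrefix("docker", "docker://def456"));
```

Checking `startsWith` instead of calling `replace` is a design choice of this sketch: it only touches the scheme at the start of the string, so an ID that happens to contain the same text elsewhere is left alone.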
2 changes: 1 addition & 1 deletion docs/docs/deploy/config/ai/intro.md
@@ -28,7 +28,7 @@ imageTag: ai-beta.1

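**AI system (beta)** requires users to deploy a K8S cluster environment before use.

The current **AI system (beta)** is a trial version. For now we only support running image-related services on the `docker` container runtime; support for `ContainerD` and other mainstream container runtimes will be released later.
The current **AI system (beta)** is a trial version. The AI system can now be used in k8s clusters that run either the `docker` or the `containerd` container runtime. If the cluster uses the `containerd` runtime, [nerdctl](https://github.com/containerd/nerdctl) must be installed on the cluster's nodes.

The main version information for the K8S deployment in the current trial version is as follows: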

9 changes: 9 additions & 0 deletions libs/config/src/cluster.ts
@@ -17,6 +17,11 @@ import { createI18nStringSchema } from "src/i18n";

const CLUSTER_CONFIG_BASE_PATH = "clusters";

export enum k8sRuntime {
docker = "docker",
containerd = "containerd",
}

const LoginNodeConfigSchema =
Type.Object(
{
@@ -95,6 +100,10 @@ export const ClusterConfigSchema = Type.Object({
enabled: Type.Boolean({ description: "Whether to enable cross-cluster transfer", default: false }),
transferNode: Type.Optional(Type.String({ description: "Node used for cross-cluster file transfer" })),
})),
k8s: Type.Optional(Type.Object({
runtime: Type.Enum(k8sRuntime, { description: "k8s cluster runtime; the commands used by the AI system's image features depend on this value; either docker or containerd",
default: "containerd" }),
}, { description: "k8s cluster configuration" })),
});
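
A minimal sketch of how the new optional `k8s.runtime` field is consumed, assuming a plain in-memory cluster table. `ClusterConfigSketch`, `exampleClusters`, and `runtimeOf` are hypothetical names for illustration. Note that the schema above declares `containerd` as the default, while the lookup in `image.ts` falls back to `docker` when no runtime is set; which one takes effect depends on whether the config loader applies schema defaults, which this diff does not show. The sketch models only the `image.ts` fallback.

```typescript
type Runtime = "docker" | "containerd";

// Shape of the new optional section; all other cluster fields are omitted.
interface ClusterConfigSketch {
  k8s?: { runtime?: Runtime };
}

// Hypothetical cluster table standing in for the loaded cluster config files.
const exampleClusters: Record<string, ClusterConfigSketch> = {
  legacy: {},                              // no k8s section at all
  gpu: { k8s: { runtime: "containerd" } }, // runtime set explicitly
};

// Follows the lookup in image.ts: configured runtime, else docker.
// In this sketch an unknown cluster ID also falls back to docker.
function runtimeOf(clusterId: string): Runtime {
  return exampleClusters[clusterId]?.k8s?.runtime ?? "docker";
}

console.log(runtimeOf("legacy"));
console.log(runtimeOf("gpu"));
```

Setting `runtime` explicitly in each cluster's config avoids relying on either default.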

