Skip to content

Commit

Permalink
feat(mis): 增加定时同步scow与适配器中用户封锁状态及账户封锁/解封状态的功能 (#913)
Browse files Browse the repository at this point in the history
做了什么:
增加同步 scow 与适配器中用户封锁、账户封锁/解封状态的功能
1、增加同步 scow 与适配器中用户封锁、账户封锁/解封状态的接口,如果有部分账户/用户同步失败,跳过继续执行,接口返回失败账户/用户的信息
2、增加获取同步 scow 与适配器中用户封锁状态、账户封锁/解封状态的配置信息的接口
3、增加设置同步 scow 与适配器中用户封锁状态、账户封锁/解封状态的配置启动/关闭的接口
将原有接口updateBlockStatus置为deprecated

页面如下:

![image](https://github.com/PKUHPC/SCOW/assets/25954437/990dddbb-1786-4f4f-8403-47ee82f65e6b)

操作验证如下:先修改数据库,使一个账户在scow状态变为封锁,但用户仍旧能通过他它来提交作业。同步封锁状态后,提交的作业就无法运行了

![12](https://github.com/PKUHPC/SCOW/assets/25954437/335c8ee4-c277-4a9e-ae88-fc778ac660b1)

同理,只修改scow数据库中的账户的状态为解封,该作业未运行,同步后,即可运行:

![13](https://github.com/PKUHPC/SCOW/assets/25954437/ca399e0d-f19d-4440-80e6-bf51a97f1698)
  • Loading branch information
tongchong authored Nov 2, 2023
1 parent 8822114 commit 50d34d6
Show file tree
Hide file tree
Showing 24 changed files with 585 additions and 128 deletions.
5 changes: 5 additions & 0 deletions .changeset/little-mirrors-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@scow/config": minor
---

增加scow定时同步调度器用户封锁、账户封锁/解封状态的配置,可配置同步周期、是否启动
6 changes: 6 additions & 0 deletions .changeset/silver-masks-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@scow/mis-server": minor
"@scow/mis-web": minor
---

增加scow定时同步调度器用户封锁、账户封锁/解封状态的功能
7 changes: 7 additions & 0 deletions .changeset/warm-wasps-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@scow/grpc-api": minor
---
增加scow定时同步调度器用户封锁、账户封锁/解封状态的接口,返回失败的账户、用户的信息
增加获取scow定时同步调度器用户封锁、账户封锁/解封状态配置信息的接口
增加设置scow定时同步调度器用户封锁、账户封锁/解封状态配置启动/关闭的接口
后续版本版本将会删除updateBlockStatus接口
7 changes: 7 additions & 0 deletions apps/cli/assets/config/mis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ fetchJobs:
# 周期的cron表达式
cron: "10 */10 * * * *"

# 周期性同步scow与调度器(如slurm)账户用户封锁状态的配置
periodicSyncUserAccountBlockStatus:
# 是否开启
enabled: true
# 周期的cron表达式
cron: "0 4 * * *"

# 预定义的充值类型
predefinedChargingTypes:
- 测试
Expand Down
4 changes: 1 addition & 3 deletions apps/mis-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import { Server } from "@ddadaal/tsgrpc-server";
import { omitConfigSpec } from "@scow/lib-config";
import { readVersionFile } from "@scow/utils/build/version";
import { updateBlockStatusInSlurm } from "src/bl/block";
import { config } from "src/config/env";
import { plugins } from "src/plugins";
import { accountServiceServer } from "src/services/account";
Expand Down Expand Up @@ -53,8 +52,7 @@ export async function createServer() {
await server.register(configServiceServer);
await server.register(misConfigServiceServer);

const em = server.ext.orm.em.fork();
await updateBlockStatusInSlurm(em, server.ext.clusters, server.logger);
await server.ext.syncBlockStatus.sync();

return server;
}
108 changes: 85 additions & 23 deletions apps/mis-server/src/bl/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,124 @@ import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { Logger } from "@ddadaal/tsgrpc-server";
import { Loaded } from "@mikro-orm/core";
import { MySqlDriver, SqlEntityManager } from "@mikro-orm/mysql";
import { BlockedFailedUserAccount } from "@scow/protos/build/server/admin";
import { Account } from "src/entities/Account";
import { SystemState } from "src/entities/SystemState";
import { UserAccount, UserStatus } from "src/entities/UserAccount";
import { ClusterPlugin } from "src/plugins/clusters";
import { callHook } from "src/plugins/hookClient";


/**
* Update block status of accounts and users in the slurm.
* If it is whitelisted, it doesn't block.
*
* @returns Updated number of blocked accounts and users
* @returns Block successful and failed accounts and users
**/
export async function updateBlockStatusInSlurm(
em: SqlEntityManager<MySqlDriver>, clusterPlugin: ClusterPlugin["clusters"], logger: Logger,
) {
const blockedAccounts: string[] = [];
const blockedFailedAccounts: string[] = [];
const accounts = await em.find(Account, { blocked: true });

for (const account of accounts) {
if (account.whitelist) {
continue;
}
await clusterPlugin.callOnAll(logger, async (client) =>
await asyncClientCall(client.account, "blockAccount", {
accountName: account.accountName,
}),
);

try {
await clusterPlugin.callOnAll(logger, async (client) =>
await asyncClientCall(client.account, "blockAccount", {
accountName: account.accountName,
}),
);
blockedAccounts.push(account.accountName);
} catch (error) {
blockedFailedAccounts.push(account.accountName);
}
}

const blockedUserAccounts: [string, string][] = [];
const blockedFailedUserAccounts: BlockedFailedUserAccount[] = [];
const userAccounts = await em.find(UserAccount, {
status: UserStatus.BLOCKED,
}, { populate: ["user", "account"]});

for (const ua of userAccounts) {
await clusterPlugin.callOnAll(logger, async (client) =>
await asyncClientCall(client.user, "blockUserInAccount", {
accountName: ua.account.getProperty("accountName"),
userId: ua.user.getProperty("userId"),
}),
);
try {
await clusterPlugin.callOnAll(logger, async (client) =>
await asyncClientCall(client.user, "blockUserInAccount", {
accountName: ua.account.$.accountName,
userId: ua.user.$.userId,
}),
);
blockedUserAccounts.push([ua.user.getProperty("userId"), ua.account.getProperty("accountName")]);
} catch (error) {
blockedFailedUserAccounts.push({
userId: ua.user.$.userId,
accountName: ua.account.$.accountName,
});
}
}
const updateBlockTime = await em.upsert(SystemState, {
key: SystemState.KEYS.UPDATE_SLURM_BLOCK_STATUS,
value: new Date().toISOString(),

logger.info("Updated block status in slurm of the following accounts: %o", blockedAccounts);
logger.info("Updated block status failed in slurm of the following accounts: %o", blockedFailedAccounts);

logger.info("Updated block status in slurm of the following user account: %o", blockedUserAccounts);
logger.info("Updated block status failed in slurm of the following user account: %o", blockedFailedUserAccounts);

return {
blockedAccounts,
blockedFailedAccounts,
blockedUserAccounts,
blockedFailedUserAccounts,
};

}


/**
* Update unblock status of accounts in the slurm.
* In order to ensure the stability of the service, serial is selected here.
*
* @returns Unblocked Block successful and failed accounts
**/
export async function updateUnblockStatusInSlurm(
em: SqlEntityManager<MySqlDriver>, clusterPlugin: ClusterPlugin["clusters"], logger: Logger,
) {
const accounts = await em.find(Account, {
$or: [
{ blocked: false },
{ whitelist: { $ne: null } },
],
});
await em.persistAndFlush(updateBlockTime);

logger.info("Updated block status in slurm of the following accounts: %o", accounts.map((x) => x.accountName));
logger.info("Updated block status in slurm of the following user account: %o",
userAccounts.map((x) => [x.user.getProperty("userId"), x.account.getProperty("accountName")]));
const unblockedAccounts: string[] = [];
const unblockedFailedAccounts: string[] = [];

for (const account of accounts) {
try {
await clusterPlugin.callOnAll(logger, async (client) =>
await asyncClientCall(client.account, "unblockAccount", {
accountName: account.accountName,
}),
);
unblockedAccounts.push(account.accountName);
} catch (error) {
unblockedFailedAccounts.push(account.accountName);
}
}

logger.info("Updated unblock status in slurm of the following accounts: %o", unblockedAccounts);
logger.info("Updated unblock status failed in slurm of the following accounts: %o", unblockedFailedAccounts);

return {
blockedAccounts: accounts.map((x) => x.id),
blockedUserAccounts: userAccounts.map((x) => x.id),
unblockedAccounts,
unblockedFailedAccounts,
};

}


/**
* Blocks the account in the slurm.
* If it is whitelisted, it doesn't block.
Expand Down
4 changes: 3 additions & 1 deletion apps/mis-server/src/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import { ClusterPlugin, clustersPlugin } from "src/plugins/clusters";
import { FetchPlugin, fetchPlugin } from "src/plugins/fetch";
import { ormPlugin } from "src/plugins/orm";
import { PricePlugin, pricePlugin } from "src/plugins/price";
import { SyncBlockStatusPlugin, syncBlockStatusPlugin } from "src/plugins/syncBlockStatus";

declare module "@ddadaal/tsgrpc-server" {
interface Extensions extends ClusterPlugin, PricePlugin, FetchPlugin {
interface Extensions extends ClusterPlugin, PricePlugin, FetchPlugin, SyncBlockStatusPlugin {
orm: MikroORM<MySqlDriver>;
capabilities: Capabilities;
}
Expand All @@ -43,6 +44,7 @@ export const plugins = [
pricePlugin,
fetchPlugin,
authServicePlugin,
syncBlockStatusPlugin,
];

if (commonConfig.scowApi) {
Expand Down
85 changes: 85 additions & 0 deletions apps/mis-server/src/plugins/syncBlockStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy
* SCOW is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/

import { plugin } from "@ddadaal/tsgrpc-server";
import { SyncBlockStatusResponse } from "@scow/protos/build/server/admin";
import cron from "node-cron";
import { misConfig } from "src/config/mis";
import { lastSyncTime, synchronizeBlockStatus } from "src/tasks/syncBlockStatus";

export interface SyncBlockStatusPlugin {
syncBlockStatus: {
started: () => boolean;
start: () => void;
stop: () => void;
schedule: string;
lastSyncTime: () => Date | null;
sync: () => Promise<SyncBlockStatusResponse>;
}
}

export const syncBlockStatusPlugin = plugin(async (f) => {
const synchronizeCron = misConfig.periodicSyncUserAccountBlockStatus?.cron ?? "0 4 * * *";
let synchronizeStarted = !!misConfig.periodicSyncUserAccountBlockStatus?.enabled;
let synchronizeIsRunning = false;

const logger = f.logger.child({ plugin: "syncBlockStatus" });
logger.info("misConfig.periodicSyncStatus?.cron: %s", misConfig.periodicSyncUserAccountBlockStatus?.cron);

const trigger = () => {
if (synchronizeIsRunning) return;

synchronizeIsRunning = true;
return synchronizeBlockStatus(f.ext.orm.em.fork(), logger, f.ext).finally(() => { synchronizeIsRunning = false; });
};

const task = cron.schedule(
synchronizeCron,
trigger,
{
timezone: "Asia/Shanghai",
scheduled: misConfig.periodicSyncUserAccountBlockStatus?.enabled,
},
);

logger.info("Sync block status started.");

f.addCloseHook(() => {
task.stop();
logger.info("Sync block status stopped.");
});

f.addExtension("syncBlockStatus", <SyncBlockStatusPlugin["syncBlockStatus"]>{
started: () => synchronizeStarted,
start: () => {
if (synchronizeStarted) {
logger.info("Sync is requested to start but already started");
} else {
task.start();
synchronizeStarted = true;
logger.info("Sync started");
}
},
stop: () => {
if (!synchronizeStarted) {
logger.info("Sync is requested to stop but already stopped");
} else {
task.stop();
synchronizeStarted = false;
logger.info("Sync stopped");
}
},
schedule: synchronizeCron,
lastSyncTime: () => lastSyncTime,
sync: trigger,
});
});
25 changes: 25 additions & 0 deletions apps/mis-server/src/services/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ export const adminServiceServer = plugin((server) => {
return [reply ? reply : { newJobsCount: 0 }];
},

getSyncBlockStatusInfo: async () => {
return [{
syncStarted: server.ext.syncBlockStatus.started(),
schedule: server.ext.syncBlockStatus.schedule,
lastSyncTime: server.ext.syncBlockStatus.lastSyncTime()?.toISOString() ?? undefined,
}];
},

setSyncBlockStatusState: async ({ request }) => {
const { started } = request;

if (started) {
server.ext.syncBlockStatus.start();
} else {
server.ext.syncBlockStatus.stop();
}

return [{}];
},

syncBlockStatus: async () => {
const reply = await server.ext.syncBlockStatus.sync();
return [reply];
},

updateBlockStatus: async ({ em, logger }) => {
await updateBlockStatusInSlurm(em, server.ext.clusters, logger);
return [{}];
Expand Down
38 changes: 38 additions & 0 deletions apps/mis-server/src/tasks/syncBlockStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy
* SCOW is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/

import { Logger } from "@ddadaal/tsgrpc-server";
import { MySqlDriver, SqlEntityManager } from "@mikro-orm/mysql";
import { updateBlockStatusInSlurm, updateUnblockStatusInSlurm } from "src/bl/block";
import { SystemState } from "src/entities/SystemState";
import { ClusterPlugin } from "src/plugins/clusters";

export let lastSyncTime: Date | null = null;

export async function synchronizeBlockStatus(
em: SqlEntityManager<MySqlDriver>,
logger: Logger,
clusterPlugin: ClusterPlugin,
) {
const { blockedFailedAccounts, blockedFailedUserAccounts } =
await updateBlockStatusInSlurm(em, clusterPlugin.clusters, logger);
const { unblockedFailedAccounts } = await updateUnblockStatusInSlurm(em, clusterPlugin.clusters, logger);

lastSyncTime = new Date();

const updateBlockTime = await em.upsert(SystemState, {
key: SystemState.KEYS.UPDATE_SLURM_BLOCK_STATUS,
value: new Date().toISOString(),
});
await em.persistAndFlush(updateBlockTime);
return { blockedFailedAccounts, blockedFailedUserAccounts, unblockedFailedAccounts };
}
Loading

0 comments on commit 50d34d6

Please sign in to comment.