Skip to content

Commit

Permalink
feat(deployment): implement clean up of managed deployments
Browse files Browse the repository at this point in the history
refs #395
  • Loading branch information
ygrishajev committed Nov 8, 2024
1 parent 1f76b8b commit 882fac4
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 21 deletions.
2 changes: 1 addition & 1 deletion apps/api/mvm.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"@akashnetwork/database": "1.0.0",
"@akashnetwork/env-loader": "1.0.1",
"@akashnetwork/http-sdk": "1.0.8",
"@akashnetwork/logging": "1.0.0"
"@akashnetwork/logging": "1.0.1"
}
}
2 changes: 2 additions & 0 deletions apps/api/src/billing/providers/type-registry.provider.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "@src/utils/protobuf";

import { getAkashTypeRegistry } from "@akashnetwork/akashjs/build/stargate";
import { Registry } from "@cosmjs/proto-signing";
import { defaultRegistryTypes } from "@cosmjs/stargate";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { DepositDeploymentAuthorization } from "@akashnetwork/akash-api/v1beta3";
import { DepositDeploymentAuthorization, MsgCloseDeployment } from "@akashnetwork/akash-api/v1beta3";
import { MsgRevoke } from "cosmjs-types/cosmos/authz/v1beta1/tx";
import { BasicAllowance } from "cosmjs-types/cosmos/feegrant/v1beta1/feegrant";
import { MsgGrantAllowance } from "cosmjs-types/cosmos/feegrant/v1beta1/tx";
import addYears from "date-fns/addYears";
import Long from "long";
import { singleton } from "tsyringe";

export interface SpendingAuthorizationMsgOptions {
Expand Down Expand Up @@ -83,4 +84,16 @@ export class RpcMessageService {
})
};
}

getCloseDeploymentMsg(address: string, dseq: number) {
return {
typeUrl: `/${MsgCloseDeployment.$type}`,
value: {
id: {
owner: address,
dseq: Long.fromString(dseq.toString(), true)
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class TxSignerService {
});
}

private async getClientForAddressIndex(addressIndex: number): Promise<SimpleSigningStargateClient> {
async getClientForAddressIndex(addressIndex: number): Promise<SimpleSigningStargateClient> {
const wallet = await this.getWalletForAddressIndex(addressIndex);
const client = await SigningStargateClient.connectWithSigner(this.config.RPC_NODE_ENDPOINT, wallet, {
registry: this.registry
Expand Down
12 changes: 11 additions & 1 deletion apps/api/src/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ program
});
});

program
.command("cleanup-stale-deployments")
.description("Close deployments without leases created at least 10min ago")
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
await container.resolve(TopUpDeploymentsController).cleanUpStaleDeployment();
});
});

const logger = new LoggerService({ context: "CLI" });

async function executeCliHandler(name: string, handler: () => Promise<void>) {
await context.with(trace.setSpan(context.active(), tracer.startSpan(name)), async () => {
const logger = new LoggerService({ context: "CLI" });
logger.info({ event: "COMMAND_START", name });
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { migratePG, closeConnections } = await require("./core/providers/postgres.provider");
Expand Down
30 changes: 27 additions & 3 deletions apps/api/src/core/repositories/base.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,42 @@ export abstract class BaseRepository<
return this.toOutput(first(items));
}

async find(query?: Partial<Output>, select?: Array<keyof Output>) {
async find(query?: Partial<Output>, options?: { select?: Array<keyof Output>; limit?: number; offset?: number }) {
const params: DBQueryConfig<"many", true> = {
where: this.queryToWhere(query)
};

if (select) {
params.columns = select.reduce((acc, field) => ({ ...acc, [field]: true }), {});
if (options?.select) {
params.columns = options.select.reduce((acc, field) => ({ ...acc, [field]: true }), {});
}

if (options?.limit) {
params.limit = options.limit;
}

if (options?.offset) {
params.offset = options.offset;
}

return this.toOutputList(await this.queryCursor.findMany(params));
}

async paginate(options: { select?: Array<keyof Output>; limit?: number; query?: Partial<Output> }, cb: (page: Output[]) => Promise<void>) {
let offset = 0;
let hasNextPage = true;
const limit = options?.limit || 100;

while (hasNextPage) {
const items = await this.find(options.query, { select: options.select, offset, limit });
offset += items.length;
hasNextPage = items.length === limit;

if (items.length) {
await cb(items);
}
}
}

async updateById(id: Output["id"], payload: Partial<Input>, options?: MutationOptions): Promise<Output>;
async updateById(id: Output["id"], payload: Partial<Input>): Promise<void>;
async updateById(id: Output["id"], payload: Partial<Input>, options?: MutationOptions): Promise<void | Output> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import { singleton } from "tsyringe";

import { StaleManagedDeploymentsCleanerService } from "@src/deployment/services/stale-managed-deployments-cleaner/stale-managed-deployments-cleaner.service";
import { TopUpCustodialDeploymentsService } from "@src/deployment/services/top-up-custodial-deployments/top-up-custodial-deployments.service";

@singleton()
export class TopUpDeploymentsController {
constructor(private readonly topUpDeploymentsService: TopUpCustodialDeploymentsService) {}
constructor(
private readonly topUpDeploymentsService: TopUpCustodialDeploymentsService,
private readonly staleDeploymentsCleanerService: StaleManagedDeploymentsCleanerService
) {}

async topUpDeployments() {
await this.topUpDeploymentsService.topUpDeployments();
}

async cleanUpStaleDeployment() {
await this.staleDeploymentsCleanerService.cleanup();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Deployment, Lease } from "@akashnetwork/database/dbSchemas/akash";
import { literal, Op } from "sequelize";
import { singleton } from "tsyringe";

export interface StaleDeploymentsOptions {
createdHeight: number;
owner: string;
}

export interface StaleDeploymentsOutput {
dseq: number;
}

@singleton()
export class DeploymentRepository {
async findStaleDeployments(options: StaleDeploymentsOptions): Promise<StaleDeploymentsOutput[]> {
const deployments = await Deployment.findAll({
attributes: ["dseq"],
include: [
{
model: Lease,
attributes: [],
required: false
}
],
where: {
owner: options.owner,
[Op.and]: [
{
createdHeight: {
[Op.lt]: options.createdHeight
}
},
{
closedHeight: null
}
]
},
group: ["deployment.dseq"],
having: literal(`COUNT("leases"."deploymentId") = 0`),
raw: true
});

return deployments ? (deployments as unknown as StaleDeploymentsOutput[]) : [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { LoggerService } from "@akashnetwork/logging";
import { secondsInMinute } from "date-fns/constants";
import { singleton } from "tsyringe";

import { UserWalletOutput, UserWalletRepository } from "@src/billing/repositories";
import { RpcMessageService } from "@src/billing/services";
import { TxSignerService } from "@src/billing/services/tx-signer/tx-signer.service";
import { ErrorService } from "@src/core/services/error/error.service";
import { DeploymentRepository } from "@src/deployment/repositories/deployment/deployment.repository";
import { BlockHttpService } from "@src/deployment/services/block-http/block-http.service";
import { averageBlockTime } from "@src/utils/constants";

@singleton()
export class StaleManagedDeploymentsCleanerService {
private readonly logger = new LoggerService({ context: StaleManagedDeploymentsCleanerService.name });

private readonly MAX_LIVE_BLOCKS = Math.floor((10 * secondsInMinute) / averageBlockTime);

constructor(
private readonly userWalletRepository: UserWalletRepository,
private readonly deploymentRepository: DeploymentRepository,
private readonly blockHttpService: BlockHttpService,
private readonly rpcMessageService: RpcMessageService,
private readonly txSignerService: TxSignerService,
private readonly errorService: ErrorService
) {}

async cleanup() {
await this.userWalletRepository.paginate({ limit: 10 }, async wallets => {
const cleanUpAllWallets = wallets.map(async wallet => {
await this.errorService.execWithErrorHandler({ wallet, event: "DEPLOYMENT_CLEAN_UP_ERROR" }, () => this.cleanUpForWallet(wallet));
});

await Promise.all(cleanUpAllWallets);
});
}

private async cleanUpForWallet(wallet: UserWalletOutput) {
const currentHeight = await this.blockHttpService.getCurrentHeight();
const client = await this.txSignerService.getClientForAddressIndex(wallet.id);
const deployments = await this.deploymentRepository.findStaleDeployments({
owner: wallet.address,
createdHeight: currentHeight - this.MAX_LIVE_BLOCKS
});

const closeAllWalletStaleDeployments = deployments.map(async deployment => {
const message = this.rpcMessageService.getCloseDeploymentMsg(wallet.address, deployment.dseq);
this.logger.info({ event: "DEPLOYMENT_CLEAN_UP", params: { owner: wallet.address, dseq: deployment.dseq } });

await client.signAndBroadcast([message]);

this.logger.info({ event: "DEPLOYMENT_CLEAN_UP_SUCCESS" });
});

await Promise.all(closeAllWalletStaleDeployments);
}
}
2 changes: 1 addition & 1 deletion apps/deploy-web/next-env.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
/// <reference types="next/image-types/global" />

// NOTE: This file should not be edited
// see https://nextjs.org/docs/basic-features/typescript for more information.
// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information.
2 changes: 1 addition & 1 deletion packages/logging/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@akashnetwork/logging",
"version": "1.0.0",
"version": "1.0.1",
"description": "Package containing logging tools",
"main": "src/index.ts",
"scripts": {
Expand Down
23 changes: 12 additions & 11 deletions packages/logging/src/servicies/logger/logger.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@ export class LoggerService implements Logger {
private initPino(bindings?: Bindings): Logger {
const destinations: Writable[] = [];

let options: LoggerOptions = {
level: config.LOG_LEVEL,
mixin: () => {
const currentSpan = trace.getSpan(context.active());
return { ...currentSpan?.spanContext() };
}
};

if (config.STD_OUT_LOG_FORMAT === "pretty") {
destinations.push(pretty({ sync: true }));
} else {
options = gcpLogOptions(options as any);
destinations.push(process.stdout);
}

Expand All @@ -32,14 +41,6 @@ export class LoggerService implements Logger {
destinations.push(fluentd);
}

const options = gcpLogOptions({
level: config.LOG_LEVEL,
mixin: () => {
const currentSpan = trace.getSpan(context.active());
return currentSpan?.spanContext() || {};
}
}) as LoggerOptions;

let instance = pino(options, this.combineDestinations(destinations));

if (bindings) {
Expand Down Expand Up @@ -96,9 +97,9 @@ export class LoggerService implements Logger {
const loggableInput = { status: message.status, message: message.message, stack: message.stack, data: message.data };
return "originalError" in message
? {
...loggableInput,
originalError: message.stack
}
...loggableInput,
originalError: message.stack
}
: loggableInput;
}

Expand Down

0 comments on commit 882fac4

Please sign in to comment.