Skip to content

Commit

Permalink
feat: initialize bull job
Browse files Browse the repository at this point in the history
  • Loading branch information
KagChi committed Jul 1, 2022
1 parent 42cc004 commit d23e8e3
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 12 deletions.
6 changes: 5 additions & 1 deletion .env_example
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
TOTAL_CLUSTERS = 6
TOTAL_CLUSTERS = 6
QUEUE_NAME = "scheduled-tasks"
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD =
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@nezuchan/scheduled-tasks",
"version": "1.0.0",
"description": "A Standalone scheduled tasks service for microserviced \"Discord\" bot.",
"main": "dist/dist.js",
"main": "dist/index.js",
"scripts": {
"start": "npm run build && node -r dotenv/config dist/index.js",
"lint": "eslint src --ext ts",
Expand Down
6 changes: 4 additions & 2 deletions src/Listeners/readyListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { ApplyOptions } from "../Utilities/Decorators/ApplyOptions.js";
})

export class readyListener extends Listener {
public run(): void {
this.container.manager.logger.info(`Task Manager cluster ${this.container.manager.clusterId} is ready.`);
public async run(): Promise<void> {
const jobList = await this.container.manager.bull.getJobs(["delayed", "waiting"]);
const jobRepeatedList = await this.container.manager.bull.getRepeatableJobs(0, -1);
this.logger.info(`Scheduled Tasks cluster ${this.container.manager.clusterId} is ready. ${jobList.length} job(s), ${jobRepeatedList.length} repeatable job(s).`);
}
}
3 changes: 3 additions & 0 deletions src/Stores/Listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import { Piece } from "@sapphire/pieces";
import { Result } from "@sapphire/result";
import EventEmitter from "node:events";
import { Logger } from "pino";

export abstract class Listener extends Piece {
public readonly emitter: EventEmitter | null;
public readonly event: string | symbol;
public readonly once: boolean;
public readonly logger: Logger;
private _listener: ((...args: any[]) => void) | null;

public constructor(context: Piece.Context, public options: ListenerOptions) {
super(context, options);
this.logger = this.container.manager.logger;
this.emitter =
typeof options.emitter === "undefined"
? this.container.manager
Expand Down
26 changes: 23 additions & 3 deletions src/Structures/TaskManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-base-to-string */
/* eslint-disable class-methods-use-this */
import { container, Piece, Store, StoreRegistry } from "@sapphire/pieces";
import { Result } from "@sapphire/result";
import Bull from "bull";
import EventEmitter from "node:events";
import { resolve } from "node:path";
import pino from "pino";
Expand All @@ -8,7 +12,21 @@ import { Util } from "../Utilities/Util.js";

export class TaskManager extends EventEmitter {
public stores = new StoreRegistry();
public clusterId!: number;
public clusterId = parseInt(process.env.CLUSTER_ID!);

public bull = new Bull(`${process.env.QUEUE_NAME!}-cluster-${this.clusterId}`, {
redis: {
host: process.env.REDIS_HOST!,
port: parseInt(process.env.REDIS_PORT!),
password: process.env.REDIS_PASSWORD
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 2
}
});

public logger = pino({
name: "scheduled-tasks",
timestamp: true,
Expand All @@ -35,10 +53,12 @@ export class TaskManager extends EventEmitter {
}));
}

public async initialize(clusterId: number): Promise<void> {
this.clusterId = clusterId;
public async initialize(): Promise<void> {
container.manager = this;
this.logger.info(`Initializing Scheduled Tasks cluster ${this.clusterId}`);
// TODO: Forward to rabbitmq server.
const bullProcessResult = Result.from(() => void this.bull.process("*", () => { }));
if (bullProcessResult.isErr()) throw new Error(`Failed to initialize Scheduled Tasks cluster ${this.clusterId}, ${bullProcessResult.err()}`);
this.stores.register(new ListenerStore());
await Promise.all([...this.stores.values()].map((store: Store<Piece>) => store.loadAll()));
this.emit("ready", this);
Expand Down
12 changes: 7 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-loop-func */
/* eslint-disable @typescript-eslint/naming-convention */
/* eslint-disable @typescript-eslint/prefer-for-of */
import cluster from "node:cluster";
Expand All @@ -6,6 +7,7 @@ import { cpus } from "node:os";
import pino from "pino";
import { Util } from "./Utilities/Util.js";
import { TaskManager } from "./Structures/TaskManager.js";
import { Result } from "@sapphire/result";

if (cluster.isPrimary) {
const clusters = Number(process.env.TOTAL_CLUSTERS ?? cpus().length);
Expand Down Expand Up @@ -37,14 +39,14 @@ if (cluster.isPrimary) {
logger.info(`Starting Scheduled Tasks in ${clusters} clusters`);
for (let index = 0; index < clusters; index++) {
logger.info(`Launching Scheduled Tasks cluster ${index}`);
try {
cluster.fork({ CLUSTER_ID: index, ...process.env });
logger.info(`Launched Scheduled Tasks cluster ${index}`);
} catch {
const clusterResult = Result.from(() => cluster.fork({ CLUSTER_ID: index, ...process.env }));
if (clusterResult.isErr()) {
logger.error(`Failed to launch Scheduled Tasks cluster ${index}`);
continue;
} else {
logger.info(`Launched Scheduled Tasks cluster ${index}`);
}
}
} else {
await new TaskManager().initialize(Number(process.env.CLUSTER_ID));
await new TaskManager().initialize();
}

0 comments on commit d23e8e3

Please sign in to comment.