From 842bf926d68e7d4f1c08541339d36a8dc30facb0 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Sat, 11 May 2024 23:48:36 +0100 Subject: [PATCH] feat(queue): create job concept --- package.json | 6 ++ src/jobs/userconfirm.job.ts | 20 +++++++ src/jobs/useremail.job.ts | 22 +++++++ src/jobs/useremailpassword.job.ts | 23 ++++++++ src/jobs/userpassword.job.ts | 22 +++++++ src/providers/queueworker.provider.ts | 82 +++++++++------------------ 6 files changed, 119 insertions(+), 56 deletions(-) create mode 100644 src/jobs/userconfirm.job.ts create mode 100644 src/jobs/useremail.job.ts create mode 100644 src/jobs/useremailpassword.job.ts create mode 100644 src/jobs/userpassword.job.ts diff --git a/package.json b/package.json index 12cfc01..d45b490 100644 --- a/package.json +++ b/package.json @@ -287,6 +287,12 @@ "#src/validators/login.validator", "#src/validators/register.validator", "#src/validators/update.validator" + ], + "jobs": [ + "#src/jobs/userconfirm.job", + "#src/jobs/useremail.job", + "#src/jobs/useremailpassword.job", + "#src/jobs/userpassword.job" ] } } diff --git a/src/jobs/userconfirm.job.ts b/src/jobs/userconfirm.job.ts new file mode 100644 index 0000000..6205d1b --- /dev/null +++ b/src/jobs/userconfirm.job.ts @@ -0,0 +1,20 @@ +import { Mail } from '@athenna/mail' +import type { User } from '#src/models/user' + +type Item = { + user: User +} + +export class UserConfirmJob { + public static queue() { + return 'user:confirm' + } + + public async handle({ user }: Item) { + await Mail.from('noreply@athenna.io') + .to(user.email) + .subject('Athenna Account Confirmation') + .view('mail/confirm', { user }) + .send() + } +} diff --git a/src/jobs/useremail.job.ts b/src/jobs/useremail.job.ts new file mode 100644 index 0000000..f2888e8 --- /dev/null +++ b/src/jobs/useremail.job.ts @@ -0,0 +1,22 @@ +import { Mail } from '@athenna/mail' +import type { User } from '#src/models/user' + +type Item = { + user: User + email: string + token: string +} + +export class UserEmailJob { + public static queue() { + return 'user:email' + } + + public async handle({ user, email, token }: Item) { + await Mail.from('noreply@athenna.io') + .to(user.email) + .subject('Athenna Email Change') + .view('mail/change-email', { user, email, token }) + .send() + } +} diff --git a/src/jobs/useremailpassword.job.ts b/src/jobs/useremailpassword.job.ts new file mode 100644 index 0000000..4738395 --- /dev/null +++ b/src/jobs/useremailpassword.job.ts @@ -0,0 +1,23 @@ +import { Mail } from '@athenna/mail' +import type { User } from '#src/models/user' + +type Item = { + user: User + email: string + password: string + token: string +} + +export class UserEmailPasswordJob { + public static queue() { + return 'user:email:password' + } + + public async handle({ user, email, password, token }: Item) { + await Mail.from('noreply@athenna.io') + .to(user.email) + .subject('Athenna Email & Password Change') + .view('mail/change-email-password', { user, email, password, token }) + .send() + } +} diff --git a/src/jobs/userpassword.job.ts b/src/jobs/userpassword.job.ts new file mode 100644 index 0000000..b6d8d24 --- /dev/null +++ b/src/jobs/userpassword.job.ts @@ -0,0 +1,22 @@ +import { Mail } from '@athenna/mail' +import type { User } from '#src/models/user' + +type Item = { + user: User + password: string + token: string +} + +export class UserPasswordJob { + public static queue() { + return 'user:password' + } + + public async handle({ user, password, token }: Item) { + await Mail.from('noreply@athenna.io') + .to(user.email) + .subject('Athenna Password Change') + .view('mail/change-password', { user, password, token }) + .send() + } +} diff --git a/src/providers/queueworker.provider.ts b/src/providers/queueworker.provider.ts index 8e225fa..3e049ab 100644 --- a/src/providers/queueworker.provider.ts +++ b/src/providers/queueworker.provider.ts @@ -1,76 +1,46 @@ -import { Mail } from '@athenna/mail' import { Log } from '@athenna/logger' import { ServiceProvider } from '@athenna/ioc' +import { Exec, Module } from '@athenna/common' import { Queue } from '#src/providers/facades/queue' export default class QueueWorkerProvider extends ServiceProvider { public intervals = [] public async boot() { - this.processByQueue('user:confirm', async user => { - return Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Account Confirmation') - .view('mail/confirm', { user }) - .send() - }) - - this.processByQueue('user:email', async ({ user, token, email }) => { - return Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Email Change') - .view('mail/change-email', { user, email, token }) - .send() - }) + const jobs = Config.get('rc.jobs', []) - this.processByQueue('user:password', async ({ user, token, password }) => { - return Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Password Change') - .view('mail/change-password', { user, password, token }) - .send() - }) + await Exec.concurrently(jobs, async jobPath => { + const Job = await Module.resolve(jobPath, import.meta.url) + const alias = `App/Jobs/${Job.name}` - this.processByQueue( - 'user:email:password', - async ({ user, token, email, password }) => { - return Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Email & Password Change') - .view('mail/change-email-password', { user, email, password, token }) - .send() - } - ) - } + const queueName = Job.queue() + const job = this.container.transient(Job, alias).use(alias) - public async shutdown() { - this.intervals.forEach(interval => clearInterval(interval)) - } + const interval = setInterval(async () => { + const queue = Queue.queue(queueName) - public processByQueue( - queueName: string, - processor: (data: any) => any | Promise - ) { - const interval = setInterval(async () => { - const queue = Queue.queue(queueName) + if (queue.isEmpty()) { + return + } - if (queue.isEmpty()) { - return - } + Log.info(`Processing jobs of ({yellow} "${queueName}") queue`) - Log.info(`Processing jobs of ({yellow} "${queueName}") queue`) + await queue.process(job.handle.bind(job)) - await queue.process(processor) + const jobsLength = queue.length() - const jobsLength = queue.length() + if (jobsLength) { + Log.info( + `Still has ({yellow} ${jobsLength}) jobs to process on ({yellow} "${queueName}")` + ) + } + }, 5000) - if (jobsLength) { - Log.info( - `Still has ({yellow} ${jobsLength}) jobs to process on ({yellow} "${queueName}")` - ) - } - }, 5000) + this.intervals.push(interval) + }) + } - this.intervals.push(interval) + public async shutdown() { + this.intervals.forEach(interval => clearInterval(interval)) } }