diff --git a/bin/test.ts b/bin/test.ts index 49fa395..830f74d 100644 --- a/bin/test.ts +++ b/bin/test.ts @@ -6,6 +6,7 @@ await Runner.setTsEnv() .addAssertPlugin() .addPlugin(request()) .addPlugin(command()) + .setForceExit() .addPath('tests/e2e/**/*.ts') .addPath('tests/unit/**/*.ts') .setCliArgs(process.argv.slice(2)) diff --git a/package.json b/package.json index b98d544..ff6a5eb 100644 --- a/package.json +++ b/package.json @@ -135,6 +135,7 @@ }, "athenna": { "services": [ + "#src/helpers/queue", "#src/services/user.service", "#src/services/auth.service" ], @@ -146,7 +147,8 @@ "@athenna/view/providers/ViewProvider", "@athenna/mail/providers/MailProvider", "@athenna/mail/providers/SmtpServerProvider", - "@athenna/view/providers/ViewProvider" + "@athenna/view/providers/ViewProvider", + "#src/providers/queueworker.provider" ], "controllers": [ "#src/controllers/user.controller", diff --git a/src/config/mail.ts b/src/config/mail.ts index c226352..5e0eab0 100644 --- a/src/config/mail.ts +++ b/src/config/mail.ts @@ -36,6 +36,10 @@ export default { driver: 'smtp', host: Env('MAIL_HOST', 'localhost'), port: Env('MAIL_PORT', 587), + auth: { + user: Env('MAIL_USERNAME'), + pass: Env('MAIL_PASSWORD') + }, tls: { rejectUnauthorized: false } diff --git a/src/helpers/queue.ts b/src/helpers/queue.ts new file mode 100644 index 0000000..3aa602a --- /dev/null +++ b/src/helpers/queue.ts @@ -0,0 +1,146 @@ +import { Log } from '@athenna/logger' +import { Service } from '@athenna/ioc' +import { File, Path } from '@athenna/common' + +class VanillaQueue { + private queueName = 'default' + + private getFile() { + const path = Path.storage('queues.json') + + return new File(path, JSON.stringify({ default: [], deadletter: [] })) + } + + public async truncate() { + const path = Path.storage('queues.json') + + return new File(path, '').setContent( + JSON.stringify({ default: [], deadletter: [] }) + ) + } + + public async queue(name: string) { + const file = this.getFile() + const queues = file.getContentAsJsonSync() + + this.queueName = name + + if (!queues[name]) { + queues[name] = [] + } + + file.setContentSync(JSON.stringify(queues)) + + return this + } + + public async add(item: T) { + const file = this.getFile() + const queues = file.getContentAsJsonSync() + + queues[this.queueName].push(item) + + file.setContentSync(JSON.stringify(queues)) + + return this + } + + public async pop() { + const file = this.getFile() + const queues = file.getContentAsJsonSync() + + if (!queues[this.queueName].length) { + return null + } + + const item = queues[this.queueName].shift() + + file.setContentSync(JSON.stringify(queues)) + + return item + } + + public async peek() { + const file = this.getFile() + const queues = file.getContentAsJsonSync() + + if (!queues[this.queueName].length) { + return null + } + + return queues[this.queueName][0] + } + + public async length() { + const file = this.getFile() + const queues = file.getContentAsJsonSync() + + return queues[this.queueName].length + } + + public async process(processor: (item: T) => any | Promise) { + const data = await this.pop() + + try { + await processor(data) + } catch (err) { + console.log(err) + Log.error( + `Adding data of ({yellow} "${this.queueName}") to deadletter queue due to:`, + err + ) + + const queue = await new QueueImpl().queue('deadletter') + + await queue.add({ queue: this.queueName, data }) + } + } + + public async isEmpty() { + const file = this.getFile() + const queues = file.getContentAsJsonSync() + + return !queues[this.queueName].length + } +} + +@Service({ alias: 'App/Helpers/Queue' }) +export class QueueImpl { + public driver = new VanillaQueue() + + public async truncate() { + await this.driver.truncate() + + return this + } + + public async queue(name: string) { + await this.driver.queue(name) + + return this + } + + public async add(item: T) { + await this.driver.add(item) + } + + public async pop() { + return this.driver.pop() + } + + public async peek() { + return this.driver.peek() + } + + public async length() { + return this.driver.length() + } + + public async process(cb: (item: T) => any | Promise) { + return this.driver.process(cb) + } + + public async isEmpty() { + return this.driver.isEmpty() + } +} diff --git a/src/providers/facades/queue.ts b/src/providers/facades/queue.ts new file mode 100644 index 0000000..cf4d051 --- /dev/null +++ b/src/providers/facades/queue.ts @@ -0,0 +1,4 @@ +import { Facade } from '@athenna/ioc' +import type { QueueImpl } from '#src/helpers/queue' + +export const Queue = Facade.createFor('App/Helpers/Queue') diff --git a/src/providers/queueworker.provider.ts b/src/providers/queueworker.provider.ts new file mode 100644 index 0000000..20cfef0 --- /dev/null +++ b/src/providers/queueworker.provider.ts @@ -0,0 +1,46 @@ +import { Mail } from '@athenna/mail' +import { Log } from '@athenna/logger' +import { ServiceProvider } from '@athenna/ioc' +import { Queue } from '#src/providers/facades/queue' + +export default class QueueWorkerProvider extends ServiceProvider { + public intervals = [] + + public async boot() { + this.processByQueue('user:register', async user => { + return Mail.from('noreply@athenna.io') + .to(user.email) + .subject('Athenna Account Activation') + .view('mail/register', { user }) + .send() + }) + } + + public async shutdown() { + this.intervals.forEach(interval => clearInterval(interval)) + } + + public processByQueue(queueName: string, processor: any) { + const interval = setInterval(async () => { + const queue = await Queue.queue(queueName) + + if (await queue.isEmpty()) { + return + } + + Log.info(`Processing jobs of ({yellow} "${queueName}") queue`) + + await queue.process(processor) + + const jobsLength = await queue.length() + + if (jobsLength) { + Log.info( + `Still has ({yellow} ${jobsLength}) jobs to process on ({yellow} "${queueName}")` + ) + } + }, 5000) + + this.intervals.push(interval) + } +} diff --git a/src/services/auth.service.ts b/src/services/auth.service.ts index efb551d..8692ac8 100644 --- a/src/services/auth.service.ts +++ b/src/services/auth.service.ts @@ -1,11 +1,11 @@ import bcrypt from 'bcrypt' import jwt from 'jsonwebtoken' -import { Mail } from '@athenna/mail' import { Log } from '@athenna/logger' import { Uuid } from '@athenna/common' import { Service } from '@athenna/ioc' import { Config } from '@athenna/config' import type { User } from '#src/models/user' +import { Queue } from '#src/providers/facades/queue' import { UnauthorizedException } from '@athenna/http' import type { UserService } from '#src/services/user.service' @@ -47,12 +47,7 @@ export class AuthService { const user = await this.userService.create(data) - // TODO Move this to a queue - Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Account Activation') - .view('mail/register', { user }) - .send() + await Queue.queue('user:register').then(q => q.add(user)) return user } diff --git a/storage/queues.json b/storage/queues.json new file mode 100644 index 0000000..73cb0c4 --- /dev/null +++ b/storage/queues.json @@ -0,0 +1 @@ +{"default":[],"deadletter":[],"user:register":[]} \ No newline at end of file diff --git a/tests/e2e/auth.controller.test.ts b/tests/e2e/auth.controller.test.ts index c1a2f41..45e8c73 100644 --- a/tests/e2e/auth.controller.test.ts +++ b/tests/e2e/auth.controller.test.ts @@ -6,6 +6,7 @@ import { Config } from '@athenna/config' import { SmtpServer } from '@athenna/mail' import { Database } from '@athenna/database' import { RoleUser } from '#src/models/roleuser' +import { Queue } from '#src/providers/facades/queue' import { BaseHttpTest } from '@athenna/core/testing/BaseHttpTest' import { Test, type Context, AfterAll, BeforeAll } from '@athenna/test' @@ -18,6 +19,7 @@ export default class AuthControllerTest extends BaseHttpTest { @AfterAll() public async afterAll() { + await Queue.truncate() await SmtpServer.close() await User.truncate() await Role.truncate() @@ -131,6 +133,9 @@ export default class AuthControllerTest extends BaseHttpTest { } }) + const queue = await Queue.queue('user:register') + + assert.deepEqual(await queue.length(), 1) assert.isTrue(await User.exists({ email: 'test@athenna.io' })) response.assertStatusCode(201) response.assertBodyContains({ diff --git a/tests/unit/auth.service.test.ts b/tests/unit/auth.service.test.ts index fc2eff2..b9f049a 100644 --- a/tests/unit/auth.service.test.ts +++ b/tests/unit/auth.service.test.ts @@ -1,10 +1,11 @@ import bcrypt from 'bcrypt' +import { Mail } from '@athenna/mail' import { Uuid } from '@athenna/common' +import { Queue } from '#src/providers/facades/queue' import { UserService } from '#src/services/user.service' import { AuthService } from '#src/services/auth.service' import { NotFoundException, UnauthorizedException } from '@athenna/http' import { Test, type Context, Mock, AfterEach, BeforeEach } from '@athenna/test' -import { Mail } from '@athenna/mail' export default class AuthServiceTest { private userService: UserService @@ -83,6 +84,7 @@ export default class AuthServiceTest { } Mail.when('send').resolve(undefined) + Queue.when('queue').resolve({ add: () => {} }) Mock.when(this.userService, 'create').resolve(userToRegister) const authService = new AuthService(this.userService)