Skip to content

Commit

Permalink
Merge pull request #19 from Phoenix-Commerce/feat/worker-modes
Browse files Browse the repository at this point in the history
feat: add bullmq workers and queues
  • Loading branch information
brent-hoover authored Jun 17, 2024
2 parents 9879d1b + 0e51ed8 commit 7376a61
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 5 deletions.
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"@typegoose/typegoose": "^12.5.0",
"apollo-server-express": "^3.13.0",
"bcrypt": "^5.1.1",
"bullmq": "^5.8.2",
"casbin": "^5.30.0",
"casbin-mongoose-adapter": "^5.3.1",
"dotenv": "^16.4.5",
Expand Down
4 changes: 4 additions & 0 deletions src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const env = cleanEnv(process.env, {
MONGO_URI: str({ desc: 'MongoDB connection string' }),
PORT: port({ default: 4000 }),
JWT_SECRET: str({ desc: 'Secret key for JWT token' }),
JTW_EXPIRY: str({ default: '1y' }),
MODE: str({ choices: ['server', 'worker', 'dev'], default: 'dev' }),
REDIS_HOST: str({ default: 'localhost' }),
REDIS_PORT: port({ default: 6379 }),
});

export default env;
20 changes: 19 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { isIntrospectionQuery } from './utils/introspection-check';
import { shouldBypassAuth } from './utils/should-bypass-auth';
import { bootstrap } from './plugins/auth-plugin/bootstrap';
import sanitizeLog from './sanitize-log';
import { startWorker } from './worker';

const loggerCtx = { context: 'index' };

Expand Down Expand Up @@ -95,4 +96,21 @@ async function startServer() {
}
}

startServer();
async function startApp() {
switch (env.MODE) {
case 'server':
await startServer();
break;
case 'worker':
await startWorker();
break;
case 'dev':
await startServer();
await startWorker();
break;
default:
logger.error('Unknown mode specified. Please set MODE to "server", "worker", or "dev".', loggerCtx);
}
}

startApp();
3 changes: 2 additions & 1 deletion src/plugins/auth-plugin/resolvers/auth-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { UserService } from '../services/user-service.ts';
import { Service } from 'typedi';
import jwt from 'jsonwebtoken';
import { getEnforcer } from '../../../rbac';
import env from '../../../config/config.ts';

@Service() // Register AuthResolver with Typedi
@Resolver()
Expand Down Expand Up @@ -39,7 +40,7 @@ export class AuthResolver {
throw new Error('Invalid credentials');
}

const token = jwt.sign({ role: user.role, id: user._id }, process.env.JWT_SECRET!, { expiresIn: '1h' });
const token = jwt.sign({ role: user.role, id: user._id }, process.env.JWT_SECRET!, { expiresIn: env.JTW_EXPIRY });
return token;
}

Expand Down
2 changes: 2 additions & 0 deletions src/plugins/global-context.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Schema } from 'mongoose';
import { type WorkerOptions } from 'bullmq';

export type ResolverMap = {
[resolverName: string]: Function;
Expand All @@ -10,4 +11,5 @@ export interface GlobalContext {
extendModel: (name: string, extension: (schema: Schema) => void) => void;
extendResolvers: (name: string, extension: Function[]) => void;
wrapResolver: (name: string, resolver: string, wrapper: Function) => void;
queues: { [key: string]: { processor: (job: any) => Promise<void>; options: WorkerOptions } };
}
34 changes: 34 additions & 0 deletions src/plugins/plugin-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import mongoose, { Schema } from 'mongoose';
import { type GlobalContext } from './global-context';
import { type Plugin } from './plugin-interface';
import pluginsList from './plugins-list';
import { Queue, Worker, QueueEvents, type WorkerOptions } from 'bullmq';
import env from '../config/config';

const loggerCtx = { context: 'plugin-loader' };

class PluginLoader {
private plugins: Plugin[] = [];
context: GlobalContext = this.createInitialContext();
private queues: Record<string, Queue> = {};

private createInitialContext(): GlobalContext {
return {
Expand All @@ -22,6 +25,7 @@ class PluginLoader {
extendModel: this.extendModel.bind(this),
extendResolvers: this.extendResolvers.bind(this),
wrapResolver: this.wrapResolver.bind(this),
queues: {}
};
}

Expand Down Expand Up @@ -114,6 +118,36 @@ class PluginLoader {
mongoose.model(modelName, this.context.models[modelName].schema);
});
}

initializeQueues(): void {
Object.keys(this.context.queues).forEach((queueName) => {
const { processor, options } = this.context.queues[queueName];
const queue = new Queue(queueName, options);
const worker = new Worker(queueName, processor, options);
const queueEvents = new QueueEvents(queueName, options);

queueEvents.on('completed', (job) => {
logger.info(`Job ${job.jobId} in queue ${queueName} completed!`, loggerCtx);
});

queueEvents.on('failed', (job, err) => {
const errorMessage = this.extractErrorMessage(err);
logger.error(`Job ${job.jobId} in queue ${queueName} failed with error: ${errorMessage}`, loggerCtx);;
});

this.queues[queueName] = queue;
});
}

private extractErrorMessage(err: unknown): string {
if (typeof err === 'string') {
return err;
}
if (err instanceof Error) {
return err.message;
}
return JSON.stringify(err);
}
}

export default PluginLoader;
29 changes: 28 additions & 1 deletion src/plugins/sample-plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import { Sample } from './models/sample';
import { SampleResolver } from './resolvers/sample-resolver';
import { SampleService } from './services/sample-service';
import KafkaEventService from '../../event/kafka-event-service';
import { Queue, Job } from 'bullmq';

const sampleJobProcessor = async (job: Job) => {
console.log(`Processing job ${job.id}`);
// Add job processing logic here
};

export default {
name: 'sample-plugin',
Expand All @@ -15,8 +21,19 @@ export default {
context.models['Sample'] = { schema: SampleModel.schema, model: SampleModel };
container.set('SampleModel', SampleModel);

// Define and register the queue for this plugin
const sampleQueue = new Queue('sampleQueue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: Number(process.env.REDIS_PORT) || 6379,
},
});

// Register sampleQueue in the container
container.set('sampleQueue', sampleQueue);

// Register SampleService and KafkaEventService with typedi
container.set(SampleService, new SampleService(Container.get(KafkaEventService)));
container.set(SampleService, new SampleService(Container.get(KafkaEventService), sampleQueue));

// Ensure SampleResolver is added to context resolvers
context.resolvers['Sample'] = [SampleResolver];
Expand All @@ -30,5 +47,15 @@ export default {
console.log('Sample created:', sample);
// Additional handling logic here
});

context.queues['sampleQueue'] = {
processor: sampleJobProcessor,
options: {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: Number(process.env.REDIS_PORT) || 6379,
},
},
};
},
};
15 changes: 13 additions & 2 deletions src/plugins/sample-plugin/services/sample-service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// src/services/sample-service.ts
import { Service } from 'typedi';
import { Sample, SampleModel } from '../models/sample';
import KafkaEventService from '../../../event/kafka-event-service';
import { Queue } from 'bullmq';

@Service()
export class SampleService {
constructor(private eventService: KafkaEventService) {}
constructor(
private eventService: KafkaEventService,
private sampleQueue: Queue
) {}

async getAllSamples(): Promise<Sample[]> {
return SampleModel.find().exec();
Expand All @@ -15,6 +18,14 @@ export class SampleService {
const sample = new SampleModel({ name });
const savedSample = await sample.save();
await this.eventService.emitEvent('sampleCreated', savedSample); // Emit event using the centralized service

// Add job to the sampleQueue
await this.sampleQueue.add('processSample', { sampleId: savedSample.id });

return savedSample;
}

async getSampleById(id: string): Promise<Sample | null> {
return SampleModel.findById(id).exec();
}
}
50 changes: 50 additions & 0 deletions src/plugins/sample-plugin/services/sample-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Worker, Job } from 'bullmq';
import { Container } from 'typedi';
import env from '../../../config/config';
import logger from '../../../config/logger';
import { SampleService } from './sample-service';

const loggerCtx = { context: 'sample-worker' };

const sampleWorker = new Worker(
'sampleQueue',
async (job: Job | undefined) => {
if (!job) {
logger.error('Received an undefined job', loggerCtx);
return;
}

const sampleService = Container.get(SampleService);
const sampleId = job.data.sampleId;

// Process the sample
const sample = await sampleService.getSampleById(sampleId);
logger.info(`Processing sample: ${sample?.name}`);
// Add your processing logic here
},
{
connection: {
host: env.REDIS_HOST,
port: env.REDIS_PORT,
},
}
);

sampleWorker.on('completed', (job: Job | undefined) => {
if (job) {
logger.info(`Job ${job.id} in sampleQueue completed!`, loggerCtx);
} else {
logger.error('Completed job is undefined', loggerCtx);
}
});

sampleWorker.on('failed', (job: Job | undefined, err: unknown) => {
const errorMessage = typeof err === 'string' ? err : (err instanceof Error ? err.message : JSON.stringify(err));
if (job) {
logger.error(`Job ${job.id} in sampleQueue failed with error: ${errorMessage}`, loggerCtx);
} else {
logger.error(`A job failed with error: ${errorMessage}`, loggerCtx);
}
});

export default sampleWorker;
31 changes: 31 additions & 0 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { connectToDatabase } from './config/database';
import logger from './config/logger';
import { initEnforcer } from './rbac';
import { bootstrap } from './plugins/auth-plugin/bootstrap';
import PluginLoader from './plugins/plugin-loader';

const loggerCtx = { context: 'worker' };

export async function startWorker() {
try {
await connectToDatabase();
await initEnforcer(); // Initialize Casbin
await bootstrap(); // Bootstrap the application with a superuser

const pluginLoader = new PluginLoader();
pluginLoader.loadPlugins();

// Register models before initializing plugins
pluginLoader.registerModels();

// Initialize plugins (extend models and resolvers)
pluginLoader.initializePlugins();

// Initialize queues
pluginLoader.initializeQueues();

logger.info('Worker started and ready to process jobs', loggerCtx);
} catch (error) {
logger.error('Failed to start worker:', error, loggerCtx);
}
}

0 comments on commit 7376a61

Please sign in to comment.