diff --git a/bun.lockb b/bun.lockb
index 265ae53..88c0753 100755
Binary files a/bun.lockb and b/bun.lockb differ
diff --git a/docker-compose.yml b/docker-compose.yml
index 36cac67..07676b3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -22,8 +22,62 @@ services:
     volumes:
       - redis-data:/data
 
+  zookeeper:
+    image: bitnami/zookeeper:latest
+    container_name: zookeeper
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    ports:
+      - "2181:2181"
+    volumes:
+      - zookeeper-data:/bitnami/zookeeper
+    networks:
+      - app-network
+
+  kafka:
+    image: bitnami/kafka:latest
+    container_name: kafka
+    environment:
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      - ALLOW_PLAINTEXT_LISTENER=yes
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+    volumes:
+      - kafka-data:/bitnami/kafka
+    depends_on:
+      - zookeeper
+    networks:
+      - app-network
+
+  akhq:
+    image: tchiotludo/akhq:latest
+    container_name: akhq
+    ports:
+      - "8080:8080"
+    environment:
+      AKHQ_CONFIGURATION: |
+        akhq:
+          connections:
+            docker-kafka-server:
+              properties:
+                bootstrap.servers: "kafka:9092"
+    networks:
+      - app-network
+
+networks:
+  app-network:
+    driver: bridge
+
 volumes:
   mongo-data:
     driver: local
   redis-data:
     driver: local
+  zookeeper-data:
+    driver: local
+  kafka-data:
+    driver: local
diff --git a/package.json b/package.json
index 1451389..5b59e95 100644
--- a/package.json
+++ b/package.json
@@ -24,6 +24,7 @@
     "express": "^4.19.2",
     "graphql": "^16.8.1",
     "jsonwebtoken": "^9.0.2",
+    "kafkajs": "^2.2.4",
     "mongoose": "^8.4.1",
     "reflect-metadata": "^0.2.2",
     "type-graphql": "^2.0.0-rc.1",
diff --git a/schema.graphql b/schema.graphql
index 1b16a0b..a2ef421 100644
--- a/schema.graphql
+++ b/schema.graphql
@@ -3,15 +3,48 @@
 # !!! DO NOT MODIFY THIS FILE BY YOURSELF !!!
 # -----------------------------------------------
 
+type Cart {
+  items: [Item!]!
+}
+
+type Item {
+  description: String!
+  name: String!
+  price: Float!
+  productId: String!
+  quantity: Int!
+}
+
+input ItemInput {
+  description: String!
+  name: String!
+  price: Float!
+  productId: String!
+  quantity: Int!
+}
+
 type Mutation {
-  createUser(email: String!, name: String!): User!
+  addItemToCart(cartId: String!, item: ItemInput!): Cart!
+  addRole(permissions: [String!]!, role: String!): Boolean!
+  assignRole(role: String!, userId: String!): Boolean!
+  createSample(name: String!): Sample!
+  login(email: String!, password: String!): String!
+  register(email: String!, name: String!, password: String!, role: String! = "user"): User!
 }
 
 type Query {
+  getCarts: [Cart!]!
+  samples: [Sample!]!
   users: [User!]!
 }
 
+type Sample {
+  name: String!
+}
+
 type User {
   email: String!
   name: String!
+  password: String!
+  role: String!
 }
\ No newline at end of file
diff --git a/src/event/event-system.ts b/src/event/event-system.ts
deleted file mode 100644
index 9b9921c..0000000
--- a/src/event/event-system.ts
+++ /dev/null
@@ -1,24 +0,0 @@
-type EventHandler = (...args: any[]) => void;
-
-class EventSystem {
-  private events: { [key: string]: EventHandler[] } = {};
-
-  on(event: string, handler: EventHandler) {
-    if (!this.events[event]) {
-      this.events[event] = [];
-    }
-    this.events[event].push(handler);
-  }
-
-  off(event: string, handler: EventHandler) {
-    if (!this.events[event]) return;
-    this.events[event] = this.events[event].filter((h) => h !== handler);
-  }
-
-  emit(event: string, ...args: any[]) {
-    if (!this.events[event]) return;
-    this.events[event].forEach((handler) => handler(...args));
-  }
-}
-
-export const eventSystem = new EventSystem();
diff --git a/src/event/kafka-event-service.ts b/src/event/kafka-event-service.ts
new file mode 100644
index 0000000..799543f
--- /dev/null
+++ b/src/event/kafka-event-service.ts
@@ -0,0 +1,84 @@
+import { Kafka, type Producer, type Consumer } from 'kafkajs';
+import { Container, Service } from 'typedi';
+
+@Service()
+class KafkaEventService {
+  private kafka: Kafka;
+  private readonly producer: Producer;
+  private readonly consumer: Consumer;
+  private eventHandlers: { [key: string]: ((message: any) => void)[] } = {};
+  private topics: string[] = [];
+
+  constructor() {
+    this.kafka = new Kafka({
+      clientId: 'my-app',
+      brokers: [process.env.KAFKA_BROKER || 'localhost:29092'], // Use environment variable or default to localhost
+    });
+
+    this.producer = this.kafka.producer();
+    this.consumer = this.kafka.consumer({ groupId: 'sample-group' });
+
+    this.initialize();
+  }
+
+  private async initialize() {
+    await this.retryConnect(this.producer.connect.bind(this.producer), 5);
+    await this.retryConnect(this.consumer.connect.bind(this.consumer), 5);
+
+    for (const topic of this.topics) {
+      await this.consumer.subscribe({ topic, fromBeginning: true });
+    }
+
+    await this.consumer.run({
+      eachMessage: async ({ topic, partition, message }) => {
+        if (message.value) {
+          const handlers = this.eventHandlers[topic] || [];
+          const parsedMessage = JSON.parse(message.value.toString());
+          handlers.forEach(handler => handler(parsedMessage));
+        }
+      },
+    });
+  }
+
+  private async retryConnect(connectFn: () => Promise<void>, retries: number) {
+    for (let i = 0; i < retries; i++) {
+      try {
+        await connectFn();
+        return;
+      } catch (err) {
+        if (err instanceof Error) {
+          console.error(`Connection attempt ${i + 1} failed: ${err.message}`);
+        } else {
+          console.error(`Connection attempt ${i + 1} failed: ${err}`);
+        }
+        if (i === retries - 1) throw err;
+        await new Promise(res => setTimeout(res, 2000)); // Wait for 2 seconds before retrying
+      }
+    }
+  }
+
+  registerTopic(topic: string) {
+    if (!this.topics.includes(topic)) {
+      this.topics.push(topic);
+    }
+  }
+
+  async emitEvent(topic: string, message: any) {
+    await this.producer.send({
+      topic,
+      messages: [{ value: JSON.stringify(message) }],
+    });
+  }
+
+  subscribeToEvent(topic: string, handler: (message: any) => void) {
+    if (!this.eventHandlers[topic]) {
+      this.eventHandlers[topic] = [];
+    }
+    this.eventHandlers[topic].push(handler);
+  }
+}
+
+// Register the service with typedi
+Container.set(KafkaEventService, new KafkaEventService());
+
+export default KafkaEventService;
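For context, the new KafkaEventService is resolved through typedi and used via registerTopic, subscribeToEvent, and emitEvent. The sketch below is not part of the diff; the 'orderCreated' topic and the bootstrap function are hypothetical, and it assumes the Kafka broker from docker-compose (localhost:29092) is reachable.

// Illustrative usage sketch only; assumes this file sits next to kafka-event-service.ts.
import { Container } from 'typedi';
import KafkaEventService from './kafka-event-service';

async function kafkaUsageExample() {
  const events = Container.get(KafkaEventService);

  // Topics are recorded here and subscribed to when initialize() starts the consumer.
  events.registerTopic('orderCreated'); // hypothetical topic name

  // Handlers receive the JSON-parsed message value.
  events.subscribeToEvent('orderCreated', (order) => {
    console.log('orderCreated received:', order);
  });

  // The payload is JSON-stringified and sent through the shared producer.
  await events.emitEvent('orderCreated', { id: 'abc123', total: 42 });
}

kafkaUsageExample().catch(console.error);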
diff --git a/src/plugins/plugin-loader.ts b/src/plugins/plugin-loader.ts
index d51d3ad..0309543 100644
--- a/src/plugins/plugin-loader.ts
+++ b/src/plugins/plugin-loader.ts
@@ -2,7 +2,7 @@ import { Container } from 'typedi';
 import { GraphQLSchema } from 'graphql';
 import { buildSchema, type NonEmptyArray } from 'type-graphql';
 import { statSync } from 'fs';
-import { join } from 'path';
+import path, { join } from 'path';
 import logger from '../config/logger';
 import mongoose, { Schema } from 'mongoose';
 import { type GlobalContext } from './global-context';
@@ -101,6 +101,7 @@ class PluginLoader {
       return await buildSchema({
         resolvers: allResolvers as unknown as NonEmptyArray<Function>,
         container: Container,
+        emitSchemaFile: path.resolve(__dirname, '../../schema.graphql'),
       });
     } catch (error) {
       logger.error(`Error building schema: ${error}`, loggerCtx);
diff --git a/src/plugins/sample-plugin/index.ts b/src/plugins/sample-plugin/index.ts
index e560cf1..2407abc 100644
--- a/src/plugins/sample-plugin/index.ts
+++ b/src/plugins/sample-plugin/index.ts
@@ -1,16 +1,34 @@
+import { Container } from 'typedi';
+import { getModelForClass } from '@typegoose/typegoose';
+import { type GlobalContext } from '../global-context';
+import { Sample } from './models/sample';
 import { SampleResolver } from './resolvers/sample-resolver';
-import type { Plugin } from '../plugin-interface';
-import FunctionRegistry from '../function-registry';
+import { SampleService } from './services/sample-service';
+import KafkaEventService from '../../event/kafka-event-service';
 
-const samplePlugin: Plugin = {
-  name: 'Sample Plugin',
-  type: 'sample',
+export default {
+  name: 'sample-plugin',
+  type: 'demonstration',
   resolvers: [SampleResolver],
-  register: (container: any) => {
-    // Perform any additional registration if necessary
-    const functionRegistry = FunctionRegistry.getInstance();
-    functionRegistry.registerFunction('sample', () => console.log('Sample function called'));
+  register(container: typeof Container, context: GlobalContext) {
+    const SampleModel = getModelForClass(Sample);
+    context.models['Sample'] = { schema: SampleModel.schema, model: SampleModel };
+    container.set('SampleModel', SampleModel);
+
+    // Register SampleService and KafkaEventService with typedi
+    container.set(SampleService, new SampleService(Container.get(KafkaEventService)));
+
+    // Ensure SampleResolver is added to context resolvers
+    context.resolvers['Sample'] = [SampleResolver];
+
+    // Register the topic with KafkaEventService
+    const eventService = Container.get(KafkaEventService);
+    eventService.registerTopic('sampleCreated');
+
+    // Set up event handlers using the centralized event service
+    eventService.subscribeToEvent('sampleCreated', (sample) => {
+      console.log('Sample created:', sample);
+      // Additional handling logic here
+    });
   },
 };
-
-export default samplePlugin;
diff --git a/src/plugins/sample-plugin/entities/sample.ts b/src/plugins/sample-plugin/models/sample.ts
similarity index 100%
rename from src/plugins/sample-plugin/entities/sample.ts
rename to src/plugins/sample-plugin/models/sample.ts
diff --git a/src/plugins/sample-plugin/resolvers/sample-resolver.ts b/src/plugins/sample-plugin/resolvers/sample-resolver.ts
index bfead58..e183b27 100644
--- a/src/plugins/sample-plugin/resolvers/sample-resolver.ts
+++ b/src/plugins/sample-plugin/resolvers/sample-resolver.ts
@@ -1,14 +1,15 @@
 import { Resolver, Query, Mutation, Arg } from 'type-graphql';
 import { Inject, Service } from 'typedi';
-import { Sample } from '../entities/sample';
+import { Sample } from '../models/sample';
 import { SampleService } from '../services/sample-service';
 import FunctionRegistry from '../../function-registry';
 
 @Service()
 @Resolver()
 export class SampleResolver {
-  @Inject(() => SampleService)
-  private readonly sampleService!: SampleService;
+  constructor(
+    @Inject(() => SampleService) private readonly sampleService: SampleService
+  ) {}
 
   @Query(() => [Sample])
   async samples() {
@@ -23,7 +24,6 @@ export class SampleResolver {
     const functionRegistry = FunctionRegistry.getInstance();
     const userFunctions = functionRegistry.getFunctionsOfType('user');
     userFunctions.forEach((fn) => fn());
-
     return sample;
   }
 }
diff --git a/src/plugins/sample-plugin/services/sample-service.ts b/src/plugins/sample-plugin/services/sample-service.ts
index 424d4dc..f147dd1 100644
--- a/src/plugins/sample-plugin/services/sample-service.ts
+++ b/src/plugins/sample-plugin/services/sample-service.ts
@@ -1,14 +1,20 @@
+// src/services/sample-service.ts
 import { Service } from 'typedi';
-import { Sample, SampleModel } from '../entities/sample';
+import { Sample, SampleModel } from '../models/sample';
+import KafkaEventService from '../../../event/kafka-event-service';
 
 @Service()
 export class SampleService {
+  constructor(private eventService: KafkaEventService) {}
+
   async getAllSamples(): Promise<Sample[]> {
     return SampleModel.find().exec();
   }
 
   async createSample(name: string): Promise<Sample> {
     const sample = new SampleModel({ name });
-    return sample.save();
+    const savedSample = await sample.save();
+    await this.eventService.emitEvent('sampleCreated', savedSample); // Emit event using the centralized service
+    return savedSample;
   }
 }
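Taken together, these changes route sample creation through Kafka: SampleService.createSample persists the document and emits a sampleCreated event via KafkaEventService, which the handler registered in sample-plugin/index.ts then logs. A minimal end-to-end sketch follows, assuming MongoDB and the Kafka stack from docker-compose are running; the import paths and connection URI below are assumptions, not part of the diff.

// End-to-end sketch (not part of the diff): save a sample and let the Kafka
// round trip deliver it back to the 'sampleCreated' handler.
import mongoose from 'mongoose';
import { Container } from 'typedi';
import KafkaEventService from './src/event/kafka-event-service';
import { SampleService } from './src/plugins/sample-plugin/services/sample-service';

async function demo() {
  // Assumed connection string; adjust to the project's Mongo setup.
  await mongoose.connect(process.env.MONGO_URI ?? 'mongodb://localhost:27017/app');

  const events = Container.get(KafkaEventService);
  const service = new SampleService(events);

  // Persists the document and emits 'sampleCreated' through the shared producer.
  const sample = await service.createSample('demo-sample');
  console.log('Saved and published:', sample.name);

  await mongoose.disconnect();
}

demo().catch(console.error);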