
Commit

Merge pull request #18 from Phoenix-Commerce/sample-plugin
feat: make sample plugin show off (and also test) some of the other features
brent-hoover authored Jun 16, 2024
2 parents 45bcb55 + 0d342b0 commit fd59d5d
Showing 11 changed files with 216 additions and 43 deletions.
Binary file modified: bun.lockb (contents not shown)
54 changes: 54 additions & 0 deletions docker-compose.yml
@@ -22,8 +22,62 @@ services:
volumes:
- redis-data:/data

zookeeper:
image: bitnami/zookeeper:latest
container_name: zookeeper
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ports:
- "2181:2181"
volumes:
- zookeeper-data:/bitnami/zookeeper
networks:
- app-network

kafka:
image: bitnami/kafka:latest
container_name: kafka
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- ALLOW_PLAINTEXT_LISTENER=yes
ports:
- "9092:9092"
- "29092:29092"
volumes:
- kafka-data:/bitnami/kafka
depends_on:
- zookeeper
networks:
- app-network

akhq:
image: tchiotludo/akhq:latest
container_name: akhq
ports:
- "8080:8080"
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:9092"
networks:
- app-network

networks:
app-network:
driver: bridge

volumes:
mongo-data:
driver: local
redis-data:
driver: local
zookeeper-data:
driver: local
kafka-data:
driver: local
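
For context (not part of this diff): the services added above give the app a local Kafka stack — Zookeeper, a single Kafka broker with two listeners (kafka:9092 for containers on app-network, localhost:29092 advertised to the host), and AKHQ as a web UI on port 8080 for inspecting topics. Assuming those compose defaults, a small kafkajs admin script is one way to confirm the broker is reachable from the host:

// check-kafka.ts — connectivity check against the compose stack (illustrative only)
import { Kafka } from 'kafkajs';

async function main() {
  // localhost:29092 matches the PLAINTEXT_HOST advertised listener above.
  const kafka = new Kafka({ clientId: 'compose-check', brokers: ['localhost:29092'] });
  const admin = kafka.admin();
  await admin.connect();
  const topics = await admin.listTopics();
  console.log('Broker reachable; topics:', topics);
  await admin.disconnect();
}

main().catch((err) => {
  console.error('Kafka connectivity check failed:', err);
  process.exit(1);
});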
1 change: 1 addition & 0 deletions package.json
@@ -24,6 +24,7 @@
"express": "^4.19.2",
"graphql": "^16.8.1",
"jsonwebtoken": "^9.0.2",
"kafkajs": "^2.2.4",
"mongoose": "^8.4.1",
"reflect-metadata": "^0.2.2",
"type-graphql": "^2.0.0-rc.1",
35 changes: 34 additions & 1 deletion schema.graphql
@@ -3,15 +3,48 @@
# !!! DO NOT MODIFY THIS FILE BY YOURSELF !!!
# -----------------------------------------------

type Cart {
items: [Item!]!
}

type Item {
description: String!
name: String!
price: Float!
productId: String!
quantity: Int!
}

input ItemInput {
description: String!
name: String!
price: Float!
productId: String!
quantity: Int!
}

type Mutation {
createUser(email: String!, name: String!): User!
addItemToCart(cartId: String!, item: ItemInput!): Cart!
addRole(permissions: [String!]!, role: String!): Boolean!
assignRole(role: String!, userId: String!): Boolean!
createSample(name: String!): Sample!
login(email: String!, password: String!): String!
register(email: String!, name: String!, password: String!, role: String! = "user"): User!
}

type Query {
getCarts: [Cart!]!
samples: [Sample!]!
users: [User!]!
}

type Sample {
name: String!
}

type User {
email: String!
name: String!
password: String!
role: String!
}
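
The schema above is regenerated output (see the emitSchemaFile change in plugin-loader.ts below) and now exposes the Cart/Item types alongside the Sample query and mutation. A minimal smoke test against a running server might look like the following sketch — the endpoint URL is an assumption, not something this diff specifies:

// query-samples.ts — exercises the regenerated schema (illustrative only)
const endpoint = 'http://localhost:3000/graphql'; // assumed endpoint; adjust to the server's actual port/path

async function gql(query: string, variables?: Record<string, unknown>) {
  const res = await fetch(endpoint, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query, variables }),
  });
  return res.json();
}

async function main() {
  // createSample also emits the 'sampleCreated' Kafka event (see kafka-event-service.ts below).
  console.log(await gql('mutation($name: String!) { createSample(name: $name) { name } }', { name: 'first-sample' }));
  console.log(await gql('query { samples { name } }'));
}

main().catch(console.error);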
24 changes: 0 additions & 24 deletions src/event/event-system.ts

This file was deleted.

84 changes: 84 additions & 0 deletions src/event/kafka-event-service.ts
@@ -0,0 +1,84 @@
import { Kafka, type Producer, type Consumer } from 'kafkajs';
import { Container, Service } from 'typedi';

@Service()
class KafkaEventService {
private kafka: Kafka;
private readonly producer: Producer;
private readonly consumer: Consumer;
private eventHandlers: { [key: string]: ((message: any) => void)[] } = {};
private topics: string[] = [];

constructor() {
this.kafka = new Kafka({
clientId: 'my-app',
brokers: [process.env.KAFKA_BROKER || 'localhost:29092'], // Use environment variable or default to localhost
});

this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({ groupId: 'sample-group' });

this.initialize();
}

private async initialize() {
await this.retryConnect(this.producer.connect.bind(this.producer), 5);
await this.retryConnect(this.consumer.connect.bind(this.consumer), 5);

for (const topic of this.topics) {
await this.consumer.subscribe({ topic, fromBeginning: true });
}

await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (message.value) {
const handlers = this.eventHandlers[topic] || [];
const parsedMessage = JSON.parse(message.value.toString());
handlers.forEach(handler => handler(parsedMessage));
}
},
});
}

private async retryConnect(connectFn: () => Promise<void>, retries: number) {
for (let i = 0; i < retries; i++) {
try {
await connectFn();
return;
} catch (err) {
if (err instanceof Error) {
console.error(`Connection attempt ${i + 1} failed: ${err.message}`);
} else {
console.error(`Connection attempt ${i + 1} failed: ${err}`);
}
if (i === retries - 1) throw err;
await new Promise(res => setTimeout(res, 2000)); // Wait for 2 seconds before retrying
}
}
}

registerTopic(topic: string) {
if (!this.topics.includes(topic)) {
this.topics.push(topic);
}
}

async emitEvent(topic: string, message: any) {
await this.producer.send({
topic,
messages: [{ value: JSON.stringify(message) }],
});
}

subscribeToEvent(topic: string, handler: (message: any) => void) {
if (!this.eventHandlers[topic]) {
this.eventHandlers[topic] = [];
}
this.eventHandlers[topic].push(handler);
}
}

// Register the service with typedi
Container.set(KafkaEventService, new KafkaEventService());

export default KafkaEventService;
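
For context, this is how another plugin could use the service through typedi — an illustration based on the API above, not code from this commit; the 'orderPlaced' topic and the file path are hypothetical:

// some-other-plugin/index.ts (sketch) — import path relative to a plugin directory, as in the sample plugin
import { Container } from 'typedi';
import KafkaEventService from '../../event/kafka-event-service';

const events = Container.get(KafkaEventService);

// Topics are picked up when the consumer subscribes during the service's own initialization.
events.registerTopic('orderPlaced');

// Handlers receive the JSON-parsed message value for their topic.
events.subscribeToEvent('orderPlaced', (order) => {
  console.log('Order placed:', order);
});

// Publishing: the payload is JSON-stringified and sent to the topic.
async function placeOrder() {
  await events.emitEvent('orderPlaced', { orderId: 'abc-123', total: 42 });
}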
3 changes: 2 additions & 1 deletion src/plugins/plugin-loader.ts
@@ -2,7 +2,7 @@ import { Container } from 'typedi';
import { GraphQLSchema } from 'graphql';
import { buildSchema, type NonEmptyArray } from 'type-graphql';
import { statSync } from 'fs';
import { join } from 'path';
import path, { join } from 'path';
import logger from '../config/logger';
import mongoose, { Schema } from 'mongoose';
import { type GlobalContext } from './global-context';
@@ -101,6 +101,7 @@ class PluginLoader {
return await buildSchema({
resolvers: allResolvers as unknown as NonEmptyArray<Function>,
container: Container,
emitSchemaFile: path.resolve(__dirname, '../../schema.graphql'),
});
} catch (error) {
logger.error(`Error building schema: ${error}`, loggerCtx);
40 changes: 29 additions & 11 deletions src/plugins/sample-plugin/index.ts
@@ -1,16 +1,34 @@
import { Container } from 'typedi';
import { getModelForClass } from '@typegoose/typegoose';
import { type GlobalContext } from '../global-context';
import { Sample } from './models/sample';
import { SampleResolver } from './resolvers/sample-resolver';
import type { Plugin } from '../plugin-interface';
import FunctionRegistry from '../function-registry';
import { SampleService } from './services/sample-service';
import KafkaEventService from '../../event/kafka-event-service';

const samplePlugin: Plugin = {
name: 'Sample Plugin',
type: 'sample',
export default {
name: 'sample-plugin',
type: 'demonstration',
resolvers: [SampleResolver],
register: (container: any) => {
// Perform any additional registration if necessary
const functionRegistry = FunctionRegistry.getInstance();
functionRegistry.registerFunction('sample', () => console.log('Sample function called'));
register(container: typeof Container, context: GlobalContext) {
const SampleModel = getModelForClass(Sample);
context.models['Sample'] = { schema: SampleModel.schema, model: SampleModel };
container.set('SampleModel', SampleModel);

// Register SampleService and KafkaEventService with typedi
container.set(SampleService, new SampleService(Container.get(KafkaEventService)));

// Ensure SampleResolver is added to context resolvers
context.resolvers['Sample'] = [SampleResolver];

// Register the topic with KafkaEventService
const eventService = Container.get(KafkaEventService);
eventService.registerTopic('sampleCreated');

// Set up event handlers using the centralized event service
eventService.subscribeToEvent('sampleCreated', (sample) => {
console.log('Sample created:', sample);
// Additional handling logic here
});
},
};

export default samplePlugin;
File renamed without changes.
8 changes: 4 additions & 4 deletions src/plugins/sample-plugin/resolvers/sample-resolver.ts
@@ -1,14 +1,15 @@
import { Resolver, Query, Mutation, Arg } from 'type-graphql';
import { Inject, Service } from 'typedi';
import { Sample } from '../entities/sample';
import { Sample } from '../models/sample';
import { SampleService } from '../services/sample-service';
import FunctionRegistry from '../../function-registry';

@Service()
@Resolver()
export class SampleResolver {
@Inject(() => SampleService)
private readonly sampleService!: SampleService;
constructor(
@Inject(() => SampleService) private readonly sampleService: SampleService
) {}

@Query(() => [Sample])
async samples() {
@@ -23,7 +24,6 @@ export class SampleResolver {
const functionRegistry = FunctionRegistry.getInstance();
const userFunctions = functionRegistry.getFunctionsOfType('user');
userFunctions.forEach((fn) => fn());

return sample;
}
}
10 changes: 8 additions & 2 deletions src/plugins/sample-plugin/services/sample-service.ts
@@ -1,14 +1,20 @@
// src/services/sample-service.ts
import { Service } from 'typedi';
import { Sample, SampleModel } from '../entities/sample';
import { Sample, SampleModel } from '../models/sample';
import KafkaEventService from '../../../event/kafka-event-service';

@Service()
export class SampleService {
constructor(private eventService: KafkaEventService) {}

async getAllSamples(): Promise<Sample[]> {
return SampleModel.find().exec();
}

async createSample(name: string): Promise<Sample> {
const sample = new SampleModel({ name });
return sample.save();
const savedSample = await sample.save();
await this.eventService.emitEvent('sampleCreated', savedSample); // Emit event using the centralized service
return savedSample;
}
}
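
Taken together, createSample now persists the document and then emits it on the 'sampleCreated' topic, which the sample plugin's handler logs. A test for that behavior could stub the event service rather than talk to Kafka — a sketch assuming Bun's built-in test runner and a reachable (or mocked) MongoDB for save(); none of this is part of the commit:

// sample-service.test.ts — illustrative only
import { expect, it } from 'bun:test';
import type KafkaEventService from '../../../event/kafka-event-service';
import { SampleService } from './sample-service';

it('emits sampleCreated after saving', async () => {
  const emitted: Array<{ topic: string; message: unknown }> = [];
  // Minimal stand-in exposing the same emitEvent shape as KafkaEventService.
  const fakeEvents = {
    emitEvent: async (topic: string, message: unknown) => {
      emitted.push({ topic, message });
    },
  };

  const service = new SampleService(fakeEvents as unknown as KafkaEventService);
  const sample = await service.createSample('test-sample'); // requires SampleModel.save() to succeed

  expect(emitted[0]?.topic).toBe('sampleCreated');
  expect(emitted[0]?.message).toBe(sample);
});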
