From 25ad5b4b8038177ca986734e968d225796e50c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Mon, 25 Nov 2024 14:11:38 +0100 Subject: [PATCH] refactor(core): tiny improvements --- .../external-svc/external-svc.controller.ts | 2 +- packages/core/injector/module.ts | 11 ++-- sample/03-microservices/src/main.ts | 63 +++++++++++++++++-- .../src/math/math.controller.ts | 16 +++-- .../03-microservices/src/math/math.module.ts | 12 +++- 5 files changed, 87 insertions(+), 17 deletions(-) diff --git a/integration/inspector/src/external-svc/external-svc.controller.ts b/integration/inspector/src/external-svc/external-svc.controller.ts index f2233620f6d..e1e9e224748 100644 --- a/integration/inspector/src/external-svc/external-svc.controller.ts +++ b/integration/inspector/src/external-svc/external-svc.controller.ts @@ -1,8 +1,8 @@ import { Controller } from '@nestjs/common'; import { MessagePattern, Payload } from '@nestjs/microservices'; -import { ExternalSvcService } from './external-svc.service'; import { CreateExternalSvcDto } from './dto/create-external-svc.dto'; import { UpdateExternalSvcDto } from './dto/update-external-svc.dto'; +import { ExternalSvcService } from './external-svc.service'; @Controller() export class ExternalSvcController { diff --git a/packages/core/injector/module.ts b/packages/core/injector/module.ts index 5396cd2e422..7063aa55294 100644 --- a/packages/core/injector/module.ts +++ b/packages/core/injector/module.ts @@ -254,7 +254,8 @@ export class Module { return this.addCustomProvider(provider, this._providers, enhancerSubtype); } - if (this.isTransientProvider(provider) && this.getProviderByKey(provider)) { + const isAlreadyDeclared = this._providers.has(provider); + if (this.isTransientProvider(provider) && isAlreadyDeclared) { return provider; } @@ -297,10 +298,6 @@ export class Module { ); } - private isTransientProvider(provider: Type): boolean { - return getClassScope(provider) === Scope.TRANSIENT; - } - public addCustomProvider( provider: | ClassProvider @@ -665,4 +662,8 @@ export class Module { const key = this.name?.toString() ?? this.token?.toString(); return key ? UuidFactory.get(`${prefix}_${key}`) : randomStringGenerator(); } + + private isTransientProvider(provider: Type): boolean { + return getClassScope(provider) === Scope.TRANSIENT; + } } diff --git a/sample/03-microservices/src/main.ts b/sample/03-microservices/src/main.ts index 45fedcdb4c5..e846b5416cb 100644 --- a/sample/03-microservices/src/main.ts +++ b/sample/03-microservices/src/main.ts @@ -1,5 +1,6 @@ import { NestFactory } from '@nestjs/core'; -import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { Transport } from '@nestjs/microservices'; +import { Kafka } from '@nestjs/microservices/external/kafka.interface'; import { AppModule } from './app.module'; async function bootstrap() { @@ -15,13 +16,65 @@ async function bootstrap() { * */ const app = await NestFactory.create(AppModule); - app.connectMicroservice({ - transport: Transport.TCP, - options: { retryAttempts: 5, retryDelay: 3000 }, + const msvc = app.connectMicroservice({ + transport: Transport.KAFKA, + options: { + client: { + brokers: ['localhost:9092'], + }, + consumer: { + groupId: 'my-kafka-consumer', + }, + }, }); await app.startAllMicroservices(); - await app.listen(3001); + await app.listen(Math.floor(Math.random() * 1000) + 3000); console.log(`Application is running on: ${await app.getUrl()}`); + + const kafka = msvc.unwrap(); + const admin = kafka.admin(); + + const topicName = 'math'; + try { + const topicExists = await admin.fetchTopicMetadata({ + topics: [topicName], + }); + + console.log( + `Topic "${topicName}" already exists with ${topicExists.topics[0].partitions.length} partitions.`, + ); + + // Update "math.reply" topic to have 2 partitions if it has only 1 partition + const replyTopicName = 'math.reply'; + const replyTopicExists = await admin.fetchTopicMetadata({ + topics: [replyTopicName], + }); + if (replyTopicExists.topics[0].partitions.length === 1) { + await admin.createPartitions({ + topicPartitions: [ + { + topic: replyTopicName, + count: 2, + }, + ], + }); + console.log(`Topic "${replyTopicName}" updated with 2 partitions.`); + } + } catch (error) { + if (error.message.includes('does not host this topic-partition')) { + await admin.createTopics({ + topics: [ + { + topic: topicName, + numPartitions: 2, + }, + ], + }); + console.log(`Topic "${topicName}" created with 2 partitions.`); + } else { + console.error('Error creating topic:', error.message); + } + } } bootstrap(); diff --git a/sample/03-microservices/src/math/math.controller.ts b/sample/03-microservices/src/math/math.controller.ts index d1859ef46d1..1dfee4ae261 100644 --- a/sample/03-microservices/src/math/math.controller.ts +++ b/sample/03-microservices/src/math/math.controller.ts @@ -1,21 +1,27 @@ import { Controller, Get, Inject } from '@nestjs/common'; -import { ClientProxy, MessagePattern } from '@nestjs/microservices'; +import { ClientKafka, MessagePattern, Payload } from '@nestjs/microservices'; import { Observable } from 'rxjs'; import { MATH_SERVICE } from './math.constants'; @Controller() export class MathController { - constructor(@Inject(MATH_SERVICE) private readonly client: ClientProxy) {} + constructor(@Inject(MATH_SERVICE) private readonly client: ClientKafka) {} + + async onModuleInit() { + this.client.subscribeToResponseOf('sum'); + this.client.subscribeToResponseOf('math'); + await this.client.connect(); + } @Get() execute(): Observable { - const pattern = { cmd: 'sum' }; + const pattern = 'sum'; const data = [1, 2, 3, 4, 5]; return this.client.send(pattern, data); } - @MessagePattern({ cmd: 'sum' }) - sum(data: number[]): number { + @MessagePattern('sum') + sum(@Payload() data: number[]): number { return (data || []).reduce((a, b) => a + b); } } diff --git a/sample/03-microservices/src/math/math.module.ts b/sample/03-microservices/src/math/math.module.ts index d1f856e1f55..0b32f8b48ec 100644 --- a/sample/03-microservices/src/math/math.module.ts +++ b/sample/03-microservices/src/math/math.module.ts @@ -5,7 +5,17 @@ import { MathController } from './math.controller'; @Module({ imports: [ - ClientsModule.register([{ name: MATH_SERVICE, transport: Transport.TCP }]), + ClientsModule.register([ + { + name: MATH_SERVICE, + transport: Transport.KAFKA, + options: { + client: { + brokers: ['localhost:9092'], + }, + }, + }, + ]), ], controllers: [MathController], })