Skip to content

Commit

Permalink
refactor(core): tiny improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed Nov 25, 2024
1 parent 3324703 commit 25ad5b4
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { ExternalSvcService } from './external-svc.service';
import { CreateExternalSvcDto } from './dto/create-external-svc.dto';
import { UpdateExternalSvcDto } from './dto/update-external-svc.dto';
import { ExternalSvcService } from './external-svc.service';

@Controller()
export class ExternalSvcController {
Expand Down
11 changes: 6 additions & 5 deletions packages/core/injector/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ export class Module {
return this.addCustomProvider(provider, this._providers, enhancerSubtype);
}

if (this.isTransientProvider(provider) && this.getProviderByKey(provider)) {
const isAlreadyDeclared = this._providers.has(provider);
if (this.isTransientProvider(provider) && isAlreadyDeclared) {
return provider;
}

Expand Down Expand Up @@ -297,10 +298,6 @@ export class Module {
);
}

private isTransientProvider(provider: Type<any>): boolean {
return getClassScope(provider) === Scope.TRANSIENT;
}

public addCustomProvider(
provider:
| ClassProvider
Expand Down Expand Up @@ -665,4 +662,8 @@ export class Module {
const key = this.name?.toString() ?? this.token?.toString();
return key ? UuidFactory.get(`${prefix}_${key}`) : randomStringGenerator();
}

private isTransientProvider(provider: Type<any>): boolean {
return getClassScope(provider) === Scope.TRANSIENT;
}
}
63 changes: 58 additions & 5 deletions sample/03-microservices/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Transport } from '@nestjs/microservices';
import { Kafka } from '@nestjs/microservices/external/kafka.interface';
import { AppModule } from './app.module';

async function bootstrap() {
Expand All @@ -15,13 +16,65 @@ async function bootstrap() {
*
*/
const app = await NestFactory.create(AppModule);
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.TCP,
options: { retryAttempts: 5, retryDelay: 3000 },
const msvc = app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-kafka-consumer',
},
},
});

await app.startAllMicroservices();
await app.listen(3001);
await app.listen(Math.floor(Math.random() * 1000) + 3000);
console.log(`Application is running on: ${await app.getUrl()}`);

const kafka = msvc.unwrap<Kafka>();
const admin = kafka.admin();

const topicName = 'math';
try {
const topicExists = await admin.fetchTopicMetadata({
topics: [topicName],
});

console.log(
`Topic "${topicName}" already exists with ${topicExists.topics[0].partitions.length} partitions.`,
);

// Update "math.reply" topic to have 2 partitions if it has only 1 partition
const replyTopicName = 'math.reply';
const replyTopicExists = await admin.fetchTopicMetadata({
topics: [replyTopicName],
});
if (replyTopicExists.topics[0].partitions.length === 1) {
await admin.createPartitions({
topicPartitions: [
{
topic: replyTopicName,
count: 2,
},
],
});
console.log(`Topic "${replyTopicName}" updated with 2 partitions.`);
}
} catch (error) {
if (error.message.includes('does not host this topic-partition')) {
await admin.createTopics({
topics: [
{
topic: topicName,
numPartitions: 2,
},
],
});
console.log(`Topic "${topicName}" created with 2 partitions.`);
} else {
console.error('Error creating topic:', error.message);
}
}
}
bootstrap();
16 changes: 11 additions & 5 deletions sample/03-microservices/src/math/math.controller.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy, MessagePattern } from '@nestjs/microservices';
import { ClientKafka, MessagePattern, Payload } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { MATH_SERVICE } from './math.constants';

@Controller()
export class MathController {
constructor(@Inject(MATH_SERVICE) private readonly client: ClientProxy) {}
constructor(@Inject(MATH_SERVICE) private readonly client: ClientKafka) {}

async onModuleInit() {
this.client.subscribeToResponseOf('sum');
this.client.subscribeToResponseOf('math');
await this.client.connect();
}

@Get()
execute(): Observable<number> {
const pattern = { cmd: 'sum' };
const pattern = 'sum';
const data = [1, 2, 3, 4, 5];
return this.client.send<number>(pattern, data);
}

@MessagePattern({ cmd: 'sum' })
sum(data: number[]): number {
@MessagePattern('sum')
sum(@Payload() data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
}
12 changes: 11 additions & 1 deletion sample/03-microservices/src/math/math.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import { MathController } from './math.controller';

@Module({
imports: [
ClientsModule.register([{ name: MATH_SERVICE, transport: Transport.TCP }]),
ClientsModule.register([
{
name: MATH_SERVICE,
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
},
]),
],
controllers: [MathController],
})
Expand Down

0 comments on commit 25ad5b4

Please sign in to comment.