Skip to content

Commit

Permalink
Merge pull request #9425 from nestjs/fix/multiple-rmq-client-urls
Browse files Browse the repository at this point in the history
fix(microservices): multiple rmq client urls issue #9364
  • Loading branch information
kamilmysliwiec authored Apr 7, 2022
2 parents 53d88e5 + e971856 commit 45872ae
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
7 changes: 7 additions & 0 deletions integration/microservices/e2e/sum-rmq.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ describe('RabbitMQ transport', () => {
.expect(200, '15');
});

it(`/POST (multiple-urls)`, () => {
return request(server)
.post('/multiple-urls')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
}).timeout(10000);

it(`/POST (event notification)`, done => {
request(server)
.post('/notify')
Expand Down
20 changes: 20 additions & 0 deletions integration/microservices/src/rmq/rmq.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ export class RMQController {
.reduce(async (a, b) => (await a) && b);
}

@Post('multiple-urls')
@HttpCode(200)
multipleUrls(@Body() data: number[]) {
const clientWithMultipleUrls = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5671`, `amqp://localhost:5672`],
queue: 'test',
queueOptions: { durable: false },
socketOptions: { noDelay: true },
},
});
return clientWithMultipleUrls.send<number>({ cmd: 'multiple-urls' }, data);
}

@Post('record-builder-duplex')
@HttpCode(200)
useRecordBuilderDuplex(@Body() data: Record<string, any>) {
Expand Down Expand Up @@ -109,6 +124,11 @@ export class RMQController {
return from(data);
}

@MessagePattern({ cmd: 'multiple-urls' })
handleMultipleUrls(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}

@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit<number>('notification', true);
Expand Down
17 changes: 15 additions & 2 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { EventEmitter } from 'events';
import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs';
import { first, map, share, switchMap } from 'rxjs/operators';
import { first, map, retryWhen, scan, share, switchMap } from 'rxjs/operators';
import {
CONNECT_FAILED_EVENT,
DISCONNECTED_RMQ_MESSAGE,
Expand Down Expand Up @@ -118,7 +118,20 @@ export class ClientRMQ extends ClientProxy {
}),
);
const disconnect$ = eventToError(DISCONNECT_EVENT);
const connectFailed$ = eventToError(CONNECT_FAILED_EVENT);

const urls = this.getOptionsProp(this.options, 'urls', []);
const connectFailed$ = eventToError(CONNECT_FAILED_EVENT).pipe(
retryWhen(e =>
e.pipe(
scan((errorCount, error: any) => {
if (urls.indexOf(error.url) >= urls.length - 1) {
throw error;
}
return errorCount + 1;
}, 0),
),
),
);
return merge(source$, disconnect$, connectFailed$).pipe(first());
}

Expand Down

0 comments on commit 45872ae

Please sign in to comment.