From af0e4001ff6dbaf07dbf6bc43dd0358ac98a1159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Fri, 1 Apr 2022 16:51:46 +0200 Subject: [PATCH 1/2] fix(microservices): multiple rmq client urls issue #9364 --- integration/microservices/e2e/sum-rmq.spec.ts | 7 +++++++ .../microservices/src/rmq/rmq.controller.ts | 20 +++++++++++++++++++ packages/microservices/client/client-rmq.ts | 17 ++++++++++++++-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/integration/microservices/e2e/sum-rmq.spec.ts b/integration/microservices/e2e/sum-rmq.spec.ts index f5f0e0a36ac..97ac90749a2 100644 --- a/integration/microservices/e2e/sum-rmq.spec.ts +++ b/integration/microservices/e2e/sum-rmq.spec.ts @@ -77,6 +77,13 @@ describe('RabbitMQ transport', () => { .expect(200, '15'); }); + it(`/POST (multiple-urls)`, () => { + return request(server) + .post('/multiple-urls') + .send([1, 2, 3, 4, 5]) + .expect(200, '15'); + }).timeout(10000); + it(`/POST (event notification)`, done => { request(server) .post('/notify') diff --git a/integration/microservices/src/rmq/rmq.controller.ts b/integration/microservices/src/rmq/rmq.controller.ts index 932bb46a9cc..07f8075e6b8 100644 --- a/integration/microservices/src/rmq/rmq.controller.ts +++ b/integration/microservices/src/rmq/rmq.controller.ts @@ -61,6 +61,21 @@ export class RMQController { .reduce(async (a, b) => (await a) && b); } + @Post('multiple-urls') + @HttpCode(200) + multipleUrls(@Body() data: number[]) { + const clientWithMultipleUrls = ClientProxyFactory.create({ + transport: Transport.RMQ, + options: { + urls: [`amqp://localhost:5671`, `amqp://localhost:5672`], + queue: 'test', + queueOptions: { durable: false }, + socketOptions: { noDelay: true }, + }, + }); + return clientWithMultipleUrls.send({ cmd: 'multiple-urls' }, data); + } + @Post('record-builder-duplex') @HttpCode(200) useRecordBuilderDuplex(@Body() data: Record) { @@ -109,6 +124,11 @@ export class RMQController { return from(data); } + @MessagePattern({ cmd: 'multiple-urls' }) + handleMultipleUrls(data: number[]): number { + return (data || []).reduce((a, b) => a + b); + } + @Post('notify') async sendNotification(): Promise { return this.client.emit('notification', true); diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 27d52679c82..385bf5a353a 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -3,7 +3,7 @@ import { loadPackage } from '@nestjs/common/utils/load-package.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; import { EventEmitter } from 'events'; import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs'; -import { first, map, share, switchMap } from 'rxjs/operators'; +import { first, map, retryWhen, scan, share, switchMap } from 'rxjs/operators'; import { CONNECT_FAILED_EVENT, DISCONNECTED_RMQ_MESSAGE, @@ -118,7 +118,20 @@ export class ClientRMQ extends ClientProxy { }), ); const disconnect$ = eventToError(DISCONNECT_EVENT); - const connectFailed$ = eventToError(CONNECT_FAILED_EVENT); + + const urls = this.getOptionsProp(this.options, 'urls'); + const connectFailed$ = eventToError(CONNECT_FAILED_EVENT).pipe( + retryWhen(e => + e.pipe( + scan((errorCount, error: any) => { + if (urls.indexOf(error.url) >= urls.length - 1) { + throw error; + } + return errorCount + 1; + }, 0), + ), + ), + ); return merge(source$, disconnect$, connectFailed$).pipe(first()); } From e9718565557ba2ac641dacb730ce4c65eecc03e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Fri, 1 Apr 2022 16:53:53 +0200 Subject: [PATCH 2/2] fix(microservices): add default urls value --- packages/microservices/client/client-rmq.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 385bf5a353a..a8b1e64fd9f 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -119,7 +119,7 @@ export class ClientRMQ extends ClientProxy { ); const disconnect$ = eventToError(DISCONNECT_EVENT); - const urls = this.getOptionsProp(this.options, 'urls'); + const urls = this.getOptionsProp(this.options, 'urls', []); const connectFailed$ = eventToError(CONNECT_FAILED_EVENT).pipe( retryWhen(e => e.pipe(