From 835a81f0c953d5ab2a01611d277478d5b78aa8b0 Mon Sep 17 00:00:00 2001
From: Mathias Lundell <lundell.mathias@gmail.com>
Date: Fri, 27 Aug 2021 14:18:01 +0200
Subject: [PATCH] feat: expose AmqpConnectionManagerClass

The naming is strange because I don't want to introduce any
breaking change.

Note that jest complains about leaking resources due to:
https://github.com/squaremo/amqp.node/pull/584
---
 src/AmqpConnectionManager.ts      | 41 +++++++++++++++++++++++++-
 src/index.ts                      |  8 +++++-
 test/AmqpConnectionManagerTest.ts | 48 ++++++++++++++++++++++++-------
 test/fixtures.ts                  | 11 ++++++-
 test/importTest.ts                |  6 +++-
 test/integrationTest.ts           | 15 +++++++++-
 6 files changed, 113 insertions(+), 16 deletions(-)

diff --git a/src/AmqpConnectionManager.ts b/src/AmqpConnectionManager.ts
index 8eeaf01..cf6ec12 100644
--- a/src/AmqpConnectionManager.ts
+++ b/src/AmqpConnectionManager.ts
@@ -1,5 +1,5 @@
 import amqp, { Connection } from 'amqplib';
-import { EventEmitter } from 'events';
+import { EventEmitter, once } from 'events';
 import { TcpSocketConnectOpts } from 'net';
 import pb from 'promise-breaker';
 import { ConnectionOptions } from 'tls';
@@ -82,6 +82,8 @@ export interface IAmqpConnectionManager {
     addListener(event: 'unblocked', listener: () => void): this;
     addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;
 
+    listeners(eventName: string | symbol): any;
+
     on(event: string, listener: (...args: any[]) => void): this;
     on(event: 'connect', listener: ConnectListener): this;
     on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
@@ -108,6 +110,8 @@ export interface IAmqpConnectionManager {
 
     removeListener(event: string, listener: (...args: any[]) => void): this;
 
+    connect(options?: { timeout?: number }): Promise<void>;
+    reconnect(): void;
     createChannel(options?: CreateChannelOpts): ChannelWrapper;
     close(): Promise<void>;
     isConnected(): boolean;
@@ -196,8 +200,43 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
         this.setMaxListeners(0);
 
         this._findServers = options.findServers || (() => Promise.resolve(urls));
+    }
 
+    /**
+     * Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
+     * reconnect attempts will continue in the background.
+     * @param [options={}] -
+     * @param [options.timeout] - Time to wait for initial connect
+     */
+    async connect({ timeout }: { timeout?: number } = {}): Promise<void> {
         this._connect();
+
+        let reject: (reason?: any) => void;
+        const onDisconnect = ({ err }: { err: any }) => {
+            // Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
+            if (err.isOperational) {
+                reject(err);
+            }
+        };
+
+        try {
+            await Promise.race([
+                once(this, 'connect'),
+                new Promise((_resolve, innerReject) => {
+                    reject = innerReject;
+                    this.on('disconnect', onDisconnect);
+                }),
+                ...(timeout
+                    ? [
+                          wait(timeout).promise.then(() => {
+                              throw new Error('amqp-connection-manager: connect timeout');
+                          }),
+                      ]
+                    : []),
+            ]);
+        } finally {
+            this.removeListener('disconnect', onDisconnect);
+        }
     }
 
     // `options` here are any options that can be passed to ChannelWrapper.
diff --git a/src/index.ts b/src/index.ts
index 7e32324..01e7224 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -15,9 +15,15 @@ export function connect(
     urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
     options?: AmqpConnectionManagerOptions
 ): IAmqpConnectionManager {
-    return new AmqpConnectionManager(urls, options);
+    const conn = new AmqpConnectionManager(urls, options);
+    conn.connect().catch(() => {
+        /* noop */
+    });
+    return conn;
 }
 
+export { AmqpConnectionManager as AmqpConnectionManagerClass };
+
 const amqp = { connect };
 
 export default amqp;
diff --git a/test/AmqpConnectionManagerTest.ts b/test/AmqpConnectionManagerTest.ts
index 560a60c..47dd330 100644
--- a/test/AmqpConnectionManagerTest.ts
+++ b/test/AmqpConnectionManagerTest.ts
@@ -27,6 +27,7 @@ describe('AmqpConnectionManager', function () {
 
     it('should establish a connection to a broker', async () => {
         amqp = new AmqpConnectionManager('amqp://localhost');
+        amqp.connect();
         const [{ connection, url }] = await once(amqp, 'connect');
         expect(url, 'url').to.equal('amqp://localhost');
         expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -37,6 +38,7 @@ describe('AmqpConnectionManager', function () {
             protocol: 'amqp',
             hostname: 'localhost',
         });
+        amqp.connect();
         const [{ connection, url }] = await once(amqp, 'connect');
         expect(url, 'url').to.eql({
             protocol: 'amqp',
@@ -51,7 +53,7 @@ describe('AmqpConnectionManager', function () {
 
     it('should establish a url object based connection to a broker', async () => {
         amqp = new AmqpConnectionManager({ url: 'amqp://localhost' });
-
+        amqp.connect();
         const [{ connection, url }] = await once(amqp, 'connect');
         expect(url, 'url').to.equal('amqp://localhost');
         expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -59,6 +61,7 @@ describe('AmqpConnectionManager', function () {
 
     it('should close connection to a broker', async () => {
         amqp = new AmqpConnectionManager('amqp://localhost');
+        amqp.connect();
         const [{ connection, url }] = await once(amqp, 'connect');
         expect(url, 'url').to.equal('amqp://localhost');
         expect((connection as any).url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -77,6 +80,7 @@ describe('AmqpConnectionManager', function () {
         let connected = false;
 
         amqp = new AmqpConnectionManager('amqp://localhost');
+        amqp.connect();
         // Connection should not yet be established
         expect(amqp.connection, 'current connection').to.equal(undefined);
         // Connection should be pending though
@@ -123,6 +127,7 @@ describe('AmqpConnectionManager', function () {
                 return Promise.resolve('amqp://localhost');
             },
         });
+        amqp.connect();
         const [{ connection, url }] = await once(amqp, 'connect');
         expect(url, 'url').to.equal('amqp://localhost');
         expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -134,6 +139,7 @@ describe('AmqpConnectionManager', function () {
                 return Promise.resolve({ url: 'amqp://localhost' });
             },
         });
+        amqp.connect();
         const [{ connection, url }] = await once(amqp, 'connect');
         expect(url, 'url').to.equal('amqp://localhost');
         expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -145,13 +151,29 @@ describe('AmqpConnectionManager', function () {
                 return Promise.resolve(null);
             },
         });
+        amqp.connect();
         const [{ err }] = await once(amqp, 'disconnect');
         expect(err.message).to.contain('No servers found');
         return amqp?.close();
     });
 
+    it('should timeout connect', async () => {
+        jest.spyOn(origAmpq, 'connect').mockImplementation((): any => {
+            return promiseTools.delay(200);
+        });
+        amqp = new AmqpConnectionManager('amqp://localhost');
+        let err;
+        try {
+            await amqp.connect({ timeout: 0.1 });
+        } catch (error) {
+            err = error;
+        }
+        expect(err.message).to.equal('amqp-connection-manager: connect timeout');
+    });
+
     it('should work with a URL with a query', async () => {
         amqp = new AmqpConnectionManager('amqp://localhost?frameMax=0x1000');
+        amqp.connect();
         const [{ connection }] = await once(amqp, 'connect');
         expect(connection.url, 'connection.url').to.equal(
             'amqp://localhost?frameMax=0x1000&heartbeat=5'
@@ -171,6 +193,7 @@ describe('AmqpConnectionManager', function () {
         amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], {
             heartbeatIntervalInSeconds: 0.01,
         });
+        amqp.connect();
 
         let disconnectEventsSeen = 0;
         amqp.on('disconnect', function () {
@@ -196,10 +219,10 @@ describe('AmqpConnectionManager', function () {
         let disconnectsSeen = 0;
         amqp.on('disconnect', () => disconnectsSeen++);
 
-        await once(amqp, 'connect');
+        await amqp.connect();
         amqplib.kill();
 
-        await once(amqp, 'connect');
+        await amqp.connect();
         expect(disconnectsSeen).to.equal(1);
     });
 
@@ -211,7 +234,7 @@ describe('AmqpConnectionManager', function () {
         let disconnectsSeen = 0;
         amqp.on('disconnect', () => disconnectsSeen++);
 
-        await once(amqp, 'connect');
+        await amqp.connect();
 
         // Close the connection nicely
         amqplib.simulateRemoteClose();
@@ -222,6 +245,7 @@ describe('AmqpConnectionManager', function () {
 
     it('should know if it is connected or not', async () => {
         amqp = new AmqpConnectionManager('amqp://localhost');
+        amqp.connect();
 
         expect(amqp.isConnected()).to.be.false;
 
@@ -231,7 +255,7 @@ describe('AmqpConnectionManager', function () {
 
     it('should be able to manually reconnect', async () => {
         amqp = new AmqpConnectionManager('amqp://localhost');
-        await once(amqp, 'connect');
+        await amqp.connect();
 
         amqp.reconnect();
         await once(amqp, 'disconnect');
@@ -240,13 +264,14 @@ describe('AmqpConnectionManager', function () {
 
     it('should throw on manual reconnect after close', async () => {
         amqp = new AmqpConnectionManager('amqp://localhost');
-        await once(amqp, 'connect');
-        await amqp.close()
-        expect(amqp.reconnect).to.throw()
-    })
+        await amqp.connect();
+        await amqp.close();
+        expect(amqp.reconnect).to.throw();
+    });
 
     it('should create and clean up channel wrappers', async function () {
         amqp = new AmqpConnectionManager('amqp://localhost');
+        await amqp.connect();
         const channel = amqp.createChannel({ name: 'test-chan' });
 
         // Channel should register with connection manager
@@ -264,6 +289,7 @@ describe('AmqpConnectionManager', function () {
 
     it('should clean up channels on close', async function () {
         amqp = new AmqpConnectionManager('amqp://localhost');
+        await amqp.connect();
         amqp.createChannel({ name: 'test-chan' });
 
         // Channel should register with connection manager
@@ -286,7 +312,7 @@ describe('AmqpConnectionManager', function () {
 
         let connectsSeen = 0;
         amqp.on('connect', () => connectsSeen++);
-        await once(amqp, 'connect');
+        await amqp.connect();
 
         // Close the manager
         await amqp?.close();
@@ -308,7 +334,7 @@ describe('AmqpConnectionManager', function () {
 
         amqp.on('unblocked', () => unblockSeen++);
 
-        await once(amqp, 'connect');
+        await amqp.connect();
         // Close the connection nicely
         amqplib.simulateRemoteBlock();
         amqplib.simulateRemoteUnblock();
diff --git a/test/fixtures.ts b/test/fixtures.ts
index e9e4ef6..ff75607 100644
--- a/test/fixtures.ts
+++ b/test/fixtures.ts
@@ -2,7 +2,7 @@
 /* eslint-disable @typescript-eslint/explicit-module-boundary-types */
 
 import { Connection, Message, Options, Replies } from 'amqplib';
-import { EventEmitter } from 'events';
+import { EventEmitter, once } from 'events';
 import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
 import ChannelWrapper, { CreateChannelOpts } from '../src/ChannelWrapper';
 
@@ -194,6 +194,15 @@ export class FakeAmqpConnectionManager extends EventEmitter implements IAmqpConn
         return 0;
     }
 
+    async connect(): Promise<void> {
+        await Promise.all([once(this, 'connect'), this.simulateConnect()]);
+    }
+
+    reconnect(): void {
+        this.simulateDisconnect();
+        this.simulateConnect();
+    }
+
     isConnected() {
         return this.connected;
     }
diff --git a/test/importTest.ts b/test/importTest.ts
index 1a35464..d8f6035 100644
--- a/test/importTest.ts
+++ b/test/importTest.ts
@@ -1,9 +1,13 @@
 import { expect } from 'chai';
-import amqp from '../src';
+import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
 
 describe('import test', function () {
     it('should let you import as default (#51)', function () {
         expect(amqp).to.exist;
         expect(amqp.connect).to.exist;
     });
+
+    it('should let you import class', function () {
+        new AmqpConnectionManager('url');
+    });
 });
diff --git a/test/integrationTest.ts b/test/integrationTest.ts
index ed589b9..98c787d 100644
--- a/test/integrationTest.ts
+++ b/test/integrationTest.ts
@@ -3,7 +3,7 @@ import chai from 'chai';
 import chaiJest from 'chai-jest';
 import pEvent from 'p-event';
 import { defer, timeout } from 'promise-tools';
-import amqp from '../src';
+import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
 import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
 
 chai.use(chaiJest);
@@ -69,6 +69,19 @@ describe('Integration tests', () => {
         await timeout(pEvent(connection, 'connect'), 3000);
     });
 
+    // This test might cause jest to complain about leaked resources due to the bug described and fixed by:
+    // https://github.com/squaremo/amqp.node/pull/584
+    it('should throw on awaited connect with wrong password', async () => {
+        connection = new AmqpConnectionManager('amqp://guest:wrong@localhost');
+        let err;
+        try {
+            await connection.connect();
+        } catch (error) {
+            err = error;
+        }
+        expect(err.message).to.contain('ACCESS-REFUSED');
+    });
+
     it('send and receive messages', async () => {
         const queueName = 'testQueue1';
         const content = `hello world - ${Date.now()}`;