From a138be8c0e043c3af005da1e846922a99bc24f32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 4 Feb 2021 09:50:54 +0100 Subject: [PATCH 1/6] fix(websockets): support ws servers running on different paths --- .../server-and-event-streams-factory.ts} | 6 +- packages/websockets/interfaces/index.ts | 2 +- ...erver-and-event-streams-host.interface.ts} | 2 +- packages/websockets/package.json | 1 + packages/websockets/socket-module.ts | 2 +- packages/websockets/socket-server-provider.ts | 77 +++++++++++-------- packages/websockets/sockets-container.ts | 39 ++++++---- packages/websockets/test/container.spec.ts | 35 +++++---- .../server-and-event-streams-factory.spec.ts} | 6 +- .../test/socket-server-provider.spec.ts | 41 ++++++++-- packages/websockets/web-sockets-controller.ts | 4 +- 11 files changed, 136 insertions(+), 79 deletions(-) rename packages/websockets/{socket-events-host-factory.ts => factories/server-and-event-streams-factory.ts} (55%) rename packages/websockets/interfaces/{socket-events-host.interface.ts => server-and-event-streams-host.interface.ts} (72%) rename packages/websockets/test/{socket-events-host-factory.spec.ts => factories/server-and-event-streams-factory.spec.ts} (72%) diff --git a/packages/websockets/socket-events-host-factory.ts b/packages/websockets/factories/server-and-event-streams-factory.ts similarity index 55% rename from packages/websockets/socket-events-host-factory.ts rename to packages/websockets/factories/server-and-event-streams-factory.ts index 1c7d1946da7..fc7038e5280 100644 --- a/packages/websockets/socket-events-host-factory.ts +++ b/packages/websockets/factories/server-and-event-streams-factory.ts @@ -1,8 +1,8 @@ import { ReplaySubject, Subject } from 'rxjs'; -import { SocketEventsHost } from './interfaces/socket-events-host.interface'; +import { ServerAndEventStreamsHost } from '../interfaces/server-and-event-streams-host.interface'; -export class SocketEventsHostFactory { - public static create(server: T): SocketEventsHost { +export class ServerAndEventStreamsFactory { + public static create(server: T): ServerAndEventStreamsHost { const init = new ReplaySubject(); init.next(server); diff --git a/packages/websockets/interfaces/index.ts b/packages/websockets/interfaces/index.ts index 827282853b7..80428a5fb6d 100644 --- a/packages/websockets/interfaces/index.ts +++ b/packages/websockets/interfaces/index.ts @@ -1,5 +1,5 @@ export * from './gateway-metadata.interface'; export * from './hooks'; -export * from './socket-events-host.interface'; +export * from './server-and-event-streams-host.interface'; export * from './web-socket-server.interface'; export * from './ws-response.interface'; diff --git a/packages/websockets/interfaces/socket-events-host.interface.ts b/packages/websockets/interfaces/server-and-event-streams-host.interface.ts similarity index 72% rename from packages/websockets/interfaces/socket-events-host.interface.ts rename to packages/websockets/interfaces/server-and-event-streams-host.interface.ts index ba36ddfb5af..4b04a81e5dc 100644 --- a/packages/websockets/interfaces/socket-events-host.interface.ts +++ b/packages/websockets/interfaces/server-and-event-streams-host.interface.ts @@ -1,6 +1,6 @@ import { ReplaySubject, Subject } from 'rxjs'; -export interface SocketEventsHost { +export interface ServerAndEventStreamsHost { server: T; init: ReplaySubject; connection: Subject; diff --git a/packages/websockets/package.json b/packages/websockets/package.json index 04c4180cd2d..07a8a129f96 100644 --- a/packages/websockets/package.json +++ b/packages/websockets/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "iterare": "1.2.1", + "object-hash": "2.1.1", "tslib": "2.1.0" }, "devDependencies": { diff --git a/packages/websockets/socket-module.ts b/packages/websockets/socket-module.ts index 3a3626a2338..532c5180e63 100644 --- a/packages/websockets/socket-module.ts +++ b/packages/websockets/socket-module.ts @@ -86,7 +86,7 @@ export class SocketModule { if (!adapter) { return; } - const servers = this.socketsContainer.getAllSocketEventHosts(); + const servers = this.socketsContainer.getAll(); await Promise.all( iterate(servers.values()) .filter(({ server }) => server) diff --git a/packages/websockets/socket-server-provider.ts b/packages/websockets/socket-server-provider.ts index c050bf8fc24..f5e1c785e93 100644 --- a/packages/websockets/socket-server-provider.ts +++ b/packages/websockets/socket-server-provider.ts @@ -1,9 +1,8 @@ -import { addLeadingSlash } from '@nestjs/common/utils/shared.utils'; +import { addLeadingSlash, isString } from '@nestjs/common/utils/shared.utils'; import { ApplicationConfig } from '@nestjs/core/application-config'; -import { isString } from 'util'; +import { ServerAndEventStreamsFactory } from './factories/server-and-event-streams-factory'; import { GatewayMetadata } from './interfaces/gateway-metadata.interface'; -import { SocketEventsHost } from './interfaces/socket-events-host.interface'; -import { SocketEventsHostFactory } from './socket-events-host-factory'; +import { ServerAndEventStreamsHost } from './interfaces/server-and-event-streams-host.interface'; import { SocketsContainer } from './sockets-container'; export class SocketServerProvider { @@ -12,56 +11,70 @@ export class SocketServerProvider { private readonly applicationConfig: ApplicationConfig, ) {} - public scanForSocketServer( + public scanForSocketServer( options: T, port: number, - ): SocketEventsHost { - const socketEventsHost = this.socketsContainer.getSocketEventsHostByPort( + ): ServerAndEventStreamsHost { + const serverAndStreamsHost = this.socketsContainer.getOneByConfig({ port, - ); - return socketEventsHost - ? this.createWithNamespace(options, port, socketEventsHost) + path: options.path, + }); + if (serverAndStreamsHost && options.namespace) { + return this.decorateWithNamespace(options, port, serverAndStreamsHost); + } + return serverAndStreamsHost + ? serverAndStreamsHost : this.createSocketServer(options, port); } private createSocketServer( options: T, port: number, - ): SocketEventsHost { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { namespace, server, ...partialOptions } = options as any; + ): ServerAndEventStreamsHost { const adapter = this.applicationConfig.getIoAdapter(); + const { namespace, server, ...partialOptions } = options as Record< + string, + unknown + >; const ioServer = adapter.create(port, partialOptions); - const observableSocket = SocketEventsHostFactory.create(ioServer); + const serverAndEventStreamsHost = ServerAndEventStreamsFactory.create( + ioServer, + ); - this.socketsContainer.addSocketEventsHost(null, port, observableSocket); - return this.createWithNamespace(options, port, observableSocket); + this.socketsContainer.addOne( + { port, path: options.path }, + serverAndEventStreamsHost, + ); + if (!namespace) { + return serverAndEventStreamsHost; + } + return this.decorateWithNamespace(options, port, ioServer); } - private createWithNamespace( + private decorateWithNamespace( options: T, port: number, - socketEventsHost: SocketEventsHost, - ): SocketEventsHost { - const { namespace } = options; - if (!namespace) { - return socketEventsHost; - } + targetServer: unknown, + ): ServerAndEventStreamsHost { const namespaceServer = this.getServerOfNamespace( options, port, - socketEventsHost.server, + targetServer, ); - const eventsHost = SocketEventsHostFactory.create(namespaceServer); - this.socketsContainer.addSocketEventsHost(namespace, port, eventsHost); - return eventsHost; + const serverAndEventStreamsHost = ServerAndEventStreamsFactory.create( + namespaceServer, + ); + this.socketsContainer.addOne( + { port, path: options.path }, + serverAndEventStreamsHost, + ); + return serverAndEventStreamsHost; } - private getServerOfNamespace( - options: TOptions, - port: number, - server: TServer, - ) { + private getServerOfNamespace< + TOptions extends GatewayMetadata = any, + TServer = any + >(options: TOptions, port: number, server: TServer) { const adapter = this.applicationConfig.getIoAdapter(); return adapter.create(port, { ...options, diff --git a/packages/websockets/sockets-container.ts b/packages/websockets/sockets-container.ts index 267f305738d..1090150e823 100644 --- a/packages/websockets/sockets-container.ts +++ b/packages/websockets/sockets-container.ts @@ -1,31 +1,38 @@ -import { SocketEventsHost } from './interfaces'; +import * as hash from 'object-hash'; +import { GatewayMetadata, ServerAndEventStreamsHost } from './interfaces'; export class SocketsContainer { - private readonly socketEventHosts = new Map< + private readonly serverAndEventStreamsHosts = new Map< string | RegExp, - SocketEventsHost + ServerAndEventStreamsHost >(); - public getAllSocketEventHosts(): Map { - return this.socketEventHosts; + public getAll(): Map { + return this.serverAndEventStreamsHosts; } - public getSocketEventsHostByPort(port: number): SocketEventsHost { - return this.socketEventHosts.get(`${port}`); + public getOneByConfig( + options: T, + ): ServerAndEventStreamsHost { + const uniqueToken = this.generateHashByOptions(options); + return this.serverAndEventStreamsHosts.get(uniqueToken); } - public addSocketEventsHost( - namespace: string | RegExp, - port: number, - host: SocketEventsHost, + public addOne( + options: T, + host: ServerAndEventStreamsHost, ) { - this.socketEventHosts.set( - namespace ? `${namespace}:${port}` : `${port}`, - host, - ); + const uniqueToken = this.generateHashByOptions(options); + this.serverAndEventStreamsHosts.set(uniqueToken, host); } public clear() { - this.socketEventHosts.clear(); + this.serverAndEventStreamsHosts.clear(); + } + + private generateHashByOptions( + options: T, + ): string { + return hash(options, { ignoreUnknown: true }); } } diff --git a/packages/websockets/test/container.spec.ts b/packages/websockets/test/container.spec.ts index 57edac7feee..1947804f5dc 100644 --- a/packages/websockets/test/container.spec.ts +++ b/packages/websockets/test/container.spec.ts @@ -1,9 +1,9 @@ import { expect } from 'chai'; +import * as hash from 'object-hash'; import * as sinon from 'sinon'; import { SocketsContainer } from '../sockets-container'; describe('SocketsContainer', () => { - const namespace = 'test'; const port = 30; let instance: SocketsContainer; @@ -13,35 +13,42 @@ describe('SocketsContainer', () => { setSpy = sinon.spy(); getSpy = sinon.spy(); instance = new SocketsContainer(); - (instance as any).socketEventHosts = { + (instance as any).serverAndEventStreamsHosts = { get: getSpy, set: setSpy, }; }); describe('getSocketEventsHostByPort', () => { - it(`should call "socketEventHosts" get method with expected arguments`, () => { - instance.getSocketEventsHostByPort(port); - expect(getSpy.calledWith(port.toString())).to.be.true; + it(`should call "serverAndEventStreamsHosts" get method with expected arguments`, () => { + const config = { port, path: 'random' }; + instance.getOneByConfig(config); + + const token = hash(config); + expect(getSpy.calledWith(token)).to.be.true; }); }); - describe('addSocketEventsHost', () => { - it(`should call "socketEventHosts" set method with expected arguments`, () => { + describe('addOne', () => { + it(`should call "serverAndEventStreamsHosts" set method with expected arguments`, () => { const server = {}; - instance.addSocketEventsHost(namespace, port, server as any); - expect(setSpy.calledWith(`${namespace}:${port}`, server)).to.be.true; + const config = { port, path: 'random' }; + + instance.addOne(config, server as any); + + const token = hash(config); + expect(setSpy.calledWith(token, server)).to.be.true; }); }); - describe('getAllSocketEventHosts', () => { - it('should return "socketEventHosts"', () => { + describe('getAll', () => { + it('should return "serverAndEventStreamsHosts"', () => { const collection = ['test']; - (instance as any).socketEventHosts = collection; - expect(instance.getAllSocketEventHosts()).to.be.eq(collection); + (instance as any).serverAndEventStreamsHosts = collection; + expect(instance.getAll()).to.be.eq(collection); }); }); describe('clear', () => { it('should clear hosts collection', () => { const collection = { clear: sinon.spy() }; - (instance as any).socketEventHosts = collection; + (instance as any).serverAndEventStreamsHosts = collection; instance.clear(); expect(collection.clear.called).to.be.true; }); diff --git a/packages/websockets/test/socket-events-host-factory.spec.ts b/packages/websockets/test/factories/server-and-event-streams-factory.spec.ts similarity index 72% rename from packages/websockets/test/socket-events-host-factory.spec.ts rename to packages/websockets/test/factories/server-and-event-streams-factory.spec.ts index 1634d9c88f4..7d7e148cb0f 100644 --- a/packages/websockets/test/socket-events-host-factory.spec.ts +++ b/packages/websockets/test/factories/server-and-event-streams-factory.spec.ts @@ -1,12 +1,12 @@ import { expect } from 'chai'; import { ReplaySubject, Subject } from 'rxjs'; -import { SocketEventsHostFactory } from '../socket-events-host-factory'; +import { ServerAndEventStreamsFactory } from '../../factories/server-and-event-streams-factory'; -describe('SocketEventsHostFactory', () => { +describe('ServerAndEventStreamsFactory', () => { describe('create', () => { it(`should return expected observable socket object`, () => { const server = { test: 'test' }; - const result = SocketEventsHostFactory.create(server); + const result = ServerAndEventStreamsFactory.create(server); expect(result).to.have.keys('init', 'connection', 'disconnect', 'server'); expect(result.init instanceof ReplaySubject).to.be.true; diff --git a/packages/websockets/test/socket-server-provider.spec.ts b/packages/websockets/test/socket-server-provider.spec.ts index 57118c0fd0e..8cfe7fb29a9 100644 --- a/packages/websockets/test/socket-server-provider.spec.ts +++ b/packages/websockets/test/socket-server-provider.spec.ts @@ -1,9 +1,9 @@ import { ApplicationConfig } from '@nestjs/core/application-config'; import { expect } from 'chai'; import * as sinon from 'sinon'; +import { AbstractWsAdapter } from '../adapters/ws-adapter'; import { SocketServerProvider } from '../socket-server-provider'; import { SocketsContainer } from '../sockets-container'; -import { AbstractWsAdapter } from '../adapters/ws-adapter'; class NoopAdapter extends AbstractWsAdapter { public create(port: number, options?: any) {} @@ -24,29 +24,58 @@ describe('SocketServerProvider', () => { }); describe('scanForSocketServer', () => { let createSocketServerSpy: sinon.SinonSpy; - const namespace = 'test'; + const path = 'localhost:3030'; const port = 30; beforeEach(() => { createSocketServerSpy = sinon.spy(instance, 'createSocketServer' as any); }); + afterEach(() => { mockContainer.restore(); }); - it(`should returns stored server`, () => { + + it(`should return stored server`, () => { const server = { test: 'test' }; - mockContainer.expects('getSocketEventsHostByPort').returns(server); + mockContainer.expects('getOneByConfig').returns(server); const result = instance.scanForSocketServer({ namespace: null }, port); expect(createSocketServerSpy.called).to.be.false; expect(result).to.eq(server); }); + it(`should call "createSocketServer" when server is not stored already`, () => { - mockContainer.expects('getSocketEventsHostByPort').returns(null); + mockContainer.expects('getOneByConfig').returns(null); - instance.scanForSocketServer({ namespace }, port); + instance.scanForSocketServer({ path }, port); expect(createSocketServerSpy.called).to.be.true; }); + + it(`should call "decorateWithNamespace" when namespace is specified`, () => { + const decorateWithNamespaceSpy = sinon.spy( + instance, + 'decorateWithNamespace' as any, + ); + + instance.scanForSocketServer({ path, namespace: 'random' }, port); + expect(decorateWithNamespaceSpy.called).to.be.true; + }); + + describe('when namespace is specified and server does exist already', () => { + it(`should call "decorateWithNamespace" and not call "createSocketServer"`, () => { + const server = { test: 'test' }; + mockContainer.expects('getOneByConfig').returns(server); + + const decorateWithNamespaceSpy = sinon.spy( + instance, + 'decorateWithNamespace' as any, + ); + + instance.scanForSocketServer({ path, namespace: 'random' }, port); + expect(decorateWithNamespaceSpy.called).to.be.true; + expect(createSocketServerSpy.called).to.be.false; + }); + }); }); }); diff --git a/packages/websockets/web-sockets-controller.ts b/packages/websockets/web-sockets-controller.ts index 134217803cf..82db9cca8f3 100644 --- a/packages/websockets/web-sockets-controller.ts +++ b/packages/websockets/web-sockets-controller.ts @@ -13,7 +13,7 @@ import { } from './gateway-metadata-explorer'; import { GatewayMetadata } from './interfaces/gateway-metadata.interface'; import { NestGateway } from './interfaces/nest-gateway.interface'; -import { SocketEventsHost } from './interfaces/socket-events-host.interface'; +import { ServerAndEventStreamsHost } from './interfaces/server-and-event-streams-host.interface'; import { SocketServerProvider } from './socket-server-provider'; import { compareElementAt } from './utils/compare-element.util'; @@ -72,7 +72,7 @@ export class WebSocketsController { public subscribeEvents( instance: NestGateway, subscribersMap: MessageMappingProperties[], - observableServer: SocketEventsHost, + observableServer: ServerAndEventStreamsHost, ) { const { init, disconnect, connection, server } = observableServer; const adapter = this.config.getIoAdapter(); From d381e149ca1990a630382520b6d0ae23424ff6b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 4 Feb 2021 14:48:59 +0100 Subject: [PATCH 2/6] fix(websockets): allow to share a single server with multiple paths (ws) --- integration/websockets/e2e/ws-gateway.spec.ts | 79 ++++++++++++- .../websockets/src/example-path.gateway.ts | 14 +++ integration/websockets/src/ws-path.gateway.ts | 14 +++ .../websockets/src/ws-path2.gateway.ts | 14 +++ package.json | 2 +- packages/platform-ws/adapters/ws-adapter.ts | 106 ++++++++++++++++-- packages/websockets/adapters/ws-adapter.ts | 6 +- packages/websockets/socket-module.ts | 3 + packages/websockets/socket-server-provider.ts | 8 +- 9 files changed, 230 insertions(+), 16 deletions(-) create mode 100644 integration/websockets/src/example-path.gateway.ts create mode 100644 integration/websockets/src/ws-path.gateway.ts create mode 100644 integration/websockets/src/ws-path2.gateway.ts diff --git a/integration/websockets/e2e/ws-gateway.spec.ts b/integration/websockets/e2e/ws-gateway.spec.ts index e96b05150af..4130ab200e7 100644 --- a/integration/websockets/e2e/ws-gateway.spec.ts +++ b/integration/websockets/e2e/ws-gateway.spec.ts @@ -5,7 +5,10 @@ import { expect } from 'chai'; import * as WebSocket from 'ws'; import { ApplicationGateway } from '../src/app.gateway'; import { CoreGateway } from '../src/core.gateway'; +import { ExamplePathGateway } from '../src/example-path.gateway'; import { ServerGateway } from '../src/server.gateway'; +import { WsPathGateway } from '../src/ws-path.gateway'; +import { WsPathGateway2 } from '../src/ws-path2.gateway'; async function createNestApp(...gateways): Promise { const testingModule = await Test.createTestingModule({ @@ -65,7 +68,81 @@ describe('WebSocketGateway (WsAdapter)', () => { ); }); - it(`should support 2 different gateways`, async function () { + it(`should handle message on a different path`, async () => { + app = await createNestApp(WsPathGateway); + await app.listenAsync(3000); + try { + ws = new WebSocket('ws://localhost:3000/ws-path'); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + + ws.send( + JSON.stringify({ + event: 'push', + data: { + test: 'test', + }, + }), + ); + await new Promise(resolve => + ws.on('message', data => { + expect(JSON.parse(data).data.test).to.be.eql('test'); + resolve(); + }), + ); + } catch (err) { + console.log(err); + } + }); + + it(`should support 2 different gateways running on different paths`, async function () { + this.retries(10); + + app = await createNestApp(ExamplePathGateway, WsPathGateway2); + await app.listenAsync(3000); + + // open websockets delay + await new Promise(resolve => setTimeout(resolve, 1000)); + + ws = new WebSocket('ws://localhost:8082/example'); + ws2 = new WebSocket('ws://localhost:8082/ws-path'); + + await new Promise(resolve => + ws.on('open', () => { + ws.on('message', data => { + expect(JSON.parse(data).data.test).to.be.eql('test'); + resolve(); + }); + ws.send( + JSON.stringify({ + event: 'push', + data: { + test: 'test', + }, + }), + ); + }), + ); + + await new Promise(resolve => { + ws2.on('message', data => { + expect(JSON.parse(data).data.test).to.be.eql('test'); + resolve(); + }); + ws2.send( + JSON.stringify({ + event: 'push', + data: { + test: 'test', + }, + }), + ); + }); + }); + + it(`should support 2 different gateways running on the same path (but different ports)`, async function () { this.retries(10); app = await createNestApp(ApplicationGateway, CoreGateway); diff --git a/integration/websockets/src/example-path.gateway.ts b/integration/websockets/src/example-path.gateway.ts new file mode 100644 index 00000000000..31c4b2c63b0 --- /dev/null +++ b/integration/websockets/src/example-path.gateway.ts @@ -0,0 +1,14 @@ +import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets'; + +@WebSocketGateway(8082, { + path: '/example', +}) +export class ExamplePathGateway { + @SubscribeMessage('push') + onPush(client, data) { + return { + event: 'pop', + data, + }; + } +} diff --git a/integration/websockets/src/ws-path.gateway.ts b/integration/websockets/src/ws-path.gateway.ts new file mode 100644 index 00000000000..d40d098480e --- /dev/null +++ b/integration/websockets/src/ws-path.gateway.ts @@ -0,0 +1,14 @@ +import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets'; + +@WebSocketGateway({ + path: '/ws-path', +}) +export class WsPathGateway { + @SubscribeMessage('push') + onPush(client, data) { + return { + event: 'pop', + data, + }; + } +} diff --git a/integration/websockets/src/ws-path2.gateway.ts b/integration/websockets/src/ws-path2.gateway.ts new file mode 100644 index 00000000000..0c4a9d69d21 --- /dev/null +++ b/integration/websockets/src/ws-path2.gateway.ts @@ -0,0 +1,14 @@ +import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets'; + +@WebSocketGateway(8082, { + path: '/ws-path', +}) +export class WsPathGateway2 { + @SubscribeMessage('push') + onPush(client, data) { + return { + event: 'pop', + data, + }; + } +} diff --git a/package.json b/package.json index 52f4520d6b9..faa61043801 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "format": "prettier \"**/*.ts\" --ignore-path ./.prettierignore --write && git status", "postinstall": "opencollective", "test": "nyc --require ts-node/register mocha packages/**/*.spec.ts --reporter spec --retries 3 --require 'node_modules/reflect-metadata/Reflect.js' --exit", - "test:integration": "mocha \"integration/*/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit", + "test:integration": "mocha \"integration/**/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit", "test:docker:up": "docker-compose -f integration/docker-compose.yml up -d", "test:docker:down": "docker-compose -f integration/docker-compose.yml down", "lint": "concurrently 'npm run lint:packages' 'npm run lint:integration' 'npm run lint:spec'", diff --git a/packages/platform-ws/adapters/ws-adapter.ts b/packages/platform-ws/adapters/ws-adapter.ts index 1c123ab564f..b0047dc8926 100644 --- a/packages/platform-ws/adapters/ws-adapter.ts +++ b/packages/platform-ws/adapters/ws-adapter.ts @@ -7,6 +7,7 @@ import { ERROR_EVENT, } from '@nestjs/websockets/constants'; import { MessageMappingProperties } from '@nestjs/websockets/gateway-metadata-explorer'; +import * as http from 'http'; import { EMPTY as empty, fromEvent, Observable } from 'rxjs'; import { filter, first, mergeMap, share, takeUntil } from 'rxjs/operators'; @@ -19,8 +20,23 @@ enum READY_STATE { CLOSED_STATE = 3, } +type HttpServerRegistryKey = number; +type HttpServerRegistryEntry = any; +type WsServerRegistryKey = number; +type WsServerRegistryEntry = any[]; + +const UNDERLYING_HTTP_SERVER_PORT = 0; + export class WsAdapter extends AbstractWsAdapter { protected readonly logger = new Logger(WsAdapter.name); + protected readonly httpServersRegistry = new Map< + HttpServerRegistryKey, + HttpServerRegistryEntry + >(); + protected readonly wsServersRegistry = new Map< + WsServerRegistryKey, + WsServerRegistryEntry + >(); constructor(appOrHttpServer?: INestApplicationContext | any) { super(appOrHttpServer); @@ -39,7 +55,7 @@ export class WsAdapter extends AbstractWsAdapter { this.logger.error(error); throw error; } - if (port === 0 && this.httpServer) { + if (port === UNDERLYING_HTTP_SERVER_PORT && this.httpServer) { return this.bindErrorHandler( new wsPackage.Server({ server: this.httpServer, @@ -47,14 +63,33 @@ export class WsAdapter extends AbstractWsAdapter { }), ); } - return server - ? server - : this.bindErrorHandler( - new wsPackage.Server({ - port, - ...wsOptions, - }), - ); + + if (server) { + // When server exists already + return server; + } + if (options.path && port !== UNDERLYING_HTTP_SERVER_PORT) { + // Multiple servers with different paths + // sharing a single HTTP/S server running on different port + // than a regular HTTP application + this.ensureHttpServerExists(port); + + const wsServer = this.bindErrorHandler( + new wsPackage.Server({ + noServer: true, + ...wsOptions, + }), + ); + this.addWsServerToRegistry(wsServer, port, options.path); + return wsServer; + } + const wsServer = this.bindErrorHandler( + new wsPackage.Server({ + port, + ...wsOptions, + }), + ); + return wsServer; } public bindMessageHandlers( @@ -98,7 +133,7 @@ export class WsAdapter extends AbstractWsAdapter { } public bindErrorHandler(server: any) { - server.on(CONNECTION_EVENT, ws => + server.on(CONNECTION_EVENT, (ws: any) => ws.on(ERROR_EVENT, (err: any) => this.logger.error(err)), ); server.on(ERROR_EVENT, (err: any) => this.logger.error(err)); @@ -108,4 +143,55 @@ export class WsAdapter extends AbstractWsAdapter { public bindClientDisconnect(client: any, callback: Function) { client.on(CLOSE_EVENT, callback); } + + public async dispose() { + const closeEvents = Array.from(this.httpServersRegistry).map( + ([_, server]) => new Promise(resolve => server.close(resolve)), + ); + await Promise.all(closeEvents); + this.httpServersRegistry.clear(); + this.wsServersRegistry.clear(); + } + + protected ensureHttpServerExists(port: number) { + if (this.httpServersRegistry.has(port)) { + return; + } + const httpServer = http.createServer(); + this.httpServersRegistry.set(port, httpServer); + + httpServer.on('upgrade', (request, socket, head) => { + const baseUrl = 'ws://' + request.headers.host + '/'; + const pathname = new URL(request.url, baseUrl).pathname; + const wsServersCollection = this.wsServersRegistry.get(port); + + let isRequestDelegated = false; + for (const wsServer of wsServersCollection) { + if (pathname === wsServer.path) { + wsServer.handleUpgrade(request, socket, head, (ws: unknown) => { + wsServer.emit('connection', ws, request); + }); + isRequestDelegated = true; + break; + } + } + if (!isRequestDelegated) { + socket.destroy(); + } + }); + + httpServer.listen(port); + } + + protected addWsServerToRegistry = any>( + wsServer: T, + port: number, + path: string, + ) { + const entries = this.wsServersRegistry.get(port) ?? []; + entries.push(wsServer); + + wsServer.path = path; + this.wsServersRegistry.set(port, entries); + } } diff --git a/packages/websockets/adapters/ws-adapter.ts b/packages/websockets/adapters/ws-adapter.ts index 6d3eab7747d..be06709e8e3 100644 --- a/packages/websockets/adapters/ws-adapter.ts +++ b/packages/websockets/adapters/ws-adapter.ts @@ -33,11 +33,13 @@ export abstract class AbstractWsAdapter< client.on(DISCONNECT_EVENT, callback); } - public close(server: TServer) { + public async close(server: TServer) { const isCallable = server && isFunction(server.close); - isCallable && server.close(); + isCallable && (await new Promise(resolve => server.close(resolve))); } + public async dispose() {} + public abstract create(port: number, options?: TOptions): TServer; public abstract bindMessageHandlers( client: TClient, diff --git a/packages/websockets/socket-module.ts b/packages/websockets/socket-module.ts index 532c5180e63..17a6ebe0f8d 100644 --- a/packages/websockets/socket-module.ts +++ b/packages/websockets/socket-module.ts @@ -10,6 +10,7 @@ import { InterceptorsContextCreator } from '@nestjs/core/interceptors/intercepto import { PipesConsumer } from '@nestjs/core/pipes/pipes-consumer'; import { PipesContextCreator } from '@nestjs/core/pipes/pipes-context-creator'; import { iterate } from 'iterare'; +import { AbstractWsAdapter } from './adapters'; import { GATEWAY_METADATA } from './constants'; import { ExceptionFiltersContext } from './context/exception-filters-context'; import { WsContextCreator } from './context/ws-context-creator'; @@ -92,6 +93,8 @@ export class SocketModule { .filter(({ server }) => server) .map(async ({ server }) => adapter.close(server)), ); + await (adapter as AbstractWsAdapter)?.dispose(); + this.socketsContainer.clear(); } diff --git a/packages/websockets/socket-server-provider.ts b/packages/websockets/socket-server-provider.ts index f5e1c785e93..eab5a11d513 100644 --- a/packages/websockets/socket-server-provider.ts +++ b/packages/websockets/socket-server-provider.ts @@ -20,7 +20,11 @@ export class SocketServerProvider { path: options.path, }); if (serverAndStreamsHost && options.namespace) { - return this.decorateWithNamespace(options, port, serverAndStreamsHost); + return this.decorateWithNamespace( + options, + port, + serverAndStreamsHost.server, + ); } return serverAndStreamsHost ? serverAndStreamsHost @@ -65,7 +69,7 @@ export class SocketServerProvider { namespaceServer, ); this.socketsContainer.addOne( - { port, path: options.path }, + { port, path: options.path, namespace: options.namespace }, serverAndEventStreamsHost, ); return serverAndEventStreamsHost; From cb4a9d641b95d89bbdfee5fd1f924d2d1713c0b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 4 Feb 2021 15:07:00 +0100 Subject: [PATCH 3/6] fix(websockets): allow multiple paths when http internal server is used --- integration/websockets/e2e/ws-gateway.spec.ts | 4 +-- .../websockets/src/example-path.gateway.ts | 2 +- .../websockets/src/ws-path2.gateway.ts | 2 +- package.json | 2 +- packages/platform-ws/adapters/ws-adapter.ts | 35 +++++++++++-------- 5 files changed, 26 insertions(+), 19 deletions(-) diff --git a/integration/websockets/e2e/ws-gateway.spec.ts b/integration/websockets/e2e/ws-gateway.spec.ts index de4daf9889e..c68883ba3fd 100644 --- a/integration/websockets/e2e/ws-gateway.spec.ts +++ b/integration/websockets/e2e/ws-gateway.spec.ts @@ -106,8 +106,8 @@ describe('WebSocketGateway (WsAdapter)', () => { // open websockets delay await new Promise(resolve => setTimeout(resolve, 1000)); - ws = new WebSocket('ws://localhost:8082/example'); - ws2 = new WebSocket('ws://localhost:8082/ws-path'); + ws = new WebSocket('ws://localhost:3000/example'); + ws2 = new WebSocket('ws://localhost:3000/ws-path'); await new Promise(resolve => ws.on('open', () => { diff --git a/integration/websockets/src/example-path.gateway.ts b/integration/websockets/src/example-path.gateway.ts index 31c4b2c63b0..728e2fb3207 100644 --- a/integration/websockets/src/example-path.gateway.ts +++ b/integration/websockets/src/example-path.gateway.ts @@ -1,6 +1,6 @@ import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets'; -@WebSocketGateway(8082, { +@WebSocketGateway({ path: '/example', }) export class ExamplePathGateway { diff --git a/integration/websockets/src/ws-path2.gateway.ts b/integration/websockets/src/ws-path2.gateway.ts index 0c4a9d69d21..2334950eb68 100644 --- a/integration/websockets/src/ws-path2.gateway.ts +++ b/integration/websockets/src/ws-path2.gateway.ts @@ -1,6 +1,6 @@ import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets'; -@WebSocketGateway(8082, { +@WebSocketGateway({ path: '/ws-path', }) export class WsPathGateway2 { diff --git a/package.json b/package.json index d4100d6d252..539a2a1fd6f 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "format": "prettier \"**/*.ts\" --ignore-path ./.prettierignore --write && git status", "postinstall": "opencollective", "test": "nyc --require ts-node/register mocha packages/**/*.spec.ts --reporter spec --require 'node_modules/reflect-metadata/Reflect.js' --exit", - "test:integration": "mocha \"integration/*/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit", + "test:integration": "mocha \"integration/websockets/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit", "test:docker:up": "docker-compose -f integration/docker-compose.yml up -d", "test:docker:down": "docker-compose -f integration/docker-compose.yml down", "lint": "concurrently 'npm run lint:packages' 'npm run lint:integration' 'npm run lint:spec'", diff --git a/packages/platform-ws/adapters/ws-adapter.ts b/packages/platform-ws/adapters/ws-adapter.ts index b0047dc8926..cca1f81a62d 100644 --- a/packages/platform-ws/adapters/ws-adapter.ts +++ b/packages/platform-ws/adapters/ws-adapter.ts @@ -45,8 +45,8 @@ export class WsAdapter extends AbstractWsAdapter { public create( port: number, - options?: any & { namespace?: string; server?: any }, - ): any { + options?: Record & { namespace?: string; server?: any }, + ) { const { server, ...wsOptions } = options; if (wsOptions?.namespace) { const error = new Error( @@ -55,24 +55,29 @@ export class WsAdapter extends AbstractWsAdapter { this.logger.error(error); throw error; } + if (port === UNDERLYING_HTTP_SERVER_PORT && this.httpServer) { - return this.bindErrorHandler( + this.ensureHttpServerExists(port, this.httpServer); + const wsServer = this.bindErrorHandler( new wsPackage.Server({ - server: this.httpServer, + noServer: true, ...wsOptions, }), ); + + this.addWsServerToRegistry(wsServer, port, options.path || '/'); + return wsServer; } if (server) { - // When server exists already return server; } if (options.path && port !== UNDERLYING_HTTP_SERVER_PORT) { // Multiple servers with different paths // sharing a single HTTP/S server running on different port // than a regular HTTP application - this.ensureHttpServerExists(port); + const httpServer = this.ensureHttpServerExists(port); + httpServer?.listen(port); const wsServer = this.bindErrorHandler( new wsPackage.Server({ @@ -145,19 +150,22 @@ export class WsAdapter extends AbstractWsAdapter { } public async dispose() { - const closeEvents = Array.from(this.httpServersRegistry).map( - ([_, server]) => new Promise(resolve => server.close(resolve)), - ); - await Promise.all(closeEvents); + const closeEventSignals = Array.from(this.httpServersRegistry) + .filter(([port]) => port !== UNDERLYING_HTTP_SERVER_PORT) + .map(([_, server]) => new Promise(resolve => server.close(resolve))); + + await Promise.all(closeEventSignals); this.httpServersRegistry.clear(); this.wsServersRegistry.clear(); } - protected ensureHttpServerExists(port: number) { + protected ensureHttpServerExists( + port: number, + httpServer = http.createServer(), + ) { if (this.httpServersRegistry.has(port)) { return; } - const httpServer = http.createServer(); this.httpServersRegistry.set(port, httpServer); httpServer.on('upgrade', (request, socket, head) => { @@ -179,8 +187,7 @@ export class WsAdapter extends AbstractWsAdapter { socket.destroy(); } }); - - httpServer.listen(port); + return httpServer; } protected addWsServerToRegistry = any>( From 949df4a88d055fb02c7a9c7276e779be3e3d97a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 4 Feb 2021 15:12:57 +0100 Subject: [PATCH 4/6] fix(): hotfix --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index b759dce1825..99ba3a5e6d9 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "format": "prettier \"**/*.ts\" --ignore-path ./.prettierignore --write && git status", "postinstall": "opencollective", "test": "nyc --require ts-node/register mocha packages/**/*.spec.ts --reporter spec --require 'node_modules/reflect-metadata/Reflect.js' --exit", - "test:integration": "mocha \"integration/websockets/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit", + "test:integration": "mocha \"integration/*/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit", "test:docker:up": "docker-compose -f integration/docker-compose.yml up -d", "test:docker:down": "docker-compose -f integration/docker-compose.yml down", "lint": "concurrently 'npm run lint:packages' 'npm run lint:integration' 'npm run lint:spec'", From 1e47bd909429469799f698f1e82da976742e2cc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Fri, 5 Feb 2021 15:17:19 +0100 Subject: [PATCH 5/6] sample(02-gateways): fix redis io adapter import --- packages/websockets/adapters/ws-adapter.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/websockets/adapters/ws-adapter.ts b/packages/websockets/adapters/ws-adapter.ts index be06709e8e3..29c64ecb94a 100644 --- a/packages/websockets/adapters/ws-adapter.ts +++ b/packages/websockets/adapters/ws-adapter.ts @@ -38,6 +38,7 @@ export abstract class AbstractWsAdapter< isCallable && (await new Promise(resolve => server.close(resolve))); } + // eslint-disable-next-line @typescript-eslint/no-empty-function public async dispose() {} public abstract create(port: number, options?: TOptions): TServer; From b15914db1e5a55dc29cd55e73276c524a8576968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Fri, 5 Feb 2021 15:18:16 +0100 Subject: [PATCH 6/6] sample(02-gateways): fix redis io adapter import --- sample/02-gateways/src/adapters/redis-io.adapter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample/02-gateways/src/adapters/redis-io.adapter.ts b/sample/02-gateways/src/adapters/redis-io.adapter.ts index 99e43823a79..128d412afea 100644 --- a/sample/02-gateways/src/adapters/redis-io.adapter.ts +++ b/sample/02-gateways/src/adapters/redis-io.adapter.ts @@ -1,6 +1,6 @@ import { IoAdapter } from '@nestjs/platform-socket.io'; import { ServerOptions } from 'socket.io'; -import * as redisIoAdapter from 'socket.io-redis'; +import redisIoAdapter from 'socket.io-redis'; const redisAdapter = redisIoAdapter({ host: 'localhost', port: 6379 });