Skip to content

Commit

Permalink
fix(websockets): allow to share a single server with multiple paths (ws)
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed Feb 4, 2021
1 parent a138be8 commit d381e14
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 16 deletions.
79 changes: 78 additions & 1 deletion integration/websockets/e2e/ws-gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { expect } from 'chai';
import * as WebSocket from 'ws';
import { ApplicationGateway } from '../src/app.gateway';
import { CoreGateway } from '../src/core.gateway';
import { ExamplePathGateway } from '../src/example-path.gateway';
import { ServerGateway } from '../src/server.gateway';
import { WsPathGateway } from '../src/ws-path.gateway';
import { WsPathGateway2 } from '../src/ws-path2.gateway';

async function createNestApp(...gateways): Promise<INestApplication> {
const testingModule = await Test.createTestingModule({
Expand Down Expand Up @@ -65,7 +68,81 @@ describe('WebSocketGateway (WsAdapter)', () => {
);
});

it(`should support 2 different gateways`, async function () {
it(`should handle message on a different path`, async () => {
app = await createNestApp(WsPathGateway);
await app.listenAsync(3000);
try {
ws = new WebSocket('ws://localhost:3000/ws-path');
await new Promise((resolve, reject) => {
ws.on('open', resolve);
ws.on('error', reject);
});

ws.send(
JSON.stringify({
event: 'push',
data: {
test: 'test',
},
}),
);
await new Promise<void>(resolve =>
ws.on('message', data => {
expect(JSON.parse(data).data.test).to.be.eql('test');
resolve();
}),
);
} catch (err) {
console.log(err);
}
});

it(`should support 2 different gateways running on different paths`, async function () {
this.retries(10);

app = await createNestApp(ExamplePathGateway, WsPathGateway2);
await app.listenAsync(3000);

// open websockets delay
await new Promise(resolve => setTimeout(resolve, 1000));

ws = new WebSocket('ws://localhost:8082/example');
ws2 = new WebSocket('ws://localhost:8082/ws-path');

await new Promise<void>(resolve =>
ws.on('open', () => {
ws.on('message', data => {
expect(JSON.parse(data).data.test).to.be.eql('test');
resolve();
});
ws.send(
JSON.stringify({
event: 'push',
data: {
test: 'test',
},
}),
);
}),
);

await new Promise<void>(resolve => {
ws2.on('message', data => {
expect(JSON.parse(data).data.test).to.be.eql('test');
resolve();
});
ws2.send(
JSON.stringify({
event: 'push',
data: {
test: 'test',
},
}),
);
});
});

it(`should support 2 different gateways running on the same path (but different ports)`, async function () {
this.retries(10);

app = await createNestApp(ApplicationGateway, CoreGateway);
Expand Down
14 changes: 14 additions & 0 deletions integration/websockets/src/example-path.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';

@WebSocketGateway(8082, {
path: '/example',
})
export class ExamplePathGateway {
@SubscribeMessage('push')
onPush(client, data) {
return {
event: 'pop',
data,
};
}
}
14 changes: 14 additions & 0 deletions integration/websockets/src/ws-path.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';

@WebSocketGateway({
path: '/ws-path',
})
export class WsPathGateway {
@SubscribeMessage('push')
onPush(client, data) {
return {
event: 'pop',
data,
};
}
}
14 changes: 14 additions & 0 deletions integration/websockets/src/ws-path2.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';

@WebSocketGateway(8082, {
path: '/ws-path',
})
export class WsPathGateway2 {
@SubscribeMessage('push')
onPush(client, data) {
return {
event: 'pop',
data,
};
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"format": "prettier \"**/*.ts\" --ignore-path ./.prettierignore --write && git status",
"postinstall": "opencollective",
"test": "nyc --require ts-node/register mocha packages/**/*.spec.ts --reporter spec --retries 3 --require 'node_modules/reflect-metadata/Reflect.js' --exit",
"test:integration": "mocha \"integration/*/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit",
"test:integration": "mocha \"integration/**/{,!(node_modules)/**/}/*.spec.ts\" --reporter spec --require ts-node/register --require 'node_modules/reflect-metadata/Reflect.js' --exit",
"test:docker:up": "docker-compose -f integration/docker-compose.yml up -d",
"test:docker:down": "docker-compose -f integration/docker-compose.yml down",
"lint": "concurrently 'npm run lint:packages' 'npm run lint:integration' 'npm run lint:spec'",
Expand Down
106 changes: 96 additions & 10 deletions packages/platform-ws/adapters/ws-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ERROR_EVENT,
} from '@nestjs/websockets/constants';
import { MessageMappingProperties } from '@nestjs/websockets/gateway-metadata-explorer';
import * as http from 'http';
import { EMPTY as empty, fromEvent, Observable } from 'rxjs';
import { filter, first, mergeMap, share, takeUntil } from 'rxjs/operators';

Expand All @@ -19,8 +20,23 @@ enum READY_STATE {
CLOSED_STATE = 3,
}

type HttpServerRegistryKey = number;
type HttpServerRegistryEntry = any;
type WsServerRegistryKey = number;
type WsServerRegistryEntry = any[];

const UNDERLYING_HTTP_SERVER_PORT = 0;

export class WsAdapter extends AbstractWsAdapter {
protected readonly logger = new Logger(WsAdapter.name);
protected readonly httpServersRegistry = new Map<
HttpServerRegistryKey,
HttpServerRegistryEntry
>();
protected readonly wsServersRegistry = new Map<
WsServerRegistryKey,
WsServerRegistryEntry
>();

constructor(appOrHttpServer?: INestApplicationContext | any) {
super(appOrHttpServer);
Expand All @@ -39,22 +55,41 @@ export class WsAdapter extends AbstractWsAdapter {
this.logger.error(error);
throw error;
}
if (port === 0 && this.httpServer) {
if (port === UNDERLYING_HTTP_SERVER_PORT && this.httpServer) {
return this.bindErrorHandler(
new wsPackage.Server({
server: this.httpServer,
...wsOptions,
}),
);
}
return server
? server
: this.bindErrorHandler(
new wsPackage.Server({
port,
...wsOptions,
}),
);

if (server) {
// When server exists already
return server;
}
if (options.path && port !== UNDERLYING_HTTP_SERVER_PORT) {
// Multiple servers with different paths
// sharing a single HTTP/S server running on different port
// than a regular HTTP application
this.ensureHttpServerExists(port);

const wsServer = this.bindErrorHandler(
new wsPackage.Server({
noServer: true,
...wsOptions,
}),
);
this.addWsServerToRegistry(wsServer, port, options.path);
return wsServer;
}
const wsServer = this.bindErrorHandler(
new wsPackage.Server({
port,
...wsOptions,
}),
);
return wsServer;
}

public bindMessageHandlers(
Expand Down Expand Up @@ -98,7 +133,7 @@ export class WsAdapter extends AbstractWsAdapter {
}

public bindErrorHandler(server: any) {
server.on(CONNECTION_EVENT, ws =>
server.on(CONNECTION_EVENT, (ws: any) =>
ws.on(ERROR_EVENT, (err: any) => this.logger.error(err)),
);
server.on(ERROR_EVENT, (err: any) => this.logger.error(err));
Expand All @@ -108,4 +143,55 @@ export class WsAdapter extends AbstractWsAdapter {
public bindClientDisconnect(client: any, callback: Function) {
client.on(CLOSE_EVENT, callback);
}

public async dispose() {
const closeEvents = Array.from(this.httpServersRegistry).map(
([_, server]) => new Promise(resolve => server.close(resolve)),
);
await Promise.all(closeEvents);
this.httpServersRegistry.clear();
this.wsServersRegistry.clear();
}

protected ensureHttpServerExists(port: number) {
if (this.httpServersRegistry.has(port)) {
return;
}
const httpServer = http.createServer();
this.httpServersRegistry.set(port, httpServer);

httpServer.on('upgrade', (request, socket, head) => {
const baseUrl = 'ws://' + request.headers.host + '/';
const pathname = new URL(request.url, baseUrl).pathname;
const wsServersCollection = this.wsServersRegistry.get(port);

let isRequestDelegated = false;
for (const wsServer of wsServersCollection) {
if (pathname === wsServer.path) {
wsServer.handleUpgrade(request, socket, head, (ws: unknown) => {
wsServer.emit('connection', ws, request);
});
isRequestDelegated = true;
break;
}
}
if (!isRequestDelegated) {
socket.destroy();
}
});

httpServer.listen(port);
}

protected addWsServerToRegistry<T extends Record<'path', string> = any>(
wsServer: T,
port: number,
path: string,
) {
const entries = this.wsServersRegistry.get(port) ?? [];
entries.push(wsServer);

wsServer.path = path;
this.wsServersRegistry.set(port, entries);
}
}
6 changes: 4 additions & 2 deletions packages/websockets/adapters/ws-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ export abstract class AbstractWsAdapter<
client.on(DISCONNECT_EVENT, callback);
}

public close(server: TServer) {
public async close(server: TServer) {
const isCallable = server && isFunction(server.close);
isCallable && server.close();
isCallable && (await new Promise(resolve => server.close(resolve)));
}

public async dispose() {}

public abstract create(port: number, options?: TOptions): TServer;
public abstract bindMessageHandlers(
client: TClient,
Expand Down
3 changes: 3 additions & 0 deletions packages/websockets/socket-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { InterceptorsContextCreator } from '@nestjs/core/interceptors/intercepto
import { PipesConsumer } from '@nestjs/core/pipes/pipes-consumer';
import { PipesContextCreator } from '@nestjs/core/pipes/pipes-context-creator';
import { iterate } from 'iterare';
import { AbstractWsAdapter } from './adapters';
import { GATEWAY_METADATA } from './constants';
import { ExceptionFiltersContext } from './context/exception-filters-context';
import { WsContextCreator } from './context/ws-context-creator';
Expand Down Expand Up @@ -92,6 +93,8 @@ export class SocketModule<HttpServer = any> {
.filter(({ server }) => server)
.map(async ({ server }) => adapter.close(server)),
);
await (adapter as AbstractWsAdapter)?.dispose();

this.socketsContainer.clear();
}

Expand Down
8 changes: 6 additions & 2 deletions packages/websockets/socket-server-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ export class SocketServerProvider {
path: options.path,
});
if (serverAndStreamsHost && options.namespace) {
return this.decorateWithNamespace(options, port, serverAndStreamsHost);
return this.decorateWithNamespace(
options,
port,
serverAndStreamsHost.server,
);
}
return serverAndStreamsHost
? serverAndStreamsHost
Expand Down Expand Up @@ -65,7 +69,7 @@ export class SocketServerProvider {
namespaceServer,
);
this.socketsContainer.addOne(
{ port, path: options.path },
{ port, path: options.path, namespace: options.namespace },
serverAndEventStreamsHost,
);
return serverAndEventStreamsHost;
Expand Down

0 comments on commit d381e14

Please sign in to comment.