From 29332072f49e645bfe0fa394bb4c6f66b0bc0600 Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Fri, 14 May 2021 11:22:36 +0200 Subject: [PATCH] feat: add internal ws outbound transporter (#267) --- package.json | 6 +- samples/__tests__/e2e-ws.test.ts | 88 +------------------- samples/mediator-ws.ts | 10 ++- samples/mediator.ts | 8 ++ src/__tests__/helpers.ts | 8 ++ src/agent/Agent.ts | 5 +- src/agent/MessageSender.ts | 8 +- src/transport/HttpOutboundTransporter.ts | 8 ++ src/transport/OutboundTransporter.ts | 3 + src/transport/WsOutboundTransporter.ts | 101 +++++++++++++++++++++++ src/transport/index.ts | 1 + src/utils/ws.ts | 14 ++++ 12 files changed, 164 insertions(+), 96 deletions(-) create mode 100644 src/transport/WsOutboundTransporter.ts create mode 100644 src/utils/ws.ts diff --git a/package.json b/package.json index 8055bca6ab..022c659521 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,8 @@ "react-native-fs": "^2.18.0", "reflect-metadata": "^0.1.13", "tsyringe": "^4.5.0", - "uuid": "^8.3.0" + "uuid": "^8.3.0", + "ws": "^7.4.5" }, "devDependencies": { "@types/bn.js": "^5.1.0", @@ -63,8 +64,7 @@ "ts-jest": "^26.5.3", "ts-node-dev": "^1.1.6", "tslog": "^3.1.2", - "typescript": "^4.2.3", - "ws": "^7.4.5" + "typescript": "^4.2.3" }, "publishConfig": { "access": "public", diff --git a/samples/__tests__/e2e-ws.test.ts b/samples/__tests__/e2e-ws.test.ts index f3241a9672..841bca6d22 100644 --- a/samples/__tests__/e2e-ws.test.ts +++ b/samples/__tests__/e2e-ws.test.ts @@ -1,10 +1,7 @@ -import WebSocket from 'ws' -import { Agent, ConnectionRecord, InboundTransporter, OutboundTransporter } from '../../src' -import { OutboundPackage } from '../../src/types' +import { Agent, InboundTransporter, WsOutboundTransporter } from '../../src' import { get } from '../http' import { getBaseConfig, toBeConnectedWith, waitForBasicMessage } from '../../src/__tests__/helpers' import testLogger from '../../src/__tests__/logger' -import { WebSocketTransport } from '../../src/agent/TransportService' const logger = testLogger @@ -19,8 +16,8 @@ describe('websockets with mediator', () => { let aliceAtAliceBobId: string afterAll(async () => { - ;(aliceAgent.getOutboundTransporter() as WsOutboundTransporter).stop() - ;(bobAgent.getOutboundTransporter() as WsOutboundTransporter).stop() + await aliceAgent.outboundTransporter?.stop() + await bobAgent.outboundTransporter?.stop() // Wait for messages to flush out await new Promise((r) => setTimeout(r, 1000)) @@ -118,82 +115,3 @@ class WsInboundTransporter implements InboundTransporter { }) } } - -class WsOutboundTransporter implements OutboundTransporter { - private transportTable: Map = new Map() - private agent: Agent - - public supportedSchemes = ['ws'] - - public constructor(agent: Agent) { - this.agent = agent - } - - public async sendMessage(outboundPackage: OutboundPackage) { - const { connection, payload, transport } = outboundPackage - logger.debug(`Sending outbound message to connection ${connection.id} over ${transport?.type} transport.`, payload) - - if (transport instanceof WebSocketTransport) { - const socket = await this.resolveSocket(connection, transport) - socket.send(JSON.stringify(payload)) - } else { - throw new Error(`Unsupported transport ${transport?.type}.`) - } - } - - private async resolveSocket(connection: ConnectionRecord, transport: WebSocketTransport) { - if (transport.socket?.readyState === WebSocket.OPEN) { - return transport.socket - } else { - let socket = this.transportTable.get(connection.id) - if (!socket) { - if (!transport.endpoint) { - throw new Error(`Missing endpoint. I don't know how and where to send the message.`) - } - socket = await createSocketConnection(transport.endpoint) - this.transportTable.set(connection.id, socket) - this.listenOnWebSocketMessages(this.agent, socket) - } - - if (socket.readyState !== WebSocket.OPEN) { - throw new Error('Socket is not open.') - } - return socket - } - } - - private listenOnWebSocketMessages(agent: Agent, socket: WebSocket) { - socket.addEventListener('message', (event: any) => { - logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data }) - agent.receiveMessage(JSON.parse(event.data)) - }) - } - - public stop() { - this.transportTable.forEach((socket) => { - socket.removeAllListeners() - socket.close() - }) - } -} - -function createSocketConnection(endpoint: string): Promise { - if (!endpoint) { - throw new Error('Mediator URL is missing.') - } - return new Promise((resolve, reject) => { - logger.debug('Connecting to mediator via WebSocket') - const socket = new WebSocket(endpoint) - if (!socket) { - throw new Error('WebSocket has not been initialized.') - } - socket.onopen = () => { - logger.debug('Client connected') - resolve(socket) - } - socket.onerror = (e) => { - logger.debug('Client connection failed') - reject(e) - } - }) -} diff --git a/samples/mediator-ws.ts b/samples/mediator-ws.ts index af4459d2c4..ffbeb08db2 100644 --- a/samples/mediator-ws.ts +++ b/samples/mediator-ws.ts @@ -49,8 +49,16 @@ class WsInboundTransporter implements InboundTransporter { } } +// TODO: use WsOutboundTransporter from the agent class WsOutboundTransporter implements OutboundTransporter { - public supportedSchemes = ['ws'] + public supportedSchemes = ['ws', 'wss'] + + public async start(): Promise { + // Nothing required to start WS + } + public async stop(): Promise { + // Nothing required to stop WS + } public async sendMessage(outboundPackage: OutboundPackage) { const { connection, payload, transport } = outboundPackage diff --git a/samples/mediator.ts b/samples/mediator.ts index f5f2d50682..7a36f3f574 100644 --- a/samples/mediator.ts +++ b/samples/mediator.ts @@ -38,6 +38,14 @@ class StorageOutboundTransporter implements OutboundTransporter { this.messageRepository = messageRepository } + public async start(): Promise { + // Nothing required to start + } + + public async stop(): Promise { + // Nothing required to stop + } + public async sendMessage(outboundPackage: OutboundPackage) { const { connection, payload } = outboundPackage diff --git a/src/__tests__/helpers.ts b/src/__tests__/helpers.ts index 22897c39ea..7037b8b7cd 100644 --- a/src/__tests__/helpers.ts +++ b/src/__tests__/helpers.ts @@ -182,6 +182,14 @@ export class SubjectOutboundTransporter implements OutboundTransporter { this.subject = subject } + public async start(): Promise { + // Nothing required to start + } + + public async stop(): Promise { + // Nothing required to stop + } + public async sendMessage(outboundPackage: OutboundPackage) { testLogger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`) const { payload } = outboundPackage diff --git a/src/agent/Agent.ts b/src/agent/Agent.ts index c91386762c..f4edc86d08 100644 --- a/src/agent/Agent.ts +++ b/src/agent/Agent.ts @@ -31,7 +31,6 @@ export class Agent { protected messageReceiver: MessageReceiver protected messageSender: MessageSender public inboundTransporter?: InboundTransporter - public outboundTransporter?: OutboundTransporter public readonly connections!: ConnectionsModule public readonly proofs!: ProofsModule @@ -107,8 +106,8 @@ export class Agent { this.messageSender.setOutboundTransporter(outboundTransporter) } - public getOutboundTransporter() { - return this.messageSender.getOutboundTransporter() + public get outboundTransporter() { + return this.messageSender.outboundTransporter } public async init() { diff --git a/src/agent/MessageSender.ts b/src/agent/MessageSender.ts index a148a58826..640b5e27b0 100644 --- a/src/agent/MessageSender.ts +++ b/src/agent/MessageSender.ts @@ -9,7 +9,7 @@ import { TransportService } from './TransportService' export class MessageSender { private envelopeService: EnvelopeService private transportService: TransportService - private outboundTransporter?: OutboundTransporter + private _outboundTransporter?: OutboundTransporter public constructor(envelopeService: EnvelopeService, transportService: TransportService) { this.envelopeService = envelopeService @@ -17,11 +17,11 @@ export class MessageSender { } public setOutboundTransporter(outboundTransporter: OutboundTransporter) { - this.outboundTransporter = outboundTransporter + this._outboundTransporter = outboundTransporter } - public getOutboundTransporter() { - return this.outboundTransporter + public get outboundTransporter() { + return this._outboundTransporter } public async packMessage(outboundMessage: OutboundMessage): Promise { diff --git a/src/transport/HttpOutboundTransporter.ts b/src/transport/HttpOutboundTransporter.ts index eb9ea717e2..01232f47c6 100644 --- a/src/transport/HttpOutboundTransporter.ts +++ b/src/transport/HttpOutboundTransporter.ts @@ -23,6 +23,14 @@ export class HttpOutboundTransporter implements OutboundTransporter { this.logger = agent.injectionContainer.resolve(Symbols.Logger) } + public async start(): Promise { + // Nothing required to start HTTP + } + + public async stop(): Promise { + // Nothing required to stop HTTP + } + public async sendMessage(outboundPackage: OutboundPackage) { const { payload, endpoint, responseRequested } = outboundPackage diff --git a/src/transport/OutboundTransporter.ts b/src/transport/OutboundTransporter.ts index a1cfaf973d..15e4363f77 100644 --- a/src/transport/OutboundTransporter.ts +++ b/src/transport/OutboundTransporter.ts @@ -4,4 +4,7 @@ export interface OutboundTransporter { supportedSchemes: string[] sendMessage(outboundPackage: OutboundPackage): Promise + + start(): Promise + stop(): Promise } diff --git a/src/transport/WsOutboundTransporter.ts b/src/transport/WsOutboundTransporter.ts new file mode 100644 index 0000000000..4053315829 --- /dev/null +++ b/src/transport/WsOutboundTransporter.ts @@ -0,0 +1,101 @@ +import { OutboundTransporter } from './OutboundTransporter' +import { Agent } from '../agent/Agent' +import { WebSocketTransport } from '../agent/TransportService' +import { Logger } from '../logger' +import { ConnectionRecord } from '../modules/connections' +import { OutboundPackage } from '../types' +import { Symbols } from '../symbols' +import { WebSocket } from '../utils/ws' + +export class WsOutboundTransporter implements OutboundTransporter { + private transportTable: Map = new Map() + private agent: Agent + private logger: Logger + + public supportedSchemes = ['ws', 'wss'] + + public constructor(agent: Agent) { + this.agent = agent + this.logger = agent.injectionContainer.resolve(Symbols.Logger) + } + + public async start(): Promise { + // Nothing required to start WS + } + + public async stop() { + this.transportTable.forEach((socket) => { + socket.removeEventListener('message', this.handleMessageEvent) + socket.close() + }) + } + + public async sendMessage(outboundPackage: OutboundPackage) { + const { connection, payload, transport } = outboundPackage + this.logger.debug( + `Sending outbound message to connection ${connection.id} over ${transport?.type} transport.`, + payload + ) + + if (transport instanceof WebSocketTransport) { + const socket = await this.resolveSocket(connection, transport) + socket.send(JSON.stringify(payload)) + } else { + throw new Error(`Unsupported transport ${transport?.type}.`) + } + } + + private async resolveSocket(connection: ConnectionRecord, transport: WebSocketTransport) { + // If we already have a socket connection use it + if (transport.socket?.readyState === WebSocket.OPEN) { + return transport.socket + } + + let socket = this.transportTable.get(connection.id) + + if (!socket) { + if (!transport.endpoint) { + throw new Error(`Missing endpoint. I don't know how and where to send the message.`) + } + socket = await this.createSocketConnection(transport.endpoint) + this.transportTable.set(connection.id, socket) + this.listenOnWebSocketMessages(socket) + } + + if (socket.readyState !== WebSocket.OPEN) { + throw new Error('Socket is not open.') + } + + return socket + } + + // NOTE: Because this method is passed to the event handler this must be a lambda method + // so 'this' is scoped to the 'WsOutboundTransporter' class instance + private handleMessageEvent = (event: any) => { + this.logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data }) + this.agent.receiveMessage(JSON.parse(event.data)) + } + + private listenOnWebSocketMessages(socket: WebSocket) { + socket.addEventListener('message', this.handleMessageEvent) + } + + private createSocketConnection(endpoint: string): Promise { + return new Promise((resolve, reject) => { + this.logger.debug(`Connecting to WebSocket ${endpoint}`) + const socket = new WebSocket(endpoint) + + socket.onopen = () => { + this.logger.debug(`Successfully connected to WebSocket ${endpoint}`) + resolve(socket) + } + + socket.onerror = (error) => { + this.logger.debug(`Error while connecting to WebSocket ${endpoint}`, { + error, + }) + reject(error) + } + }) + } +} diff --git a/src/transport/index.ts b/src/transport/index.ts index b38eb8d866..f6bc87af83 100644 --- a/src/transport/index.ts +++ b/src/transport/index.ts @@ -1,3 +1,4 @@ export * from './InboundTransporter' export * from './OutboundTransporter' export * from './HttpOutboundTransporter' +export * from './WsOutboundTransporter' diff --git a/src/utils/ws.ts b/src/utils/ws.ts new file mode 100644 index 0000000000..5802ca4487 --- /dev/null +++ b/src/utils/ws.ts @@ -0,0 +1,14 @@ +import { isNodeJS } from './environment' + +// RN exposes global WebSocket +let WebSocket = global.WebSocket + +// NodeJS doesn't have WebSocket by default +if (!WebSocket && isNodeJS()) { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const nodeWebSocket = require('ws') + + WebSocket = nodeWebSocket +} + +export { WebSocket }