Skip to content

Commit

Permalink
feat: add internal ws outbound transporter (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
TimoGlastra authored May 14, 2021
1 parent 60fc65f commit 2933207
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 96 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"react-native-fs": "^2.18.0",
"reflect-metadata": "^0.1.13",
"tsyringe": "^4.5.0",
"uuid": "^8.3.0"
"uuid": "^8.3.0",
"ws": "^7.4.5"
},
"devDependencies": {
"@types/bn.js": "^5.1.0",
Expand Down Expand Up @@ -63,8 +64,7 @@
"ts-jest": "^26.5.3",
"ts-node-dev": "^1.1.6",
"tslog": "^3.1.2",
"typescript": "^4.2.3",
"ws": "^7.4.5"
"typescript": "^4.2.3"
},
"publishConfig": {
"access": "public",
Expand Down
88 changes: 3 additions & 85 deletions samples/__tests__/e2e-ws.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import WebSocket from 'ws'
import { Agent, ConnectionRecord, InboundTransporter, OutboundTransporter } from '../../src'
import { OutboundPackage } from '../../src/types'
import { Agent, InboundTransporter, WsOutboundTransporter } from '../../src'
import { get } from '../http'
import { getBaseConfig, toBeConnectedWith, waitForBasicMessage } from '../../src/__tests__/helpers'
import testLogger from '../../src/__tests__/logger'
import { WebSocketTransport } from '../../src/agent/TransportService'

const logger = testLogger

Expand All @@ -19,8 +16,8 @@ describe('websockets with mediator', () => {
let aliceAtAliceBobId: string

afterAll(async () => {
;(aliceAgent.getOutboundTransporter() as WsOutboundTransporter).stop()
;(bobAgent.getOutboundTransporter() as WsOutboundTransporter).stop()
await aliceAgent.outboundTransporter?.stop()
await bobAgent.outboundTransporter?.stop()

// Wait for messages to flush out
await new Promise((r) => setTimeout(r, 1000))
Expand Down Expand Up @@ -118,82 +115,3 @@ class WsInboundTransporter implements InboundTransporter {
})
}
}

class WsOutboundTransporter implements OutboundTransporter {
private transportTable: Map<string, WebSocket> = new Map<string, WebSocket>()
private agent: Agent

public supportedSchemes = ['ws']

public constructor(agent: Agent) {
this.agent = agent
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload, transport } = outboundPackage
logger.debug(`Sending outbound message to connection ${connection.id} over ${transport?.type} transport.`, payload)

if (transport instanceof WebSocketTransport) {
const socket = await this.resolveSocket(connection, transport)
socket.send(JSON.stringify(payload))
} else {
throw new Error(`Unsupported transport ${transport?.type}.`)
}
}

private async resolveSocket(connection: ConnectionRecord, transport: WebSocketTransport) {
if (transport.socket?.readyState === WebSocket.OPEN) {
return transport.socket
} else {
let socket = this.transportTable.get(connection.id)
if (!socket) {
if (!transport.endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}
socket = await createSocketConnection(transport.endpoint)
this.transportTable.set(connection.id, socket)
this.listenOnWebSocketMessages(this.agent, socket)
}

if (socket.readyState !== WebSocket.OPEN) {
throw new Error('Socket is not open.')
}
return socket
}
}

private listenOnWebSocketMessages(agent: Agent, socket: WebSocket) {
socket.addEventListener('message', (event: any) => {
logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data })
agent.receiveMessage(JSON.parse(event.data))
})
}

public stop() {
this.transportTable.forEach((socket) => {
socket.removeAllListeners()
socket.close()
})
}
}

function createSocketConnection(endpoint: string): Promise<WebSocket> {
if (!endpoint) {
throw new Error('Mediator URL is missing.')
}
return new Promise((resolve, reject) => {
logger.debug('Connecting to mediator via WebSocket')
const socket = new WebSocket(endpoint)
if (!socket) {
throw new Error('WebSocket has not been initialized.')
}
socket.onopen = () => {
logger.debug('Client connected')
resolve(socket)
}
socket.onerror = (e) => {
logger.debug('Client connection failed')
reject(e)
}
})
}
10 changes: 9 additions & 1 deletion samples/mediator-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ class WsInboundTransporter implements InboundTransporter {
}
}

// TODO: use WsOutboundTransporter from the agent
class WsOutboundTransporter implements OutboundTransporter {
public supportedSchemes = ['ws']
public supportedSchemes = ['ws', 'wss']

public async start(): Promise<void> {
// Nothing required to start WS
}
public async stop(): Promise<void> {
// Nothing required to stop WS
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload, transport } = outboundPackage
Expand Down
8 changes: 8 additions & 0 deletions samples/mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ class StorageOutboundTransporter implements OutboundTransporter {
this.messageRepository = messageRepository
}

public async start(): Promise<void> {
// Nothing required to start
}

public async stop(): Promise<void> {
// Nothing required to stop
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload } = outboundPackage

Expand Down
8 changes: 8 additions & 0 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ export class SubjectOutboundTransporter implements OutboundTransporter {
this.subject = subject
}

public async start(): Promise<void> {
// Nothing required to start
}

public async stop(): Promise<void> {
// Nothing required to stop
}

public async sendMessage(outboundPackage: OutboundPackage) {
testLogger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`)
const { payload } = outboundPackage
Expand Down
5 changes: 2 additions & 3 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export class Agent {
protected messageReceiver: MessageReceiver
protected messageSender: MessageSender
public inboundTransporter?: InboundTransporter
public outboundTransporter?: OutboundTransporter

public readonly connections!: ConnectionsModule
public readonly proofs!: ProofsModule
Expand Down Expand Up @@ -107,8 +106,8 @@ export class Agent {
this.messageSender.setOutboundTransporter(outboundTransporter)
}

public getOutboundTransporter() {
return this.messageSender.getOutboundTransporter()
public get outboundTransporter() {
return this.messageSender.outboundTransporter
}

public async init() {
Expand Down
8 changes: 4 additions & 4 deletions src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import { TransportService } from './TransportService'
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private outboundTransporter?: OutboundTransporter
private _outboundTransporter?: OutboundTransporter

public constructor(envelopeService: EnvelopeService, transportService: TransportService) {
this.envelopeService = envelopeService
this.transportService = transportService
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.outboundTransporter = outboundTransporter
this._outboundTransporter = outboundTransporter
}

public getOutboundTransporter() {
return this.outboundTransporter
public get outboundTransporter() {
return this._outboundTransporter
}

public async packMessage(outboundMessage: OutboundMessage): Promise<OutboundPackage> {
Expand Down
8 changes: 8 additions & 0 deletions src/transport/HttpOutboundTransporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ export class HttpOutboundTransporter implements OutboundTransporter {
this.logger = agent.injectionContainer.resolve(Symbols.Logger)
}

public async start(): Promise<void> {
// Nothing required to start HTTP
}

public async stop(): Promise<void> {
// Nothing required to stop HTTP
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { payload, endpoint, responseRequested } = outboundPackage

Expand Down
3 changes: 3 additions & 0 deletions src/transport/OutboundTransporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ export interface OutboundTransporter {
supportedSchemes: string[]

sendMessage(outboundPackage: OutboundPackage): Promise<any>

start(): Promise<void>
stop(): Promise<void>
}
101 changes: 101 additions & 0 deletions src/transport/WsOutboundTransporter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { OutboundTransporter } from './OutboundTransporter'
import { Agent } from '../agent/Agent'
import { WebSocketTransport } from '../agent/TransportService'
import { Logger } from '../logger'
import { ConnectionRecord } from '../modules/connections'
import { OutboundPackage } from '../types'
import { Symbols } from '../symbols'
import { WebSocket } from '../utils/ws'

export class WsOutboundTransporter implements OutboundTransporter {
private transportTable: Map<string, WebSocket> = new Map<string, WebSocket>()
private agent: Agent
private logger: Logger

public supportedSchemes = ['ws', 'wss']

public constructor(agent: Agent) {
this.agent = agent
this.logger = agent.injectionContainer.resolve(Symbols.Logger)
}

public async start(): Promise<void> {
// Nothing required to start WS
}

public async stop() {
this.transportTable.forEach((socket) => {
socket.removeEventListener('message', this.handleMessageEvent)
socket.close()
})
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload, transport } = outboundPackage
this.logger.debug(
`Sending outbound message to connection ${connection.id} over ${transport?.type} transport.`,
payload
)

if (transport instanceof WebSocketTransport) {
const socket = await this.resolveSocket(connection, transport)
socket.send(JSON.stringify(payload))
} else {
throw new Error(`Unsupported transport ${transport?.type}.`)
}
}

private async resolveSocket(connection: ConnectionRecord, transport: WebSocketTransport) {
// If we already have a socket connection use it
if (transport.socket?.readyState === WebSocket.OPEN) {
return transport.socket
}

let socket = this.transportTable.get(connection.id)

if (!socket) {
if (!transport.endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}
socket = await this.createSocketConnection(transport.endpoint)
this.transportTable.set(connection.id, socket)
this.listenOnWebSocketMessages(socket)
}

if (socket.readyState !== WebSocket.OPEN) {
throw new Error('Socket is not open.')
}

return socket
}

// NOTE: Because this method is passed to the event handler this must be a lambda method
// so 'this' is scoped to the 'WsOutboundTransporter' class instance
private handleMessageEvent = (event: any) => {
this.logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data })
this.agent.receiveMessage(JSON.parse(event.data))
}

private listenOnWebSocketMessages(socket: WebSocket) {
socket.addEventListener('message', this.handleMessageEvent)
}

private createSocketConnection(endpoint: string): Promise<WebSocket> {
return new Promise((resolve, reject) => {
this.logger.debug(`Connecting to WebSocket ${endpoint}`)
const socket = new WebSocket(endpoint)

socket.onopen = () => {
this.logger.debug(`Successfully connected to WebSocket ${endpoint}`)
resolve(socket)
}

socket.onerror = (error) => {
this.logger.debug(`Error while connecting to WebSocket ${endpoint}`, {
error,
})
reject(error)
}
})
}
}
1 change: 1 addition & 0 deletions src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './InboundTransporter'
export * from './OutboundTransporter'
export * from './HttpOutboundTransporter'
export * from './WsOutboundTransporter'
14 changes: 14 additions & 0 deletions src/utils/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { isNodeJS } from './environment'

// RN exposes global WebSocket
let WebSocket = global.WebSocket

// NodeJS doesn't have WebSocket by default
if (!WebSocket && isNodeJS()) {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const nodeWebSocket = require('ws')

WebSocket = nodeWebSocket
}

export { WebSocket }

0 comments on commit 2933207

Please sign in to comment.