Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add internal ws outbound transporter #267

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"react-native-fs": "^2.18.0",
"reflect-metadata": "^0.1.13",
"tsyringe": "^4.5.0",
"uuid": "^8.3.0"
"uuid": "^8.3.0",
"ws": "^7.4.5"
},
"devDependencies": {
"@types/bn.js": "^5.1.0",
Expand Down Expand Up @@ -63,8 +64,7 @@
"ts-jest": "^26.5.3",
"ts-node-dev": "^1.1.6",
"tslog": "^3.1.2",
"typescript": "^4.2.3",
"ws": "^7.4.5"
"typescript": "^4.2.3"
},
"publishConfig": {
"access": "public",
Expand Down
88 changes: 3 additions & 85 deletions samples/__tests__/e2e-ws.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import WebSocket from 'ws'
import { Agent, ConnectionRecord, InboundTransporter, OutboundTransporter } from '../../src'
import { OutboundPackage } from '../../src/types'
import { Agent, InboundTransporter, WsOutboundTransporter } from '../../src'
import { get } from '../http'
import { getBaseConfig, toBeConnectedWith, waitForBasicMessage } from '../../src/__tests__/helpers'
import testLogger from '../../src/__tests__/logger'
import { WebSocketTransport } from '../../src/agent/TransportService'

const logger = testLogger

Expand All @@ -19,8 +16,8 @@ describe('websockets with mediator', () => {
let aliceAtAliceBobId: string

afterAll(async () => {
;(aliceAgent.getOutboundTransporter() as WsOutboundTransporter).stop()
;(bobAgent.getOutboundTransporter() as WsOutboundTransporter).stop()
await aliceAgent.outboundTransporter?.stop()
await bobAgent.outboundTransporter?.stop()

// Wait for messages to flush out
await new Promise((r) => setTimeout(r, 1000))
Expand Down Expand Up @@ -118,82 +115,3 @@ class WsInboundTransporter implements InboundTransporter {
})
}
}

class WsOutboundTransporter implements OutboundTransporter {
private transportTable: Map<string, WebSocket> = new Map<string, WebSocket>()
private agent: Agent

public supportedSchemes = ['ws']

public constructor(agent: Agent) {
this.agent = agent
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload, transport } = outboundPackage
logger.debug(`Sending outbound message to connection ${connection.id} over ${transport?.type} transport.`, payload)

if (transport instanceof WebSocketTransport) {
const socket = await this.resolveSocket(connection, transport)
socket.send(JSON.stringify(payload))
} else {
throw new Error(`Unsupported transport ${transport?.type}.`)
}
}

private async resolveSocket(connection: ConnectionRecord, transport: WebSocketTransport) {
if (transport.socket?.readyState === WebSocket.OPEN) {
return transport.socket
} else {
let socket = this.transportTable.get(connection.id)
if (!socket) {
if (!transport.endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}
socket = await createSocketConnection(transport.endpoint)
this.transportTable.set(connection.id, socket)
this.listenOnWebSocketMessages(this.agent, socket)
}

if (socket.readyState !== WebSocket.OPEN) {
throw new Error('Socket is not open.')
}
return socket
}
}

private listenOnWebSocketMessages(agent: Agent, socket: WebSocket) {
socket.addEventListener('message', (event: any) => {
logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data })
agent.receiveMessage(JSON.parse(event.data))
})
}

public stop() {
this.transportTable.forEach((socket) => {
socket.removeAllListeners()
socket.close()
})
}
}

function createSocketConnection(endpoint: string): Promise<WebSocket> {
if (!endpoint) {
throw new Error('Mediator URL is missing.')
}
return new Promise((resolve, reject) => {
logger.debug('Connecting to mediator via WebSocket')
const socket = new WebSocket(endpoint)
if (!socket) {
throw new Error('WebSocket has not been initialized.')
}
socket.onopen = () => {
logger.debug('Client connected')
resolve(socket)
}
socket.onerror = (e) => {
logger.debug('Client connection failed')
reject(e)
}
})
}
10 changes: 9 additions & 1 deletion samples/mediator-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ class WsInboundTransporter implements InboundTransporter {
}
}

// TODO: use WsOutboundTransporter from the agent
class WsOutboundTransporter implements OutboundTransporter {
public supportedSchemes = ['ws']
public supportedSchemes = ['ws', 'wss']
TimoGlastra marked this conversation as resolved.
Show resolved Hide resolved

public async start(): Promise<void> {
// Nothing required to start WS
}
public async stop(): Promise<void> {
// Nothing required to stop WS
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload, transport } = outboundPackage
Expand Down
8 changes: 8 additions & 0 deletions samples/mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ class StorageOutboundTransporter implements OutboundTransporter {
this.messageRepository = messageRepository
}

public async start(): Promise<void> {
// Nothing required to start
}

public async stop(): Promise<void> {
// Nothing required to stop
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload } = outboundPackage

Expand Down
8 changes: 8 additions & 0 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ export class SubjectOutboundTransporter implements OutboundTransporter {
this.subject = subject
}

public async start(): Promise<void> {
// Nothing required to start
}

public async stop(): Promise<void> {
// Nothing required to stop
}

public async sendMessage(outboundPackage: OutboundPackage) {
testLogger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`)
const { payload } = outboundPackage
Expand Down
5 changes: 2 additions & 3 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export class Agent {
protected messageReceiver: MessageReceiver
protected messageSender: MessageSender
public inboundTransporter?: InboundTransporter
public outboundTransporter?: OutboundTransporter

public readonly connections!: ConnectionsModule
public readonly proofs!: ProofsModule
Expand Down Expand Up @@ -107,8 +106,8 @@ export class Agent {
this.messageSender.setOutboundTransporter(outboundTransporter)
}

public getOutboundTransporter() {
return this.messageSender.getOutboundTransporter()
public get outboundTransporter() {
return this.messageSender.outboundTransporter
}

public async init() {
Expand Down
8 changes: 4 additions & 4 deletions src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import { TransportService } from './TransportService'
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private outboundTransporter?: OutboundTransporter
private _outboundTransporter?: OutboundTransporter

public constructor(envelopeService: EnvelopeService, transportService: TransportService) {
this.envelopeService = envelopeService
this.transportService = transportService
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
jakubkoci marked this conversation as resolved.
Show resolved Hide resolved
this.outboundTransporter = outboundTransporter
this._outboundTransporter = outboundTransporter
}

public getOutboundTransporter() {
return this.outboundTransporter
public get outboundTransporter() {
return this._outboundTransporter
}

public async packMessage(outboundMessage: OutboundMessage): Promise<OutboundPackage> {
Expand Down
8 changes: 8 additions & 0 deletions src/transport/HttpOutboundTransporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ export class HttpOutboundTransporter implements OutboundTransporter {
this.logger = agent.injectionContainer.resolve(Symbols.Logger)
}

public async start(): Promise<void> {
// Nothing required to start HTTP
}

public async stop(): Promise<void> {
// Nothing required to stop HTTP
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { payload, endpoint, responseRequested } = outboundPackage

Expand Down
3 changes: 3 additions & 0 deletions src/transport/OutboundTransporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ export interface OutboundTransporter {
supportedSchemes: string[]

sendMessage(outboundPackage: OutboundPackage): Promise<any>

start(): Promise<void>
stop(): Promise<void>
}
101 changes: 101 additions & 0 deletions src/transport/WsOutboundTransporter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { OutboundTransporter } from './OutboundTransporter'
import { Agent } from '../agent/Agent'
import { WebSocketTransport } from '../agent/TransportService'
import { Logger } from '../logger'
import { ConnectionRecord } from '../modules/connections'
import { OutboundPackage } from '../types'
import { Symbols } from '../symbols'
import { WebSocket } from '../utils/ws'

export class WsOutboundTransporter implements OutboundTransporter {
TimoGlastra marked this conversation as resolved.
Show resolved Hide resolved
private transportTable: Map<string, WebSocket> = new Map<string, WebSocket>()
private agent: Agent
private logger: Logger

public supportedSchemes = ['ws', 'wss']

public constructor(agent: Agent) {
this.agent = agent
this.logger = agent.injectionContainer.resolve(Symbols.Logger)
}

public async start(): Promise<void> {
// Nothing required to start WS
}

public async stop() {
this.transportTable.forEach((socket) => {
socket.removeEventListener('message', this.handleMessageEvent)
socket.close()
})
}

public async sendMessage(outboundPackage: OutboundPackage) {
const { connection, payload, transport } = outboundPackage
this.logger.debug(
`Sending outbound message to connection ${connection.id} over ${transport?.type} transport.`,
payload
)

if (transport instanceof WebSocketTransport) {
const socket = await this.resolveSocket(connection, transport)
socket.send(JSON.stringify(payload))
} else {
throw new Error(`Unsupported transport ${transport?.type}.`)
}
}

private async resolveSocket(connection: ConnectionRecord, transport: WebSocketTransport) {
// If we already have a socket connection use it
if (transport.socket?.readyState === WebSocket.OPEN) {
return transport.socket
}

let socket = this.transportTable.get(connection.id)

if (!socket) {
if (!transport.endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}
socket = await this.createSocketConnection(transport.endpoint)
this.transportTable.set(connection.id, socket)
this.listenOnWebSocketMessages(socket)
}

if (socket.readyState !== WebSocket.OPEN) {
throw new Error('Socket is not open.')
}

return socket
}

// NOTE: Because this method is passed to the event handler this must be a lambda method
// so 'this' is scoped to the 'WsOutboundTransporter' class instance
private handleMessageEvent = (event: any) => {
this.logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data })
this.agent.receiveMessage(JSON.parse(event.data))
}

private listenOnWebSocketMessages(socket: WebSocket) {
socket.addEventListener('message', this.handleMessageEvent)
}

private createSocketConnection(endpoint: string): Promise<WebSocket> {
return new Promise((resolve, reject) => {
this.logger.debug(`Connecting to WebSocket ${endpoint}`)
const socket = new WebSocket(endpoint)

socket.onopen = () => {
this.logger.debug(`Successfully connected to WebSocket ${endpoint}`)
resolve(socket)
}

socket.onerror = (error) => {
this.logger.debug(`Error while connecting to WebSocket ${endpoint}`, {
error,
})
reject(error)
}
})
}
}
1 change: 1 addition & 0 deletions src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './InboundTransporter'
export * from './OutboundTransporter'
export * from './HttpOutboundTransporter'
export * from './WsOutboundTransporter'
14 changes: 14 additions & 0 deletions src/utils/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { isNodeJS } from './environment'

// RN exposes global WebSocket
let WebSocket = global.WebSocket

// NodeJS doesn't have WebSocket by default
if (!WebSocket && isNodeJS()) {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const nodeWebSocket = require('ws')
TimoGlastra marked this conversation as resolved.
Show resolved Hide resolved

WebSocket = nodeWebSocket
}

export { WebSocket }