Skip to content

Commit

Permalink
refactor: Move a transport protocol-related logic from the framework …
Browse files Browse the repository at this point in the history
…core (#280)

* refactor: Move logic up from envelope service to message sender
* refactor: Use endpoint from transport service in outbound message context
* refactor: Rename transport to transport session

Signed-off-by: Jakub Koci <[email protected]>
  • Loading branch information
jakubkoci authored May 19, 2021
1 parent e5efce0 commit dc3a575
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 152 deletions.
7 changes: 3 additions & 4 deletions samples/mediator-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ import WebSocket from 'ws'
import cors from 'cors'
import { v4 as uuid } from 'uuid'
import config from './config'
import { Agent, InboundTransporter, WsOutboundTransporter } from '../src'
import { Agent, InboundTransporter, WebSocketTransportSession, WsOutboundTransporter } from '../src'
import { DidCommMimeType } from '../src/types'
import { InMemoryMessageRepository } from '../src/storage/InMemoryMessageRepository'
import { WebSocketTransport } from '../src/agent/TransportService'
import testLogger from '../src/__tests__/logger'

const logger = testLogger
Expand Down Expand Up @@ -40,8 +39,8 @@ class WsInboundTransporter implements InboundTransporter {
socket.addEventListener('message', async (event: any) => {
logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data })
// @ts-expect-error Property 'dispatchEvent' is missing in type WebSocket imported from 'ws' module but required in type 'WebSocket'.
const transport = new WebSocketTransport('', socket)
const outboundMessage = await agent.receiveMessage(JSON.parse(event.data), transport)
const session = new WebSocketTransportSession(socket)
const outboundMessage = await agent.receiveMessage(JSON.parse(event.data), session)
if (outboundMessage) {
socket.send(JSON.stringify(outboundMessage.payload))
}
Expand Down
8 changes: 8 additions & 0 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ export async function issueCredential({
}
}

/**
* Returns mock of function with correct type annotations according to original function `fn`.
* It can be used also for class methods.
*
* @param fn function you want to mock
* @returns mock function with type annotations
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function mockFunction<T extends (...args: any[]) => any>(fn: T): jest.MockedFunction<T> {
return fn as jest.MockedFunction<T>
}
6 changes: 3 additions & 3 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { BasicMessagesModule } from '../modules/basic-messages/BasicMessagesModu
import { LedgerModule } from '../modules/ledger/LedgerModule'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'
import { Symbols } from '../symbols'
import { Transport } from './TransportService'
import { TransportSession } from './TransportService'
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes, AgentMessageReceivedEvent } from './Events'

Expand Down Expand Up @@ -136,8 +136,8 @@ export class Agent {
return this.agentConfig.mediatorUrl
}

public async receiveMessage(inboundPackedMessage: unknown, transport?: Transport) {
return await this.messageReceiver.receiveMessage(inboundPackedMessage, transport)
public async receiveMessage(inboundPackedMessage: unknown, session?: TransportSession) {
return await this.messageReceiver.receiveMessage(inboundPackedMessage, session)
}

public async closeAndDeleteWallet() {
Expand Down
24 changes: 7 additions & 17 deletions src/agent/EnvelopeService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { inject, scoped, Lifecycle } from 'tsyringe'
import { OutboundMessage, OutboundPackage, UnpackedMessageContext } from '../types'
import { OutboundMessage, UnpackedMessageContext } from '../types'
import { Wallet } from '../wallet/Wallet'
import { ForwardMessage } from '../modules/routing/messages'
import { AgentConfig } from './AgentConfig'
Expand All @@ -16,36 +16,26 @@ class EnvelopeService {
this.logger = agentConfig.logger
}

public async packMessage(outboundMessage: OutboundMessage): Promise<OutboundPackage> {
const { connection, routingKeys, recipientKeys, senderVk, payload, endpoint } = outboundMessage
const { verkey, theirKey } = connection

const returnRoute = outboundMessage.payload.hasReturnRouting()
public async packMessage(outboundMessage: OutboundMessage): Promise<JsonWebKey> {
const { routingKeys, recipientKeys, senderVk, payload } = outboundMessage
const message = payload.toJSON()

this.logger.info('outboundMessage', {
verkey,
theirKey,
routingKeys,
endpoint,
message,
})
let outboundPackedMessage = await this.wallet.pack(message, recipientKeys, senderVk)
let wireMessage = await this.wallet.pack(message, recipientKeys, senderVk)

if (routingKeys && routingKeys.length > 0) {
for (const routingKey of routingKeys) {
const [recipientKey] = recipientKeys

const forwardMessage = new ForwardMessage({
to: recipientKey,
message: outboundPackedMessage,
message: wireMessage,
})

this.logger.debug('Forward message created', forwardMessage)
outboundPackedMessage = await this.wallet.pack(forwardMessage.toJSON(), [routingKey], senderVk)
wireMessage = await this.wallet.pack(forwardMessage.toJSON(), [routingKey], senderVk)
}
}
return { connection, payload: outboundPackedMessage, endpoint, responseRequested: returnRoute }
return wireMessage
}

public async unpackMessage(packedMessage: JsonWebKey): Promise<UnpackedMessageContext> {
Expand Down
8 changes: 4 additions & 4 deletions src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { AgentMessage } from './AgentMessage'
import { JsonTransformer } from '../utils/JsonTransformer'
import { Logger } from '../logger'
import { replaceLegacyDidSovPrefixOnMessage } from '../utils/messageType'
import { Transport, TransportService } from './TransportService'
import { TransportSession, TransportService } from './TransportService'
import { AriesFrameworkError } from '../error'

@scoped(Lifecycle.ContainerScoped)
Expand Down Expand Up @@ -44,7 +44,7 @@ export class MessageReceiver {
*
* @param inboundPackedMessage the message to receive and handle
*/
public async receiveMessage(inboundPackedMessage: unknown, transport?: Transport) {
public async receiveMessage(inboundPackedMessage: unknown, session?: TransportSession) {
if (typeof inboundPackedMessage !== 'object' || inboundPackedMessage == null) {
throw new AriesFrameworkError('Invalid message received. Message should be object')
}
Expand All @@ -70,8 +70,8 @@ export class MessageReceiver {
}
}

if (connection && transport) {
this.transportService.saveTransport(connection.id, transport)
if (connection && session) {
this.transportService.saveSession(connection.id, session)
}

this.logger.info(`Received message with type '${unpackedMessage.message['@type']}'`, unpackedMessage.message)
Expand Down
26 changes: 20 additions & 6 deletions src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
import { Lifecycle, scoped } from 'tsyringe'
import { inject, Lifecycle, scoped } from 'tsyringe'

import { OutboundMessage, OutboundPackage } from '../types'
import { OutboundTransporter } from '../transport/OutboundTransporter'
import { EnvelopeService } from './EnvelopeService'
import { TransportService } from './TransportService'
import { AriesFrameworkError } from '../error'
import { Logger } from '../logger'
import { Symbols } from '../symbols'

@scoped(Lifecycle.ContainerScoped)
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private logger: Logger
private _outboundTransporter?: OutboundTransporter

public constructor(envelopeService: EnvelopeService, transportService: TransportService) {
public constructor(
envelopeService: EnvelopeService,
transportService: TransportService,
@inject(Symbols.Logger) logger: Logger
) {
this.envelopeService = envelopeService
this.transportService = transportService
this.logger = logger
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
Expand All @@ -26,16 +34,22 @@ export class MessageSender {
}

public async packMessage(outboundMessage: OutboundMessage): Promise<OutboundPackage> {
return this.envelopeService.packMessage(outboundMessage)
const { connection, payload } = outboundMessage
const { verkey, theirKey } = connection
const endpoint = this.transportService.findEndpoint(connection)
const message = payload.toJSON()
this.logger.debug('outboundMessage', { verkey, theirKey, message })
const responseRequested = outboundMessage.payload.hasReturnRouting()
const wireMessage = await this.envelopeService.packMessage(outboundMessage)
return { connection, payload: wireMessage, endpoint, responseRequested }
}

public async sendMessage(outboundMessage: OutboundMessage): Promise<void> {
if (!this.outboundTransporter) {
throw new AriesFrameworkError('Agent has no outbound transporter!')
}
const outboundPackage = await this.envelopeService.packMessage(outboundMessage)
const transport = this.transportService.resolveTransport(outboundMessage.connection)
outboundPackage.transport = transport
const outboundPackage = await this.packMessage(outboundMessage)
outboundPackage.session = this.transportService.findSession(outboundMessage.connection.id)
await this.outboundTransporter.sendMessage(outboundPackage)
}
}
75 changes: 14 additions & 61 deletions src/agent/TransportService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,102 +4,55 @@ import { Logger } from '../logger'
import { ConnectionRecord } from '../modules/connections/repository'
import { ConnectionRole } from '../modules/connections/models'
import { Symbols } from '../symbols'
import { AriesFrameworkError } from '../error'

export const DID_COMM_TRANSPORT_QUEUE = 'didcomm:transport/queue'

@scoped(Lifecycle.ContainerScoped)
export class TransportService {
private transportTable: TransportTable = {}
private transportSessionTable: TransportSessionTable = {}
private logger: Logger

public constructor(@inject(Symbols.Logger) logger: Logger) {
this.logger = logger
}

public saveTransport(connectionId: string, transport: Transport) {
this.transportTable[connectionId] = transport
}

public resolveTransport(connection: ConnectionRecord): Transport {
const transport = this.findTransport(connection.id)
if (transport) {
return transport
}

const endpoint = this.findEndpoint(connection)
if (endpoint) {
if (endpoint.startsWith('ws')) {
return new WebSocketTransport(endpoint)
} else if (endpoint.startsWith('http')) {
return new HttpTransport(endpoint)
} else if (endpoint === DID_COMM_TRANSPORT_QUEUE) {
return new DidCommQueueTransport()
}
throw new Error(`Unsupported scheme in endpoint: ${endpoint}.`)
}

throw new Error(`No transport found for connection with id ${connection.id}`)
public saveSession(connectionId: string, transport: TransportSession) {
this.transportSessionTable[connectionId] = transport
}

public hasInboundEndpoint(connection: ConnectionRecord) {
return connection.didDoc.didCommServices.find((s) => s.serviceEndpoint !== DID_COMM_TRANSPORT_QUEUE)
}

private findTransport(connectionId: string) {
return this.transportTable[connectionId]
public findSession(connectionId: string) {
return this.transportSessionTable[connectionId]
}

private findEndpoint(connection: ConnectionRecord) {
public findEndpoint(connection: ConnectionRecord) {
if (connection.theirDidDoc) {
const endpoint = connection.theirDidDoc.didCommServices[0].serviceEndpoint
if (endpoint) {
this.logger.debug('Taking service endpoint from their DidDoc')
this.logger.debug(`Taking service endpoint ${endpoint} from their DidDoc`)
return endpoint
}
}

if (connection.role === ConnectionRole.Invitee && connection.invitation) {
const endpoint = connection.invitation.serviceEndpoint
if (endpoint) {
this.logger.debug('Taking service endpoint from invitation')
this.logger.debug(`Taking service endpoint ${endpoint} from invitation`)
return endpoint
}
}
throw new AriesFrameworkError(`No endpoint found for connection with id ${connection.id}`)
}
}

interface TransportTable {
[connectionRecordId: string]: Transport
}

type TransportType = 'websocket' | 'http' | 'queue'

export interface Transport {
type: TransportType
endpoint: string
}

export class WebSocketTransport implements Transport {
public readonly type = 'websocket'
public endpoint: string
public socket?: WebSocket

public constructor(endpoint: string, socket?: WebSocket) {
this.endpoint = endpoint
this.socket = socket
}
}

export class HttpTransport implements Transport {
public readonly type = 'http'
public endpoint: string

public constructor(endpoint: string) {
this.endpoint = endpoint
}
interface TransportSessionTable {
[connectionRecordId: string]: TransportSession
}

export class DidCommQueueTransport implements Transport {
public readonly type = 'queue'
public endpoint = DID_COMM_TRANSPORT_QUEUE
export interface TransportSession {
type: string
}
Loading

0 comments on commit dc3a575

Please sign in to comment.