diff --git a/docs/getting-started/1-transports.md b/docs/getting-started/1-transports.md index e67954e942..dda96c257f 100644 --- a/docs/getting-started/1-transports.md +++ b/docs/getting-started/1-transports.md @@ -18,11 +18,11 @@ const agent = new Agent({ // Use HTTP as outbound transporter const httpOutboundTransporter = new HttpOutboundTransporter() -agent.setOutboundTransporter(httpOutboundTransporter) +agent.registerOutboundTransporter(httpOutboundTransporter) // Or use WebSocket instead const wsOutboundTransporter = new WsOutboundTransporter() -agent.setOutboundTransporter(wsOutboundTransporter) +agent.registerOutboundTransporter(wsOutboundTransporter) ``` ## Inbound Transport diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index 71d20f2e04..a66a2b835b 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -118,12 +118,12 @@ export class Agent { this.inboundTransporter = inboundTransporter } - public setOutboundTransporter(outboundTransporter: OutboundTransporter) { - this.messageSender.setOutboundTransporter(outboundTransporter) + public registerOutboundTransporter(outboundTransporter: OutboundTransporter) { + this.messageSender.registerOutboundTransporter(outboundTransporter) } - public get outboundTransporter() { - return this.messageSender.outboundTransporter + public get outboundTransporters() { + return this.messageSender.outboundTransporters } public get events() { @@ -162,23 +162,15 @@ export class Agent { await this.inboundTransporter.start(this) } - if (this.outboundTransporter) { - await this.outboundTransporter.start(this) + for (const transport of this.messageSender.outboundTransporters) { + transport.start(this) } // Connect to mediator through provided invitation if provided in config // Also requests mediation ans sets as default mediator // Because this requires the connections module, we do this in the agent constructor if (mediatorConnectionsInvite) { - // Assumption: processInvitation is a URL-encoded invitation - let connectionRecord = await this.connections.receiveInvitationFromUrl(mediatorConnectionsInvite, { - autoAcceptConnection: true, - }) - - // TODO: add timeout to returnWhenIsConnected - connectionRecord = await this.connections.returnWhenIsConnected(connectionRecord.id) - const mediationRecord = await this.mediationRecipient.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter - await this.mediationRecipient.setDefaultMediator(mediationRecord) + await this.mediationRecipient.provision(mediatorConnectionsInvite) } await this.mediationRecipient.initialize() @@ -192,7 +184,9 @@ export class Agent { this.agentConfig.stop$.next(true) // Stop transports - await this.outboundTransporter?.stop() + for (const transport of this.messageSender.outboundTransporters) { + transport.stop() + } await this.inboundTransporter?.stop() // close/delete wallet if still initialized diff --git a/packages/core/src/agent/MessageSender.ts b/packages/core/src/agent/MessageSender.ts index 6375994e78..7d1ba4bba2 100644 --- a/packages/core/src/agent/MessageSender.ts +++ b/packages/core/src/agent/MessageSender.ts @@ -16,13 +16,18 @@ import { MessageRepository } from '../storage/MessageRepository' import { EnvelopeService } from './EnvelopeService' import { TransportService } from './TransportService' +export interface TransportPriorityOptions { + schemes: string[] + restrictive?: boolean +} + @scoped(Lifecycle.ContainerScoped) export class MessageSender { private envelopeService: EnvelopeService private transportService: TransportService private messageRepository: MessageRepository private logger: Logger - private _outboundTransporter?: OutboundTransporter + private outboundTransports: OutboundTransporter[] = [] public constructor( envelopeService: EnvelopeService, @@ -34,14 +39,15 @@ export class MessageSender { this.transportService = transportService this.messageRepository = messageRepository this.logger = logger + this.outboundTransports = [] } - public setOutboundTransporter(outboundTransporter: OutboundTransporter) { - this._outboundTransporter = outboundTransporter + public registerOutboundTransporter(outboundTransporter: OutboundTransporter) { + this.outboundTransports.push(outboundTransporter) } - public get outboundTransporter() { - return this._outboundTransporter + public get outboundTransporters() { + return this.outboundTransports } public async packMessage({ @@ -77,9 +83,11 @@ export class MessageSender { public async sendPackage({ connection, packedMessage, + options, }: { connection: ConnectionRecord packedMessage: WireMessage + options?: { transportPriority?: TransportPriorityOptions } }) { // Try to send to already open session const session = this.transportService.findSessionByConnectionId(connection.id) @@ -93,26 +101,25 @@ export class MessageSender { } // Retrieve DIDComm services - const allServices = this.transportService.findDidCommServices(connection) - const reachableServices = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint)) - const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint)) + const { services, queueService } = await this.retrieveServicesByConnection(connection, options?.transportPriority) - this.logger.debug( - `Found ${allServices.length} services for message to connection '${connection.id}' (${connection.theirLabel})` - ) - - if (!this.outboundTransporter) { + if (this.outboundTransporters.length === 0 && !queueService) { throw new AriesFrameworkError('Agent has no outbound transporter!') } // Loop trough all available services and try to send the message - for await (const service of reachableServices) { + for await (const service of services) { this.logger.debug(`Sending outbound message to service:`, { service }) try { - await this.outboundTransporter.sendMessage({ - payload: packedMessage, - endpoint: service.serviceEndpoint, - }) + for (const transport of this.outboundTransporters) { + if (transport.supportedSchemes.includes(service.protocolScheme)) { + await transport.sendMessage({ + payload: packedMessage, + endpoint: service.serviceEndpoint, + }) + break + } + } return } catch (error) { this.logger.debug( @@ -141,11 +148,12 @@ export class MessageSender { throw new AriesFrameworkError(`Message is undeliverable to connection ${connection.id} (${connection.theirLabel})`) } - public async sendMessage(outboundMessage: OutboundMessage) { - if (!this.outboundTransporter) { - throw new AriesFrameworkError('Agent has no outbound transporter!') + public async sendMessage( + outboundMessage: OutboundMessage, + options?: { + transportPriority?: TransportPriorityOptions } - + ) { const { connection, payload } = outboundMessage this.logger.debug('Send outbound message', { @@ -165,16 +173,10 @@ export class MessageSender { } // Retrieve DIDComm services - const allServices = this.transportService.findDidCommServices(connection) - const reachableServices = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint)) - const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint)) - - this.logger.debug( - `Found ${allServices.length} services for message to connection '${connection.id}' (${connection.theirLabel})` - ) + const { services, queueService } = await this.retrieveServicesByConnection(connection, options?.transportPriority) // Loop trough all available services and try to send the message - for await (const service of reachableServices) { + for await (const service of services) { try { // Enable return routing if the const shouldUseReturnRoute = !this.transportService.hasInboundEndpoint(connection.didDoc) @@ -232,7 +234,7 @@ export class MessageSender { senderKey: string returnRoute?: boolean }) { - if (!this.outboundTransporter) { + if (this.outboundTransports.length === 0) { throw new AriesFrameworkError('Agent has no outbound transporter!') } @@ -250,7 +252,50 @@ export class MessageSender { } const outboundPackage = await this.packMessage({ message, keys, endpoint: service.serviceEndpoint }) - await this.outboundTransporter.sendMessage(outboundPackage) + outboundPackage.endpoint = service.serviceEndpoint + for (const transport of this.outboundTransporters) { + if (transport.supportedSchemes.includes(service.protocolScheme)) { + await transport.sendMessage(outboundPackage) + break + } + } + } + + private async retrieveServicesByConnection( + connection: ConnectionRecord, + transportPriority?: TransportPriorityOptions + ) { + this.logger.debug(`Retrieving services for connection '${connection.id}' (${connection.theirLabel})`, { + transportPriority, + }) + // Retrieve DIDComm services + const allServices = this.transportService.findDidCommServices(connection) + + //Separate queue service out + const services = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint)) + const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint)) + + //If restrictive will remove services not listed in schemes list + if (transportPriority?.restrictive) { + services.filter((service) => { + const serviceSchema = service.protocolScheme + return transportPriority.schemes.includes(serviceSchema) + }) + } + + //If transport priority is set we will sort services by our priority + if (transportPriority?.schemes) { + services.sort(function (a, b) { + const aScheme = a.protocolScheme + const bScheme = b.protocolScheme + return transportPriority?.schemes.indexOf(aScheme) - transportPriority?.schemes.indexOf(bScheme) + }) + } + + this.logger.debug( + `Retrieved ${services.length} services for message to connection '${connection.id}'(${connection.theirLabel})'` + ) + return { services, queueService } } } diff --git a/packages/core/src/agent/__tests__/MessageSender.test.ts b/packages/core/src/agent/__tests__/MessageSender.test.ts index ac39a4aea8..f0fc2aa3de 100644 --- a/packages/core/src/agent/__tests__/MessageSender.test.ts +++ b/packages/core/src/agent/__tests__/MessageSender.test.ts @@ -31,7 +31,7 @@ class DummyOutboundTransporter implements OutboundTransporter { throw new Error('Method not implemented.') } - public supportedSchemes: string[] = [] + public supportedSchemes: string[] = ['https'] public sendMessage() { return Promise.resolve() @@ -109,11 +109,11 @@ describe('MessageSender', () => { }) test('throw error when there is no outbound transport', async () => { - await expect(messageSender.sendMessage(outboundMessage)).rejects.toThrow(`Agent has no outbound transporter!`) + await expect(messageSender.sendMessage(outboundMessage)).rejects.toThrow(/Message is undeliverable to connection/) }) test('throw error when there is no service or queue', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) transportServiceFindServicesMock.mockReturnValue([]) await expect(messageSender.sendMessage(outboundMessage)).rejects.toThrow( @@ -122,11 +122,11 @@ describe('MessageSender', () => { }) test('call send message when session send method fails', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) transportServiceFindSessionMock.mockReturnValue(session) session.send = jest.fn().mockRejectedValue(new Error('some error')) - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage') await messageSender.sendMessage(outboundMessage) @@ -140,10 +140,10 @@ describe('MessageSender', () => { }) test('call send message when session send method fails with missing keys', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) transportServiceFindSessionMock.mockReturnValue(sessionWithoutKeys) - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage') await messageSender.sendMessage(outboundMessage) @@ -157,7 +157,7 @@ describe('MessageSender', () => { }) test('call send message on session when there is a session for a given connection', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage') const sendMessageToServiceSpy = jest.spyOn(messageSender, 'sendMessageToService') @@ -174,7 +174,7 @@ describe('MessageSender', () => { }) test('calls sendMessageToService with payload and endpoint from second DidComm service when the first fails', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage') const sendMessageToServiceSpy = jest.spyOn(messageSender, 'sendMessageToService') @@ -229,7 +229,7 @@ describe('MessageSender', () => { }) test('calls send message with payload and endpoint from DIDComm service', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage') await messageSender.sendMessageToService({ @@ -247,7 +247,7 @@ describe('MessageSender', () => { }) test('call send message with responseRequested when message has return route', async () => { - messageSender.setOutboundTransporter(outboundTransporter) + messageSender.registerOutboundTransporter(outboundTransporter) const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage') const message = new AgentMessage() diff --git a/packages/core/src/modules/connections/ConnectionsModule.ts b/packages/core/src/modules/connections/ConnectionsModule.ts index dfdc31380f..408d58270c 100644 --- a/packages/core/src/modules/connections/ConnectionsModule.ts +++ b/packages/core/src/modules/connections/ConnectionsModule.ts @@ -221,6 +221,17 @@ export class ConnectionsModule { return this.connectionService.findByTheirKey(verkey) } + /** + * Find connection by Invitation key. + * + * @param key the invitation key to search for + * @returns the connection record, or null if not found + * @throws {RecordDuplicateError} if multiple connections are found for the given verkey + */ + public findByInvitationKey(key: string): Promise { + return this.connectionService.findByInvitationKey(key) + } + /** * Retrieve a connection record by thread id * diff --git a/packages/core/src/modules/connections/models/did/service/Service.ts b/packages/core/src/modules/connections/models/did/service/Service.ts index 66d8ff3d18..3ed46b0aa7 100644 --- a/packages/core/src/modules/connections/models/did/service/Service.ts +++ b/packages/core/src/modules/connections/models/did/service/Service.ts @@ -9,6 +9,10 @@ export class Service { } } + public get protocolScheme(): string { + return this.serviceEndpoint.split(':')[0] + } + @IsString() public id!: string diff --git a/packages/core/src/modules/connections/services/ConnectionService.ts b/packages/core/src/modules/connections/services/ConnectionService.ts index a3a906ad66..5d8cf03e15 100644 --- a/packages/core/src/modules/connections/services/ConnectionService.ts +++ b/packages/core/src/modules/connections/services/ConnectionService.ts @@ -493,6 +493,19 @@ export class ConnectionService { }) } + /** + * Find connection by invitation key. + * + * @param key the invitation key to search for + * @returns the connection record, or null if not found + * @throws {RecordDuplicateError} if multiple connections are found for the given verkey + */ + public findByInvitationKey(key: string): Promise { + return this.connectionRepository.findSingleByQuery({ + invitationKey: key, + }) + } + /** * Retrieve a connection record by thread id * diff --git a/packages/core/src/modules/routing/RecipientModule.ts b/packages/core/src/modules/routing/RecipientModule.ts index 96b62b5130..8be1ad7a81 100644 --- a/packages/core/src/modules/routing/RecipientModule.ts +++ b/packages/core/src/modules/routing/RecipientModule.ts @@ -1,3 +1,4 @@ +import type { Logger } from '../../logger' import type { ConnectionRecord } from '../connections' import type { MediationStateChangedEvent } from './RoutingEvents' import type { MediationRecord } from './index' @@ -11,6 +12,8 @@ import { Dispatcher } from '../../agent/Dispatcher' import { EventEmitter } from '../../agent/EventEmitter' import { MessageSender } from '../../agent/MessageSender' import { createOutboundMessage } from '../../agent/helpers' +import { AriesFrameworkError } from '../../error' +import { ConnectionInvitationMessage } from '../connections' import { ConnectionService } from '../connections/services' import { MediatorPickupStrategy } from './MediatorPickupStrategy' @@ -29,6 +32,7 @@ export class RecipientModule { private connectionService: ConnectionService private messageSender: MessageSender private eventEmitter: EventEmitter + private logger: Logger public constructor( dispatcher: Dispatcher, @@ -43,6 +47,7 @@ export class RecipientModule { this.mediationRecipientService = mediationRecipientService this.messageSender = messageSender this.eventEmitter = eventEmitter + this.logger = agentConfig.logger this.registerHandlers(dispatcher) } @@ -178,6 +183,61 @@ export class RecipientModule { return event.payload.mediationRecord } + public async provision(mediatorConnInvite: string) { + this.logger.debug('Provision Mediation with invitation', { invite: mediatorConnInvite }) + // Connect to mediator through provided invitation + // Also requests mediation and sets as default mediator + // Assumption: processInvitation is a URL-encoded invitation + const invitation = await ConnectionInvitationMessage.fromUrl(mediatorConnInvite) + // Check if invitation has been used already + if (!invitation || !invitation.recipientKeys || !invitation.recipientKeys[0]) { + throw new AriesFrameworkError(`Invalid mediation invitation. Invitation must have at least one recipient key.`) + } + const connection = await this.connectionService.findByInvitationKey(invitation.recipientKeys[0]) + if (!connection) { + this.logger.debug('Mediation Connection does not exist, creating connection') + const routing = await this.mediationRecipientService.getRouting() + + const invitationConnectionRecord = await this.connectionService.processInvitation(invitation, { + autoAcceptConnection: true, + routing, + }) + this.logger.debug('Processed mediation invitation', { + connectionId: invitationConnectionRecord, + }) + const { message, connectionRecord } = await this.connectionService.createRequest(invitationConnectionRecord.id) + const outbound = createOutboundMessage(connectionRecord, message) + await this.messageSender.sendMessage(outbound) + + // TODO: add timeout to returnWhenIsConnected + const completedConnectionRecord = await this.connectionService.returnWhenIsConnected(connectionRecord.id) + this.logger.debug('Connection completed, requesting mediation') + const mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter + this.logger.debug('Mediation Granted, setting as default mediator') + await this.setDefaultMediator(mediationRecord) + this.logger.debug('Default mediator set') + return + } else if (connection && !connection.isReady) { + const connectionRecord = await this.connectionService.returnWhenIsConnected(connection.id) + const mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter + await this.setDefaultMediator(mediationRecord) + return + } else if (connection.isReady) { + this.agentConfig.logger.warn('Mediator Invitation in configuration has already been used to create a connection.') + const mediator = await this.findByConnectionId(connection.id) + if (!mediator) { + this.agentConfig.logger.warn('requesting mediation over connection.') + const mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter + await this.setDefaultMediator(mediationRecord) + } else { + this.agentConfig.logger.warn( + `Mediator Invitation in configuration has already been ${ + mediator.isReady ? 'granted' : 'requested' + } mediation` + ) + } + } + } // Register handlers for the several messages for the mediator. private registerHandlers(dispatcher: Dispatcher) { dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService)) diff --git a/packages/core/src/modules/routing/__tests__/mediation.test.ts b/packages/core/src/modules/routing/__tests__/mediation.test.ts index 7d665b7b8c..8e3c164db2 100644 --- a/packages/core/src/modules/routing/__tests__/mediation.test.ts +++ b/packages/core/src/modules/routing/__tests__/mediation.test.ts @@ -55,7 +55,7 @@ describe('mediator establishment', () => { // Initialize mediatorReceived message mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies) - mediatorAgent.setOutboundTransporter(new SubjectOutboundTransporter(mediatorMessages, subjectMap)) + mediatorAgent.registerOutboundTransporter(new SubjectOutboundTransporter(mediatorMessages, subjectMap)) mediatorAgent.setInboundTransporter(new SubjectInboundTransporter(mediatorMessages)) await mediatorAgent.initialize() @@ -72,7 +72,7 @@ describe('mediator establishment', () => { { ...recipientConfig.config, mediatorConnectionsInvite: mediatorInvitation.toUrl() }, recipientConfig.agentDependencies ) - recipientAgent.setOutboundTransporter(new SubjectOutboundTransporter(recipientMessages, subjectMap)) + recipientAgent.registerOutboundTransporter(new SubjectOutboundTransporter(recipientMessages, subjectMap)) recipientAgent.setInboundTransporter(new SubjectInboundTransporter(recipientMessages)) await recipientAgent.initialize() @@ -94,7 +94,7 @@ describe('mediator establishment', () => { // Initialize sender agent senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies) - senderAgent.setOutboundTransporter(new SubjectOutboundTransporter(senderMessages, subjectMap)) + senderAgent.registerOutboundTransporter(new SubjectOutboundTransporter(senderMessages, subjectMap)) senderAgent.setInboundTransporter(new SubjectInboundTransporter(senderMessages)) await senderAgent.initialize() diff --git a/packages/core/src/modules/routing/messages/KeylistUpdateMessage.ts b/packages/core/src/modules/routing/messages/KeylistUpdateMessage.ts index 5a044069e5..205d8869ee 100644 --- a/packages/core/src/modules/routing/messages/KeylistUpdateMessage.ts +++ b/packages/core/src/modules/routing/messages/KeylistUpdateMessage.ts @@ -25,7 +25,7 @@ export class KeylistUpdateMessage extends AgentMessage { @Equals(KeylistUpdateMessage.type) public readonly type = KeylistUpdateMessage.type - public static readonly type = 'https://didcomm.org/coordinatemediation/1.0/keylist-update' + public static readonly type = 'https://didcomm.org/coordinate-mediation/1.0/keylist-update' @Type(() => KeylistUpdate) @IsArray() diff --git a/packages/core/src/modules/routing/messages/KeylistUpdateResponseMessage.ts b/packages/core/src/modules/routing/messages/KeylistUpdateResponseMessage.ts index c90d9539e7..0f5d5f4fa9 100644 --- a/packages/core/src/modules/routing/messages/KeylistUpdateResponseMessage.ts +++ b/packages/core/src/modules/routing/messages/KeylistUpdateResponseMessage.ts @@ -31,7 +31,7 @@ export class KeylistUpdateResponseMessage extends AgentMessage { @Equals(KeylistUpdateResponseMessage.type) public readonly type = KeylistUpdateResponseMessage.type - public static readonly type = 'https://didcomm.org/coordinatemediation/1.0/keylist-update-response' + public static readonly type = 'https://didcomm.org/coordinate-mediation/1.0/keylist-update-response' @Type(() => KeylistUpdated) @IsArray() diff --git a/packages/core/src/transport/WsOutboundTransporter.ts b/packages/core/src/transport/WsOutboundTransporter.ts index 79aa53e41f..31d2446611 100644 --- a/packages/core/src/transport/WsOutboundTransporter.ts +++ b/packages/core/src/transport/WsOutboundTransporter.ts @@ -12,12 +12,13 @@ export class WsOutboundTransporter implements OutboundTransporter { private agent!: Agent private logger!: Logger private WebSocketClass!: typeof WebSocket - + private continue!: boolean + private recursiveBackOff: { [socketId: string]: number } = {} public supportedSchemes = ['ws', 'wss'] public async start(agent: Agent): Promise { this.agent = agent - + this.continue = true const agentConfig = agent.injectionContainer.resolve(AgentConfig) this.logger = agentConfig.logger @@ -27,7 +28,7 @@ export class WsOutboundTransporter implements OutboundTransporter { public async stop() { this.logger.debug('Stopping WS outbound transport') - + this.continue = false this.transportTable.forEach((socket) => { socket.removeEventListener('message', this.handleMessageEvent) socket.close() @@ -47,7 +48,7 @@ export class WsOutboundTransporter implements OutboundTransporter { const isNewSocket = this.hasOpenSocket(endpoint) const socket = await this.resolveSocket(endpoint, endpoint) - socket.send(JSON.stringify(payload)) + socket.send(Buffer.from(JSON.stringify(payload))) // If the socket was created for this message and we don't have return routing enabled // We can close the socket as it shouldn't return messages anymore @@ -84,8 +85,10 @@ export class WsOutboundTransporter implements OutboundTransporter { // so 'this' is scoped to the 'WsOutboundTransporter' class instance // eslint-disable-next-line @typescript-eslint/no-explicit-any private handleMessageEvent = (event: any) => { - this.logger.debug('WebSocket message event received.', { url: event.target.url, data: event.data }) - this.agent.receiveMessage(JSON.parse(event.data)) + this.logger.trace('WebSocket message event received.', { url: event.target.url, data: event.data }) + const payload = JSON.parse(Buffer.from(event.data).toString('utf-8')) + this.logger.debug('Payload received from mediator:', payload) + this.agent.receiveMessage(payload) } private listenOnWebSocketMessages(socket: WebSocket) { @@ -109,9 +112,29 @@ export class WsOutboundTransporter implements OutboundTransporter { reject(error) } - socket.onclose = () => { + socket.onclose = async () => { + this.logger.debug(`WebSocket closing to ${endpoint}`) socket.removeEventListener('message', this.handleMessageEvent) this.transportTable.delete(socketId) + if (this.continue) { + const mediators = await this.agent.mediationRecipient.getMediators() + const mediatorConnIds = mediators.map((mediator) => mediator.connectionId) + if (mediatorConnIds.includes(socketId)) { + this.logger.debug(`WebSocket attempting to reconnect to ${endpoint}`) + // send trustPing to mediator to open socket + let interval = 100 + if (this.recursiveBackOff[socketId as string]) { + interval = this.recursiveBackOff[socketId] + } + setTimeout( + () => { + this.agent.connections.acceptResponse(socketId) + }, + interval < 1000 ? interval : 1000 + ) + this.recursiveBackOff[socketId] = interval * 2 + } + } } }) } diff --git a/packages/core/tests/agents.test.ts b/packages/core/tests/agents.test.ts index dc43752c9f..37fbbb60e5 100644 --- a/packages/core/tests/agents.test.ts +++ b/packages/core/tests/agents.test.ts @@ -43,12 +43,12 @@ describe('agents', () => { aliceAgent = new Agent(aliceConfig.config, aliceConfig.agentDependencies) aliceAgent.setInboundTransporter(new SubjectInboundTransporter(aliceMessages)) - aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) + aliceAgent.registerOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) await aliceAgent.initialize() bobAgent = new Agent(bobConfig.config, bobConfig.agentDependencies) bobAgent.setInboundTransporter(new SubjectInboundTransporter(bobMessages)) - bobAgent.setOutboundTransporter(new SubjectOutboundTransporter(bobMessages, subjectMap)) + bobAgent.registerOutboundTransporter(new SubjectOutboundTransporter(bobMessages, subjectMap)) await bobAgent.initialize() const aliceConnectionAtAliceBob = await aliceAgent.connections.createConnection() diff --git a/packages/core/tests/connectionless-credentials.test.ts b/packages/core/tests/connectionless-credentials.test.ts index 954c71930c..0889df22da 100644 --- a/packages/core/tests/connectionless-credentials.test.ts +++ b/packages/core/tests/connectionless-credentials.test.ts @@ -46,12 +46,12 @@ describe('credentials', () => { } faberAgent = new Agent(faberConfig.config, faberConfig.agentDependencies) faberAgent.setInboundTransporter(new SubjectInboundTransporter(faberMessages)) - faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) + faberAgent.registerOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) await faberAgent.initialize() aliceAgent = new Agent(aliceConfig.config, aliceConfig.agentDependencies) aliceAgent.setInboundTransporter(new SubjectInboundTransporter(aliceMessages)) - aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages, subjectMap)) + aliceAgent.registerOutboundTransporter(new SubjectOutboundTransporter(faberMessages, subjectMap)) await aliceAgent.initialize() const { definition } = await prepareForIssuance(faberAgent, ['name', 'age']) diff --git a/packages/core/tests/helpers.ts b/packages/core/tests/helpers.ts index 421557cf05..74f1970871 100644 --- a/packages/core/tests/helpers.ts +++ b/packages/core/tests/helpers.ts @@ -539,12 +539,12 @@ export async function setupCredentialTests( }) const faberAgent = new Agent(faberConfig.config, faberConfig.agentDependencies) faberAgent.setInboundTransporter(new SubjectInboundTransporter(faberMessages)) - faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) + faberAgent.registerOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) await faberAgent.initialize() const aliceAgent = new Agent(aliceConfig.config, aliceConfig.agentDependencies) aliceAgent.setInboundTransporter(new SubjectInboundTransporter(aliceMessages)) - aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages, subjectMap)) + aliceAgent.registerOutboundTransporter(new SubjectOutboundTransporter(faberMessages, subjectMap)) await aliceAgent.initialize() const { @@ -586,12 +586,12 @@ export async function setupProofsTest(faberName: string, aliceName: string, auto } const faberAgent = new Agent(faberConfig.config, faberConfig.agentDependencies) faberAgent.setInboundTransporter(new SubjectInboundTransporter(faberMessages)) - faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) + faberAgent.registerOutboundTransporter(new SubjectOutboundTransporter(aliceMessages, subjectMap)) await faberAgent.initialize() const aliceAgent = new Agent(aliceConfig.config, aliceConfig.agentDependencies) aliceAgent.setInboundTransporter(new SubjectInboundTransporter(aliceMessages)) - aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages, subjectMap)) + aliceAgent.registerOutboundTransporter(new SubjectOutboundTransporter(faberMessages, subjectMap)) await aliceAgent.initialize() const { definition } = await prepareForIssuance(faberAgent, ['name', 'age', 'image_0', 'image_1']) diff --git a/samples/mediator-ws.ts b/samples/mediator-ws.ts index 185cf7410c..fa938f146d 100644 --- a/samples/mediator-ws.ts +++ b/samples/mediator-ws.ts @@ -29,7 +29,7 @@ const config = agent.injectionContainer.resolve(AgentConfig) const messageSender = new WsOutboundTransporter() const messageReceiver = new WsInboundTransport({ server: socketServer }) agent.setInboundTransporter(messageReceiver) -agent.setOutboundTransporter(messageSender) +agent.registerOutboundTransporter(messageSender) // Allow to create invitation, no other way to ask for invitation yet app.get('/invitation', async (req, res) => { diff --git a/samples/mediator.ts b/samples/mediator.ts index f78236b945..f3cfa9b601 100644 --- a/samples/mediator.ts +++ b/samples/mediator.ts @@ -31,7 +31,7 @@ const inboundTransporter = new HttpInboundTransport({ port }) const outboundTransporter = new HttpOutboundTransporter() agent.setInboundTransporter(inboundTransporter) -agent.setOutboundTransporter(outboundTransporter) +agent.registerOutboundTransporter(outboundTransporter) // Allow to create invitation, no other way to ask for invitation yet inboundTransporter.app.get('/invitation', async (req, res) => { diff --git a/tests/e2e-http.test.ts b/tests/e2e-http.test.ts index eeefc1042a..583f985ba2 100644 --- a/tests/e2e-http.test.ts +++ b/tests/e2e-http.test.ts @@ -41,17 +41,17 @@ describe('E2E HTTP tests', () => { test('Full HTTP flow (connect, request mediation, issue, verify)', async () => { // Recipient Setup - recipientAgent.setOutboundTransporter(new HttpOutboundTransporter()) + recipientAgent.registerOutboundTransporter(new HttpOutboundTransporter()) await recipientAgent.initialize() // Mediator Setup mediatorAgent.setInboundTransporter(new HttpInboundTransport({ port: mediatorPort })) - mediatorAgent.setOutboundTransporter(new HttpOutboundTransporter()) + mediatorAgent.registerOutboundTransporter(new HttpOutboundTransporter()) await mediatorAgent.initialize() // Sender Setup senderAgent.setInboundTransporter(new HttpInboundTransport({ port: senderPort })) - senderAgent.setOutboundTransporter(new HttpOutboundTransporter()) + senderAgent.registerOutboundTransporter(new HttpOutboundTransporter()) await senderAgent.initialize() await e2eTest({ diff --git a/tests/e2e-subject.test.ts b/tests/e2e-subject.test.ts index 3137ee7b5c..84eed53a74 100644 --- a/tests/e2e-subject.test.ts +++ b/tests/e2e-subject.test.ts @@ -51,17 +51,17 @@ describe('E2E Subject tests', () => { } // Recipient Setup - recipientAgent.setOutboundTransporter(new SubjectOutboundTransporter(recipientMessages, subjectMap)) + recipientAgent.registerOutboundTransporter(new SubjectOutboundTransporter(recipientMessages, subjectMap)) recipientAgent.setInboundTransporter(new SubjectInboundTransporter(recipientMessages)) await recipientAgent.initialize() // Mediator Setup - mediatorAgent.setOutboundTransporter(new SubjectOutboundTransporter(mediatorMessages, subjectMap)) + mediatorAgent.registerOutboundTransporter(new SubjectOutboundTransporter(mediatorMessages, subjectMap)) mediatorAgent.setInboundTransporter(new SubjectInboundTransporter(mediatorMessages)) await mediatorAgent.initialize() // Sender Setup - senderAgent.setOutboundTransporter(new SubjectOutboundTransporter(senderMessages, subjectMap)) + senderAgent.registerOutboundTransporter(new SubjectOutboundTransporter(senderMessages, subjectMap)) senderAgent.setInboundTransporter(new SubjectInboundTransporter(senderMessages)) await senderAgent.initialize() diff --git a/tests/e2e-ws.test.ts b/tests/e2e-ws.test.ts index d4f80e4d70..a035dac3ce 100644 --- a/tests/e2e-ws.test.ts +++ b/tests/e2e-ws.test.ts @@ -41,17 +41,17 @@ describe('E2E WS tests', () => { test('Full WS flow (connect, request mediation, issue, verify)', async () => { // Recipient Setup - recipientAgent.setOutboundTransporter(new WsOutboundTransporter()) + recipientAgent.registerOutboundTransporter(new WsOutboundTransporter()) await recipientAgent.initialize() // Mediator Setup mediatorAgent.setInboundTransporter(new WsInboundTransport({ port: mediatorPort })) - mediatorAgent.setOutboundTransporter(new WsOutboundTransporter()) + mediatorAgent.registerOutboundTransporter(new WsOutboundTransporter()) await mediatorAgent.initialize() // Sender Setup senderAgent.setInboundTransporter(new WsInboundTransport({ port: senderPort })) - senderAgent.setOutboundTransporter(new WsOutboundTransporter()) + senderAgent.registerOutboundTransporter(new WsOutboundTransporter()) await senderAgent.initialize() await e2eTest({ diff --git a/tests/transport/SubjectOutboundTransport.ts b/tests/transport/SubjectOutboundTransport.ts index fc2094fc27..455a50f97b 100644 --- a/tests/transport/SubjectOutboundTransport.ts +++ b/tests/transport/SubjectOutboundTransport.ts @@ -11,7 +11,7 @@ export class SubjectOutboundTransporter implements OutboundTransporter { private ourSubject: Subject private subjectMap: { [key: string]: Subject | undefined } - public supportedSchemes = [] + public supportedSchemes = ['rxjs'] public constructor( ourSubject: Subject,