Skip to content

Commit

Permalink
fix: mediator transports (#419)
Browse files Browse the repository at this point in the history
Co-authored-by: Timo Glastra <[email protected]>
Co-authored-by: James Ebert <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2021
1 parent 0e2338f commit 87bc589
Show file tree
Hide file tree
Showing 21 changed files with 243 additions and 93 deletions.
4 changes: 2 additions & 2 deletions docs/getting-started/1-transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ const agent = new Agent({

// Use HTTP as outbound transporter
const httpOutboundTransporter = new HttpOutboundTransporter()
agent.setOutboundTransporter(httpOutboundTransporter)
agent.registerOutboundTransporter(httpOutboundTransporter)

// Or use WebSocket instead
const wsOutboundTransporter = new WsOutboundTransporter()
agent.setOutboundTransporter(wsOutboundTransporter)
agent.registerOutboundTransporter(wsOutboundTransporter)
```

## Inbound Transport
Expand Down
26 changes: 10 additions & 16 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ export class Agent {
this.inboundTransporter = inboundTransporter
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.messageSender.setOutboundTransporter(outboundTransporter)
public registerOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.messageSender.registerOutboundTransporter(outboundTransporter)
}

public get outboundTransporter() {
return this.messageSender.outboundTransporter
public get outboundTransporters() {
return this.messageSender.outboundTransporters
}

public get events() {
Expand Down Expand Up @@ -162,23 +162,15 @@ export class Agent {
await this.inboundTransporter.start(this)
}

if (this.outboundTransporter) {
await this.outboundTransporter.start(this)
for (const transport of this.messageSender.outboundTransporters) {
transport.start(this)
}

// Connect to mediator through provided invitation if provided in config
// Also requests mediation ans sets as default mediator
// Because this requires the connections module, we do this in the agent constructor
if (mediatorConnectionsInvite) {
// Assumption: processInvitation is a URL-encoded invitation
let connectionRecord = await this.connections.receiveInvitationFromUrl(mediatorConnectionsInvite, {
autoAcceptConnection: true,
})

// TODO: add timeout to returnWhenIsConnected
connectionRecord = await this.connections.returnWhenIsConnected(connectionRecord.id)
const mediationRecord = await this.mediationRecipient.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter
await this.mediationRecipient.setDefaultMediator(mediationRecord)
await this.mediationRecipient.provision(mediatorConnectionsInvite)
}

await this.mediationRecipient.initialize()
Expand All @@ -192,7 +184,9 @@ export class Agent {
this.agentConfig.stop$.next(true)

// Stop transports
await this.outboundTransporter?.stop()
for (const transport of this.messageSender.outboundTransporters) {
transport.stop()
}
await this.inboundTransporter?.stop()

// close/delete wallet if still initialized
Expand Down
109 changes: 77 additions & 32 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ import { MessageRepository } from '../storage/MessageRepository'
import { EnvelopeService } from './EnvelopeService'
import { TransportService } from './TransportService'

export interface TransportPriorityOptions {
schemes: string[]
restrictive?: boolean
}

@scoped(Lifecycle.ContainerScoped)
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private messageRepository: MessageRepository
private logger: Logger
private _outboundTransporter?: OutboundTransporter
private outboundTransports: OutboundTransporter[] = []

public constructor(
envelopeService: EnvelopeService,
Expand All @@ -34,14 +39,15 @@ export class MessageSender {
this.transportService = transportService
this.messageRepository = messageRepository
this.logger = logger
this.outboundTransports = []
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this._outboundTransporter = outboundTransporter
public registerOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.outboundTransports.push(outboundTransporter)
}

public get outboundTransporter() {
return this._outboundTransporter
public get outboundTransporters() {
return this.outboundTransports
}

public async packMessage({
Expand Down Expand Up @@ -77,9 +83,11 @@ export class MessageSender {
public async sendPackage({
connection,
packedMessage,
options,
}: {
connection: ConnectionRecord
packedMessage: WireMessage
options?: { transportPriority?: TransportPriorityOptions }
}) {
// Try to send to already open session
const session = this.transportService.findSessionByConnectionId(connection.id)
Expand All @@ -93,26 +101,25 @@ export class MessageSender {
}

// Retrieve DIDComm services
const allServices = this.transportService.findDidCommServices(connection)
const reachableServices = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint))
const { services, queueService } = await this.retrieveServicesByConnection(connection, options?.transportPriority)

this.logger.debug(
`Found ${allServices.length} services for message to connection '${connection.id}' (${connection.theirLabel})`
)

if (!this.outboundTransporter) {
if (this.outboundTransporters.length === 0 && !queueService) {
throw new AriesFrameworkError('Agent has no outbound transporter!')
}

// Loop trough all available services and try to send the message
for await (const service of reachableServices) {
for await (const service of services) {
this.logger.debug(`Sending outbound message to service:`, { service })
try {
await this.outboundTransporter.sendMessage({
payload: packedMessage,
endpoint: service.serviceEndpoint,
})
for (const transport of this.outboundTransporters) {
if (transport.supportedSchemes.includes(service.protocolScheme)) {
await transport.sendMessage({
payload: packedMessage,
endpoint: service.serviceEndpoint,
})
break
}
}
return
} catch (error) {
this.logger.debug(
Expand Down Expand Up @@ -141,11 +148,12 @@ export class MessageSender {
throw new AriesFrameworkError(`Message is undeliverable to connection ${connection.id} (${connection.theirLabel})`)
}

public async sendMessage(outboundMessage: OutboundMessage) {
if (!this.outboundTransporter) {
throw new AriesFrameworkError('Agent has no outbound transporter!')
public async sendMessage(
outboundMessage: OutboundMessage,
options?: {
transportPriority?: TransportPriorityOptions
}

) {
const { connection, payload } = outboundMessage

this.logger.debug('Send outbound message', {
Expand All @@ -165,16 +173,10 @@ export class MessageSender {
}

// Retrieve DIDComm services
const allServices = this.transportService.findDidCommServices(connection)
const reachableServices = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint))

this.logger.debug(
`Found ${allServices.length} services for message to connection '${connection.id}' (${connection.theirLabel})`
)
const { services, queueService } = await this.retrieveServicesByConnection(connection, options?.transportPriority)

// Loop trough all available services and try to send the message
for await (const service of reachableServices) {
for await (const service of services) {
try {
// Enable return routing if the
const shouldUseReturnRoute = !this.transportService.hasInboundEndpoint(connection.didDoc)
Expand Down Expand Up @@ -232,7 +234,7 @@ export class MessageSender {
senderKey: string
returnRoute?: boolean
}) {
if (!this.outboundTransporter) {
if (this.outboundTransports.length === 0) {
throw new AriesFrameworkError('Agent has no outbound transporter!')
}

Expand All @@ -250,7 +252,50 @@ export class MessageSender {
}

const outboundPackage = await this.packMessage({ message, keys, endpoint: service.serviceEndpoint })
await this.outboundTransporter.sendMessage(outboundPackage)
outboundPackage.endpoint = service.serviceEndpoint
for (const transport of this.outboundTransporters) {
if (transport.supportedSchemes.includes(service.protocolScheme)) {
await transport.sendMessage(outboundPackage)
break
}
}
}

private async retrieveServicesByConnection(
connection: ConnectionRecord,
transportPriority?: TransportPriorityOptions
) {
this.logger.debug(`Retrieving services for connection '${connection.id}' (${connection.theirLabel})`, {
transportPriority,
})
// Retrieve DIDComm services
const allServices = this.transportService.findDidCommServices(connection)

//Separate queue service out
const services = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint))

//If restrictive will remove services not listed in schemes list
if (transportPriority?.restrictive) {
services.filter((service) => {
const serviceSchema = service.protocolScheme
return transportPriority.schemes.includes(serviceSchema)
})
}

//If transport priority is set we will sort services by our priority
if (transportPriority?.schemes) {
services.sort(function (a, b) {
const aScheme = a.protocolScheme
const bScheme = b.protocolScheme
return transportPriority?.schemes.indexOf(aScheme) - transportPriority?.schemes.indexOf(bScheme)
})
}

this.logger.debug(
`Retrieved ${services.length} services for message to connection '${connection.id}'(${connection.theirLabel})'`
)
return { services, queueService }
}
}

Expand Down
22 changes: 11 additions & 11 deletions packages/core/src/agent/__tests__/MessageSender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DummyOutboundTransporter implements OutboundTransporter {
throw new Error('Method not implemented.')
}

public supportedSchemes: string[] = []
public supportedSchemes: string[] = ['https']

public sendMessage() {
return Promise.resolve()
Expand Down Expand Up @@ -109,11 +109,11 @@ describe('MessageSender', () => {
})

test('throw error when there is no outbound transport', async () => {
await expect(messageSender.sendMessage(outboundMessage)).rejects.toThrow(`Agent has no outbound transporter!`)
await expect(messageSender.sendMessage(outboundMessage)).rejects.toThrow(/Message is undeliverable to connection/)
})

test('throw error when there is no service or queue', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
transportServiceFindServicesMock.mockReturnValue([])

await expect(messageSender.sendMessage(outboundMessage)).rejects.toThrow(
Expand All @@ -122,11 +122,11 @@ describe('MessageSender', () => {
})

test('call send message when session send method fails', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
transportServiceFindSessionMock.mockReturnValue(session)
session.send = jest.fn().mockRejectedValue(new Error('some error'))

messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage')

await messageSender.sendMessage(outboundMessage)
Expand All @@ -140,10 +140,10 @@ describe('MessageSender', () => {
})

test('call send message when session send method fails with missing keys', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
transportServiceFindSessionMock.mockReturnValue(sessionWithoutKeys)

messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage')

await messageSender.sendMessage(outboundMessage)
Expand All @@ -157,7 +157,7 @@ describe('MessageSender', () => {
})

test('call send message on session when there is a session for a given connection', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage')
const sendMessageToServiceSpy = jest.spyOn(messageSender, 'sendMessageToService')

Expand All @@ -174,7 +174,7 @@ describe('MessageSender', () => {
})

test('calls sendMessageToService with payload and endpoint from second DidComm service when the first fails', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage')
const sendMessageToServiceSpy = jest.spyOn(messageSender, 'sendMessageToService')

Expand Down Expand Up @@ -229,7 +229,7 @@ describe('MessageSender', () => {
})

test('calls send message with payload and endpoint from DIDComm service', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage')

await messageSender.sendMessageToService({
Expand All @@ -247,7 +247,7 @@ describe('MessageSender', () => {
})

test('call send message with responseRequested when message has return route', async () => {
messageSender.setOutboundTransporter(outboundTransporter)
messageSender.registerOutboundTransporter(outboundTransporter)
const sendMessageSpy = jest.spyOn(outboundTransporter, 'sendMessage')

const message = new AgentMessage()
Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/modules/connections/ConnectionsModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@ export class ConnectionsModule {
return this.connectionService.findByTheirKey(verkey)
}

/**
* Find connection by Invitation key.
*
* @param key the invitation key to search for
* @returns the connection record, or null if not found
* @throws {RecordDuplicateError} if multiple connections are found for the given verkey
*/
public findByInvitationKey(key: string): Promise<ConnectionRecord | null> {
return this.connectionService.findByInvitationKey(key)
}

/**
* Retrieve a connection record by thread id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export class Service {
}
}

public get protocolScheme(): string {
return this.serviceEndpoint.split(':')[0]
}

@IsString()
public id!: string

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,19 @@ export class ConnectionService {
})
}

/**
* Find connection by invitation key.
*
* @param key the invitation key to search for
* @returns the connection record, or null if not found
* @throws {RecordDuplicateError} if multiple connections are found for the given verkey
*/
public findByInvitationKey(key: string): Promise<ConnectionRecord | null> {
return this.connectionRepository.findSingleByQuery({
invitationKey: key,
})
}

/**
* Retrieve a connection record by thread id
*
Expand Down
Loading

0 comments on commit 87bc589

Please sign in to comment.