Skip to content

Commit

Permalink
feat: Use session to send outbound message (#362)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Koci <[email protected]>
  • Loading branch information
jakubkoci authored Jul 7, 2021
1 parent 46918a1 commit 7366ca7
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 98 deletions.
23 changes: 19 additions & 4 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Agent } from '../agent/Agent'
import type { TransportSession } from '../agent/TransportService'
import type { BasicMessage, BasicMessageReceivedEvent } from '../modules/basic-messages'
import type { ConnectionRecordProps } from '../modules/connections'
import type { CredentialRecord, CredentialOfferTemplate, CredentialStateChangedEvent } from '../modules/credentials'
Expand Down Expand Up @@ -133,6 +134,22 @@ export async function waitForBasicMessage(agent: Agent, { content }: { content?:
})
}

class SubjectTransportSession implements TransportSession {
public id: string
public readonly type = 'subject'
private theirSubject: Subject<WireMessage>

public constructor(id: string, theirSubject: Subject<WireMessage>) {
this.id = id
this.theirSubject = theirSubject
}

public send(outboundMessage: OutboundPackage): Promise<void> {
this.theirSubject.next(outboundMessage.payload)
return Promise.resolve()
}
}

export class SubjectInboundTransporter implements InboundTransporter {
private subject: Subject<WireMessage>
private theirSubject: Subject<WireMessage>
Expand All @@ -149,10 +166,8 @@ export class SubjectInboundTransporter implements InboundTransporter {
private subscribe(agent: Agent) {
this.subject.subscribe({
next: async (message: WireMessage) => {
const outboundMessage = await agent.receiveMessage(message)
if (outboundMessage) {
this.theirSubject.next(outboundMessage.payload)
}
const session = new SubjectTransportSession('subject-session-1', this.theirSubject)
await agent.receiveMessage(message, session)
},
})
}
Expand Down
12 changes: 12 additions & 0 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { MessageReceiver } from './MessageReceiver'
import { MessageSender } from './MessageSender'
import { TransportService } from './TransportService'

export class Agent {
protected agentConfig: AgentConfig
Expand All @@ -38,6 +39,7 @@ export class Agent {
protected eventEmitter: EventEmitter
protected wallet: Wallet
protected messageReceiver: MessageReceiver
protected transportService: TransportService
protected messageSender: MessageSender
public inboundTransporter?: InboundTransporter
private _isInitialized = false
Expand Down Expand Up @@ -96,6 +98,7 @@ export class Agent {
this.eventEmitter = this.container.resolve(EventEmitter)
this.messageSender = this.container.resolve(MessageSender)
this.messageReceiver = this.container.resolve(MessageReceiver)
this.transportService = this.container.resolve(TransportService)
this.wallet = this.container.resolve(InjectionSymbols.Wallet)

// We set the modules in the constructor because that allows to set them as read-only
Expand Down Expand Up @@ -176,6 +179,15 @@ export class Agent {
return await this.messageReceiver.receiveMessage(inboundPackedMessage, session)
}

public async closeAndDeleteWallet() {
await this.wallet.close()
await this.wallet.delete()
}

public removeSession(session: TransportSession) {
this.transportService.removeSession(session)
}

public get injectionContainer() {
return this.container
}
Expand Down
16 changes: 1 addition & 15 deletions src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { OutboundMessage, OutboundPackage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { Handler } from './Handler'
import type { InboundMessageContext } from './models/InboundMessageContext'
Expand Down Expand Up @@ -26,7 +25,7 @@ class Dispatcher {
this.handlers.push(handler)
}

public async dispatch(messageContext: InboundMessageContext): Promise<OutboundMessage | OutboundPackage | undefined> {
public async dispatch(messageContext: InboundMessageContext): Promise<void> {
const message = messageContext.message
const handler = this.getHandlerForType(message.type)

Expand All @@ -37,22 +36,9 @@ class Dispatcher {
const outboundMessage = await handler.handle(messageContext)

if (outboundMessage) {
const threadId = outboundMessage.payload.threadId

if (!this.transportService.hasInboundEndpoint(outboundMessage.connection)) {
outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all)
}

// Check for return routing, with thread id
if (message.hasReturnRouting(threadId)) {
const keys = {
recipientKeys: messageContext.senderVerkey ? [messageContext.senderVerkey] : [],
routingKeys: [],
senderKey: messageContext.connection?.verkey || null,
}
return await this.messageSender.packMessage(outboundMessage, keys)
}

await this.messageSender.sendMessage(outboundMessage)
}
}
Expand Down
20 changes: 16 additions & 4 deletions src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ export class MessageReceiver {
}
}

if (connection && session) {
this.transportService.saveSession(connection.id, session)
}

this.logger.info(`Received message with type '${unpackedMessage.message['@type']}'`, unpackedMessage.message)

const message = await this.transformMessage(unpackedMessage)
Expand All @@ -86,6 +82,22 @@ export class MessageReceiver {
recipientVerkey: unpackedMessage.recipient_verkey,
})

// We want to save a session if there is a chance of returning outbound message via inbound transport.
// That can happen when inbound message has `return_route` set to `all` or `thread`.
// If `return_route` defines just `thread`, we decide later whether to use session according to outbound message `threadId`.
if (connection && message.hasAnyReturnRoute() && session) {
const keys = {
// TODO handle the case when senderKey is missing
recipientKeys: senderKey ? [senderKey] : [],
routingKeys: [],
senderKey: connection?.verkey || null,
}
session.keys = keys
session.inboundMessage = message
session.connection = connection
this.transportService.saveSession(session)
}

return await this.dispatcher.dispatch(messageContext)
}

Expand Down
16 changes: 15 additions & 1 deletion src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ export class MessageSender {
connection: { id, verkey, theirKey },
})

const session = this.transportService.findSessionByConnectionId(connection.id)
if (session?.inboundMessage?.hasReturnRouting(outboundMessage.payload.threadId)) {
this.logger.debug(`Existing ${session.type} transport session has been found.`)
try {
if (!session.keys) {
throw new AriesFrameworkError(`There are no keys for the given ${session.type} transport session.`)
}
const outboundPackage = await this.packMessage(outboundMessage, session.keys)
await session.send(outboundPackage)
return
} catch (error) {
this.logger.info(`Sending an outbound message via session failed with error: ${error.message}.`, error)
}
}

const services = this.transportService.findDidCommServices(connection)
if (services.length === 0) {
throw new AriesFrameworkError(`Connection with id ${connection.id} has no service!`)
Expand All @@ -69,7 +84,6 @@ export class MessageSender {
senderKey: connection.verkey,
}
const outboundPackage = await this.packMessage(outboundMessage, keys)
outboundPackage.session = this.transportService.findSession(connection.id)
outboundPackage.endpoint = service.serviceEndpoint
outboundPackage.responseRequested = outboundMessage.payload.hasReturnRouting()

Expand Down
36 changes: 23 additions & 13 deletions src/agent/TransportService.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@
import type { ConnectionRecord } from '../modules/connections/repository'
import type { OutboundPackage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { EnvelopeKeys } from './EnvelopeService'

import { Lifecycle, scoped, inject } from 'tsyringe'
import { Lifecycle, scoped } from 'tsyringe'

import { DID_COMM_TRANSPORT_QUEUE, InjectionSymbols } from '../constants'
import { Logger } from '../logger'
import { DID_COMM_TRANSPORT_QUEUE } from '../constants'
import { ConnectionRole, DidCommService } from '../modules/connections/models'

@scoped(Lifecycle.ContainerScoped)
export class TransportService {
private transportSessionTable: TransportSessionTable = {}
private logger: Logger

public constructor(@inject(InjectionSymbols.Logger) logger: Logger) {
this.logger = logger
public saveSession(session: TransportSession) {
this.transportSessionTable[session.id] = session
}

public saveSession(connectionId: string, transport: TransportSession) {
this.transportSessionTable[connectionId] = transport
public findSessionByConnectionId(connectionId: string) {
return Object.values(this.transportSessionTable).find((session) => session.connection?.id === connectionId)
}

public hasInboundEndpoint(connection: ConnectionRecord) {
return connection.didDoc.didCommServices.find((s) => s.serviceEndpoint !== DID_COMM_TRANSPORT_QUEUE)
public findSessionById(sessionId: string) {
return this.transportSessionTable[sessionId]
}

public removeSession(session: TransportSession) {
delete this.transportSessionTable[session.id]
}

public findSession(connectionId: string) {
return this.transportSessionTable[connectionId]
public hasInboundEndpoint(connection: ConnectionRecord) {
return connection.didDoc.didCommServices.find((s) => s.serviceEndpoint !== DID_COMM_TRANSPORT_QUEUE)
}

public findDidCommServices(connection: ConnectionRecord): DidCommService[] {
Expand All @@ -49,9 +54,14 @@ export class TransportService {
}

interface TransportSessionTable {
[connectionRecordId: string]: TransportSession
[sessionId: string]: TransportSession
}

export interface TransportSession {
id: string
type: string
keys?: EnvelopeKeys
inboundMessage?: AgentMessage
connection?: ConnectionRecord
send(outboundMessage: OutboundPackage): Promise<void>
}
Loading

0 comments on commit 7366ca7

Please sign in to comment.