diff --git a/src/__tests__/agents.test.ts b/src/__tests__/agents.test.ts index f50a7bafa6..8fb169ecfd 100644 --- a/src/__tests__/agents.test.ts +++ b/src/__tests__/agents.test.ts @@ -20,7 +20,7 @@ describe('agents', () => { await bobAgent.closeAndDeleteWallet() }) - test('make a connection between agents and send a message over the connection', async () => { + test('make a connection between agents', async () => { const aliceMessages = new Subject() const bobMessages = new Subject() @@ -42,7 +42,9 @@ describe('agents', () => { expect(aliceConnection).toBeConnectedWith(bobConnection) expect(bobConnection).toBeConnectedWith(aliceConnection) + }) + test('send a message to connection', async () => { const message = 'hello, world' await aliceAgent.basicMessages.sendMessage(aliceConnection, message) diff --git a/src/agent/Agent.ts b/src/agent/Agent.ts index 3ed7751124..d7739b681c 100644 --- a/src/agent/Agent.ts +++ b/src/agent/Agent.ts @@ -1,4 +1,5 @@ import type { Logger } from '../logger' +import type { CredentialsModule } from '../modules/credentials/CredentialsModule' import type { MessageRepository } from '../storage/MessageRepository' import type { InboundTransporter } from '../transport/InboundTransporter' import type { OutboundTransporter } from '../transport/OutboundTransporter' @@ -15,7 +16,6 @@ import { container as baseContainer } from 'tsyringe' import { InjectionSymbols } from '../constants' import { BasicMessagesModule } from '../modules/basic-messages/BasicMessagesModule' import { ConnectionsModule } from '../modules/connections/ConnectionsModule' -import { CredentialsModule } from '../modules/credentials/CredentialsModule' import { LedgerModule } from '../modules/ledger/LedgerModule' import { ProofsModule } from '../modules/proofs/ProofsModule' import { MediatorModule } from '../modules/routing/MediatorModule' @@ -87,12 +87,12 @@ export class Agent { // Resolve instances after everything is registered this.eventEmitter = this.container.resolve(EventEmitter) this.messageSender = this.container.resolve(MessageSender) + this.container.registerInstance(InjectionSymbols.MessageSender, this.messageSender) this.messageReceiver = this.container.resolve(MessageReceiver) this.wallet = this.container.resolve(InjectionSymbols.Wallet) // We set the modules in the constructor because that allows to set them as read-only this.connections = this.container.resolve(ConnectionsModule) - this.credentials = this.container.resolve(CredentialsModule) this.proofs = this.container.resolve(ProofsModule) this.mediator = this.container.resolve(MediatorModule) this.mediationRecipient = this.container.resolve(RecipientModule) diff --git a/src/constants.ts b/src/constants.ts index 3122674b0b..93ad49fc74 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -9,6 +9,8 @@ export const InjectionSymbols = { InboundTransporter: Symbol('InboundTransporter'), OutboundTransporter: Symbol('OutboundTransporter'), TransportService: Symbol('TransportService'), + RecipientService: Symbol('RecipientService'), + ConnectionService: Symbol('ConnectionService'), } export const DID_COMM_TRANSPORT_QUEUE = 'didcomm:transport/queue' diff --git a/src/modules/connections/services/ConnectionService.ts b/src/modules/connections/services/ConnectionService.ts index 7268df8d33..36077eebca 100644 --- a/src/modules/connections/services/ConnectionService.ts +++ b/src/modules/connections/services/ConnectionService.ts @@ -1,6 +1,6 @@ import type { AgentMessage } from '../../../agent/AgentMessage' import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' -import type { KeylistUpdatedEvent, MediationRecord } from '../../../modules/routing' +import type { MediationRecord } from '../../../modules/routing' import type { AckMessage } from '../../common' import type { ConnectionStateChangedEvent } from '../ConnectionEvents' import type { CustomConnectionTags } from '../repository/ConnectionRecord' @@ -11,16 +11,12 @@ import { inject, scoped, Lifecycle } from 'tsyringe' import { AgentConfig } from '../../../agent/AgentConfig' import { EventEmitter } from '../../../agent/EventEmitter' -import { MessageSender } from '../../../agent/MessageSender' -import { createOutboundMessage } from '../../../agent/helpers' import { InjectionSymbols } from '../../../constants' import { signData, unpackAndVerifySignatureDecorator } from '../../../decorators/signature/SignatureDecoratorUtils' import { AriesFrameworkError } from '../../../error' -import { RoutingEventTypes } from '../../../modules/routing/RoutingEvents' -import { waitForEvent } from '../../../modules/routing/services/RoutingService' import { JsonTransformer } from '../../../utils/JsonTransformer' import { Wallet } from '../../../wallet/Wallet' -import { KeylistUpdate, KeylistUpdateAction, KeylistUpdateMessage } from '../../routing/messages/KeylistUpdatedMessage' +import { RecipientService } from '../../routing/services/RecipientService' import { ConnectionEventTypes } from '../ConnectionEvents' import { ConnectionInvitationMessage, @@ -48,20 +44,20 @@ export class ConnectionService { private config: AgentConfig private connectionRepository: ConnectionRepository private eventEmitter: EventEmitter - private messageSender: MessageSender + private mediationRecipientService: RecipientService public constructor( @inject(InjectionSymbols.Wallet) wallet: Wallet, config: AgentConfig, connectionRepository: ConnectionRepository, eventEmitter: EventEmitter, - messageSender: MessageSender + @inject(InjectionSymbols.RecipientService) mediationRecipientService: RecipientService ) { this.wallet = wallet this.config = config this.connectionRepository = connectionRepository this.eventEmitter = eventEmitter - this.messageSender = messageSender + this.mediationRecipientService = mediationRecipientService } /** @@ -433,63 +429,6 @@ export class ConnectionService { return this.connectionRepository.getSingleByQuery({ threadId }) } - private async getRouting(mediationRecord: MediationRecord | undefined, routingKeys: string[], myEndpoint?: string) { - let endpoint - if (mediationRecord) { - routingKeys = [...routingKeys, ...mediationRecord.routingKeys] - endpoint = mediationRecord.endpoint - } - // Create and store new key - const [did, verkey] = await this.wallet.createDid() - if (mediationRecord) { - // new did has been created and mediator needs to be updated with the public key. - mediationRecord = await this.keylistUpdateAndAwait(mediationRecord, did) - } else { - // TODO: register recipient keys for relay - // TODO: check that recipient keys are in wallet - } - endpoint = endpoint ?? myEndpoint ?? this.config.getEndpoint() - const result = { mediationRecord, endpoint, routingKeys, did, verkey } - return result - } - - public async keylistUpdateAndAwait( - mediationRecord: MediationRecord, - verKey: string, - timeout = 15000 // TODO: this should be a configurable value in agent config - ): Promise { - const message = this.createKeylistUpdateMessage(verKey) - const connection = await this.getById(mediationRecord.connectionId) - - const sendUpdateKeylist = async () => { - const outboundMessage = createOutboundMessage(connection, message) - await this.messageSender.sendMessage(outboundMessage) - } - const condition = async (event: KeylistUpdatedEvent) => { - return mediationRecord.id === event.payload.mediationRecord.id - } - const results = await waitForEvent( - sendUpdateKeylist, - RoutingEventTypes.RecipientKeylistUpdated, - condition, - timeout, - this.eventEmitter - ) - return (results as KeylistUpdatedEvent).payload.mediationRecord - } - - public createKeylistUpdateMessage(verkey: Verkey): KeylistUpdateMessage { - const keylistUpdateMessage = new KeylistUpdateMessage({ - updates: [ - new KeylistUpdate({ - action: KeylistUpdateAction.add, - recipientKey: verkey, - }), - ], - }) - return keylistUpdateMessage - } - private async createConnection(options: { role: ConnectionRole state: ConnectionState @@ -501,7 +440,7 @@ export class ConnectionService { autoAcceptConnection?: boolean tags?: CustomConnectionTags }): Promise { - const myRouting = await this.getRouting( + const myRouting = await this.mediationRecipientService.getRouting( //my routing options.mediator, options.routingKeys ?? [], diff --git a/src/modules/routing/RecipientModule.ts b/src/modules/routing/RecipientModule.ts index 450c2cca54..5cf5601b55 100644 --- a/src/modules/routing/RecipientModule.ts +++ b/src/modules/routing/RecipientModule.ts @@ -108,7 +108,7 @@ export class RecipientModule { } public async notifyKeylistUpdate(connection: ConnectionRecord, verkey: Verkey) { - const message = await this.connectionService.createKeylistUpdateMessage(verkey) + const message = await this.recipientService.createKeylistUpdateMessage(verkey) const outboundMessage = createOutboundMessage(connection, message) const response = await this.messageSender.sendMessage(outboundMessage) return response @@ -130,11 +130,11 @@ export class RecipientModule { } public async getMediatorConnections() { - const all_mediators = await this.getMediators() - const mediators_connection_ids = all_mediators ? all_mediators.map((mediator) => mediator.connectionId) : [] - const all_connections = await this.connectionService.getAll() - return all_connections && mediators_connection_ids - ? all_connections.filter((connection) => mediators_connection_ids.includes(connection.id)) + const allMediators = await this.getMediators() + const mediatorConnectionIds = allMediators ? allMediators.map((mediator) => mediator.connectionId) : [] + const allConnections = await this.connectionService.getAll() + return allConnections && mediatorConnectionIds + ? allConnections.filter((connection) => mediatorConnectionIds.includes(connection.id)) : [] } diff --git a/src/modules/routing/repository/MediationRecord.ts b/src/modules/routing/repository/MediationRecord.ts index 1b620eee5f..e521dbc171 100644 --- a/src/modules/routing/repository/MediationRecord.ts +++ b/src/modules/routing/repository/MediationRecord.ts @@ -83,11 +83,4 @@ export class MediationRecord } } - public assertConnection(currentConnectionId: string) { - if (this.connectionId !== currentConnectionId) { - throw new AriesFrameworkError( - `Proof record is associated with connection '${this.connectionId}'. Current connection is '${currentConnectionId}'` - ) - } - } } diff --git a/src/modules/routing/services/MediatorService.ts b/src/modules/routing/services/MediatorService.ts index c7d1c9a736..ae69577e25 100644 --- a/src/modules/routing/services/MediatorService.ts +++ b/src/modules/routing/services/MediatorService.ts @@ -32,7 +32,7 @@ import { MediationRole } from '../models/MediationRole' import { MediationState } from '../models/MediationState' import { MediationRepository } from '../repository/MediationRepository' -import { createRecord } from './RoutingService' +import { assertConnection, createRecord } from './RoutingService' export interface RoutingTable { [recipientKey: string]: ConnectionRecord | undefined @@ -65,12 +65,6 @@ export class MediatorService { this.connectionService = connectionService } - private _assertConnection(connection: ConnectionRecord | undefined, msgType: BaseMessage): ConnectionRecord { - if (!connection) throw new AriesFrameworkError(`inbound connection is required for ${msgType.constructor.name}!`) - connection.assertReady() - return connection - } - public async processForwardMessage(messageContext: HandlerInboundMessage) { const { message, recipientVerkey } = messageContext @@ -92,24 +86,24 @@ export class MediatorService { public async processKeylistUpdateRequest(messageContext: InboundMessageContext) { const { message } = messageContext - const connection = this._assertConnection(messageContext.connection, KeylistUpdateMessage) + const connection = assertConnection(messageContext.connection) const keylist: KeylistUpdated[] = [] const mediationRecord = await this.findRecipientByConnectionId(connection.id) if (!mediationRecord) { throw new Error(`mediation record for ${connection.id} not found!`) } for (const update of message.updates) { - const update_ = new KeylistUpdated({ + const updated = new KeylistUpdated({ action: update.action, recipientKey: update.recipientKey, result: KeylistUpdateResult.NoChange, }) if (update.action === KeylistUpdateAction.add) { - update_.result = await this.saveRoute(update.recipientKey, mediationRecord) - keylist.push(update_) + updated.result = await this.saveRoute(update.recipientKey, mediationRecord) + keylist.push(updated) } else if (update.action === KeylistUpdateAction.remove) { - update_.result = await this.removeRoute(update.recipientKey, mediationRecord) - keylist.push(update_) + updated.result = await this.removeRoute(update.recipientKey, mediationRecord) + keylist.push(updated) } } // emit event to send message that notifies recipient @@ -160,6 +154,7 @@ export class MediatorService { const records = await this.mediationRepository.findByQuery({ connectionId }) return records[0] } catch (error) { + this.agentConfig.logger.warn(`Could not find record for connection: ${connectionId}`) return null } } @@ -189,7 +184,7 @@ export class MediatorService { public async processMediationRequest(messageContext: InboundMessageContext) { // Assert connection - const connection = this._assertConnection(messageContext.connection, MediationRequestMessage) + const connection = assertConnection(messageContext.connection) const mediationRecord = await createRecord( { diff --git a/src/modules/routing/services/RecipientService.ts b/src/modules/routing/services/RecipientService.ts index 3b0df17d71..8a4e2132d8 100644 --- a/src/modules/routing/services/RecipientService.ts +++ b/src/modules/routing/services/RecipientService.ts @@ -4,12 +4,20 @@ import type { MediationStateChangedEvent, KeylistUpdatedEvent } from '../Routing import type { MediationGrantMessage, MediationDenyMessage, KeylistUpdateResponseMessage } from '../messages' import type { Verkey } from 'indy-sdk' -import { Lifecycle, scoped } from 'tsyringe' +import { filter, first, timeout } from 'rxjs/operators' +import { inject, Lifecycle, scoped } from 'tsyringe' +import { AgentConfig } from '../../../agent/AgentConfig' import { EventEmitter } from '../../../agent/EventEmitter' +import { MessageSender } from '../../../agent/MessageSender' +import { createOutboundMessage } from '../../../agent/helpers' +import { InjectionSymbols } from '../../../constants' +import { Wallet } from '../../../wallet/Wallet' +import { ConnectionService } from '../../connections/services/ConnectionService' import { RoutingEventTypes } from '../RoutingEvents' import { KeylistUpdateAction, MediationRequestMessage } from '../messages' import { KeylistMessage } from '../messages/KeylistMessage' +import { KeylistUpdate, KeylistUpdateMessage } from '../messages/KeylistUpdatedMessage' import { MediationRole, MediationState } from '../models' import { MediationRecord } from '../repository/MediationRecord' import { MediationRepository } from '../repository/MediationRepository' @@ -18,13 +26,28 @@ import { assertConnection } from './RoutingService' @scoped(Lifecycle.ContainerScoped) export class RecipientService { + private wallet: Wallet private mediatorRepository: MediationRepository private defaultMediator?: MediationRecord private eventEmitter: EventEmitter - - public constructor(mediatorRepository: MediationRepository, eventEmitter: EventEmitter) { + private connectionService: ConnectionService + private messageSender: MessageSender + private config: AgentConfig + + public constructor( + @inject(InjectionSymbols.Wallet) wallet: Wallet, + connectionService: ConnectionService, + messageSender: MessageSender, + config: AgentConfig, + mediatorRepository: MediationRepository, + eventEmitter: EventEmitter + ) { + this.config = config + this.wallet = wallet this.mediatorRepository = mediatorRepository this.eventEmitter = eventEmitter + this.connectionService = connectionService + this.messageSender = messageSender } public async init() { @@ -64,7 +87,6 @@ export class RecipientService { } // Assert mediationRecord.assertState(MediationState.Requested) - mediationRecord.assertConnection(connection.id) // Update record mediationRecord.endpoint = messageContext.message.endpoint @@ -113,6 +135,68 @@ export class RecipientService { }) } + public async keylistUpdateAndAwait( + mediationRecord: MediationRecord, + verKey: string, + timeoutMs = 15000 // TODO: this should be a configurable value in agent config + ): Promise { + const message = this.createKeylistUpdateMessage(verKey) + const connection = await this.connectionService.getById(mediationRecord.connectionId) + + // Create observable for event + const observable = this.eventEmitter.observable(RoutingEventTypes.RecipientKeylistUpdated) + + // Apply required filters to observable stream and create promise to subscribe to observable + const keylistUpdatePromise = observable + .pipe( + // Only take event for current mediation record + filter((event) => mediationRecord.id === event.payload.mediationRecord.id), + // Only wait for first event that matches the criteria + first(), + // Do not wait for longer than specified timeout + timeout(timeoutMs) + ) + .toPromise() + const outboundMessage = createOutboundMessage(connection, message) + await this.messageSender.sendMessage(outboundMessage) + + // Await the observable promise + const keylistUpdate = await keylistUpdatePromise + return keylistUpdate.payload.mediationRecord + } + + public createKeylistUpdateMessage(verkey: Verkey): KeylistUpdateMessage { + const keylistUpdateMessage = new KeylistUpdateMessage({ + updates: [ + new KeylistUpdate({ + action: KeylistUpdateAction.add, + recipientKey: verkey, + }), + ], + }) + return keylistUpdateMessage + } + + public async getRouting(mediationRecord: MediationRecord | undefined, routingKeys: string[], myEndpoint?: string) { + let endpoint + if (mediationRecord) { + routingKeys = [...routingKeys, ...mediationRecord.routingKeys] + endpoint = mediationRecord.endpoint + } + // Create and store new key + const [did, verkey] = await this.wallet.createDid() + if (mediationRecord) { + // new did has been created and mediator needs to be updated with the public key. + mediationRecord = await this.keylistUpdateAndAwait(mediationRecord, did) + } else { + // TODO: register recipient keys for relay + // TODO: check that recipient keys are in wallet + } + endpoint = endpoint ?? myEndpoint ?? this.config.getEndpoint() + const result = { mediationRecord, endpoint, routingKeys, did, verkey } + return result + } + public async saveRoute(recipientKey: Verkey, mediationRecord: MediationRecord) { mediationRecord.recipientKeys.push(recipientKey) this.mediatorRepository.update(mediationRecord) @@ -141,7 +225,6 @@ export class RecipientService { // Assert mediationRecord.assertState(MediationState.Requested) - mediationRecord.assertConnection(connection.id) // Update record await this.updateState(mediationRecord, MediationState.Denied) diff --git a/src/modules/routing/services/RoutingService.ts b/src/modules/routing/services/RoutingService.ts index 7bb8d93cc2..62dd7aa434 100644 --- a/src/modules/routing/services/RoutingService.ts +++ b/src/modules/routing/services/RoutingService.ts @@ -8,6 +8,7 @@ import type { MediationRecordProps } from '../repository/MediationRecord' import type { MediationRepository } from '../repository/MediationRepository' import { MediationRecord } from '../repository/MediationRecord' +import { AriesFrameworkError } from '../../../error/AriesFrameworkError' /** * waitForEvent @@ -88,11 +89,8 @@ export async function createRecord( return mediationRecord } -export function assertConnection(record: ConnectionRecord | undefined, errormsg: string): ConnectionRecord { - // Assert connection - record?.assertReady() - if (!record) { - throw new Error(errormsg) - } +export function assertConnection(record: ConnectionRecord | undefined, errormsg: string = "inbound connection is required"): ConnectionRecord { + if (!record) throw new AriesFrameworkError(errormsg) + record.assertReady() return record }