diff --git a/packages/core/src/agent/AgentConfig.ts b/packages/core/src/agent/AgentConfig.ts index 1766ac48db..682e6a9685 100644 --- a/packages/core/src/agent/AgentConfig.ts +++ b/packages/core/src/agent/AgentConfig.ts @@ -79,6 +79,10 @@ export class AgentConfig { return this.initConfig.mediatorPickupStrategy } + public get maximumMessagePickup() { + return this.initConfig.maximumMessagePickup ?? 10 + } + public get endpoints(): [string, ...string[]] { // if endpoints is not set, return queue endpoint // https://github.com/hyperledger/aries-rfcs/issues/405#issuecomment-582612875 diff --git a/packages/core/src/agent/MessageReceiver.ts b/packages/core/src/agent/MessageReceiver.ts index 9ae31bb189..35ef9cdd26 100644 --- a/packages/core/src/agent/MessageReceiver.ts +++ b/packages/core/src/agent/MessageReceiver.ts @@ -8,7 +8,7 @@ import type { TransportSession } from './TransportService' import { Lifecycle, scoped } from 'tsyringe' import { AriesFrameworkError } from '../error' -import { ConnectionRepository } from '../modules/connections' +import { ConnectionRepository } from '../modules/connections/repository' import { DidRepository } from '../modules/dids/repository/DidRepository' import { ProblemReportError, ProblemReportMessage, ProblemReportReason } from '../modules/problem-reports' import { isValidJweStructure } from '../utils/JWE' diff --git a/packages/core/src/agent/helpers.ts b/packages/core/src/agent/helpers.ts index bd3d6dced6..c9aa0d5442 100644 --- a/packages/core/src/agent/helpers.ts +++ b/packages/core/src/agent/helpers.ts @@ -2,6 +2,7 @@ import type { ConnectionRecord } from '../modules/connections' import type { OutboundMessage, OutboundServiceMessage } from '../types' import type { AgentMessage } from './AgentMessage' +import { IndyAgentService } from '../modules/dids' import { DidCommService } from '../modules/dids/domain/service/DidCommService' export function createOutboundMessage( @@ -25,5 +26,6 @@ export function createOutboundServiceMessage(AgentEventTypes.AgentMessageProcessed) + .pipe( + // Stop when the agent shuts down + takeUntil(this.agentConfig.stop$), + // filter by connection id and query disclose message type + filter((e) => e.payload.connection?.id === connectionId && e.payload.message.type === DiscloseMessage.type), + // Return whether the protocol is supported + map((e) => { + const message = e.payload.message as DiscloseMessage + return message.protocols.map((p) => p.protocolId).includes(protocolUri) + }), + // TODO: make configurable + // If we don't have an answer in 7 seconds (no response, not supported, etc...) error + timeout(7000), + // We want to return false if an error occurred + catchError(() => of(false)) + ) + .subscribe(replaySubject) + + await this.queryFeatures(connectionId, { + query: protocolUri, + comment: 'Detect if protocol is supported', + }) + + const isProtocolSupported = await firstValueFrom(replaySubject) + return isProtocolSupported } public async queryFeatures(connectionId: string, options: { query: string; comment?: string }) { diff --git a/packages/core/src/modules/routing/MediatorModule.ts b/packages/core/src/modules/routing/MediatorModule.ts index f4c3a8a995..891c31ce03 100644 --- a/packages/core/src/modules/routing/MediatorModule.ts +++ b/packages/core/src/modules/routing/MediatorModule.ts @@ -6,6 +6,7 @@ import { Lifecycle, scoped } from 'tsyringe' import { AgentConfig } from '../../agent/AgentConfig' import { Dispatcher } from '../../agent/Dispatcher' import { EventEmitter } from '../../agent/EventEmitter' +import { MessageReceiver } from '../../agent/MessageReceiver' import { MessageSender } from '../../agent/MessageSender' import { createOutboundMessage } from '../../agent/helpers' import { ConnectionService } from '../connections/services' @@ -29,6 +30,7 @@ export class MediatorModule { mediationService: MediatorService, messagePickupService: MessagePickupService, messageSender: MessageSender, + messageReceiver: MessageReceiver, eventEmitter: EventEmitter, agentConfig: AgentConfig, connectionService: ConnectionService diff --git a/packages/core/src/modules/routing/MediatorPickupStrategy.ts b/packages/core/src/modules/routing/MediatorPickupStrategy.ts index 59841c6b8d..d4889b6ac9 100644 --- a/packages/core/src/modules/routing/MediatorPickupStrategy.ts +++ b/packages/core/src/modules/routing/MediatorPickupStrategy.ts @@ -1,6 +1,9 @@ export enum MediatorPickupStrategy { // Explicit pickup strategy means picking up messages using the pickup protocol - Explicit = 'Explicit', + PickUpV1 = 'PickUpV1', + + // Supports pickup v2 + PickUpV2 = 'PickUpV2', // Implicit pickup strategy means picking up messages only using return route // decorator. This is what ACA-Py currently uses diff --git a/packages/core/src/modules/routing/RecipientModule.ts b/packages/core/src/modules/routing/RecipientModule.ts index c3c302c3ed..1a2e7a6815 100644 --- a/packages/core/src/modules/routing/RecipientModule.ts +++ b/packages/core/src/modules/routing/RecipientModule.ts @@ -1,4 +1,3 @@ -import type { AgentMessageProcessedEvent } from '../../agent/Events' import type { Logger } from '../../logger' import type { OutboundWebSocketClosedEvent } from '../../transport' import type { OutboundMessage } from '../../types' @@ -6,28 +5,29 @@ import type { ConnectionRecord } from '../connections' import type { MediationStateChangedEvent } from './RoutingEvents' import type { MediationRecord } from './index' -import { firstValueFrom, interval, of, ReplaySubject, timer } from 'rxjs' -import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen, catchError, map } from 'rxjs/operators' +import { firstValueFrom, interval, ReplaySubject, timer } from 'rxjs' +import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen } from 'rxjs/operators' import { Lifecycle, scoped } from 'tsyringe' import { AgentConfig } from '../../agent/AgentConfig' import { Dispatcher } from '../../agent/Dispatcher' import { EventEmitter } from '../../agent/EventEmitter' -import { AgentEventTypes } from '../../agent/Events' +import { MessageReceiver } from '../../agent/MessageReceiver' import { MessageSender } from '../../agent/MessageSender' import { createOutboundMessage } from '../../agent/helpers' import { AriesFrameworkError } from '../../error' import { TransportEventTypes } from '../../transport' -import { parseMessageType } from '../../utils/messageType' import { ConnectionInvitationMessage } from '../connections' import { ConnectionService } from '../connections/services' -import { DiscloseMessage, DiscoverFeaturesModule } from '../discover-features' +import { DiscoverFeaturesModule } from '../discover-features' import { MediatorPickupStrategy } from './MediatorPickupStrategy' import { RoutingEventTypes } from './RoutingEvents' +import { MessageDeliveryHandler, StatusHandler } from './handlers' import { KeylistUpdateResponseHandler } from './handlers/KeylistUpdateResponseHandler' import { MediationDenyHandler } from './handlers/MediationDenyHandler' import { MediationGrantHandler } from './handlers/MediationGrantHandler' +import { StatusRequestMessage } from './messages' import { BatchPickupMessage } from './messages/BatchPickupMessage' import { MediationState } from './models/MediationState' import { MediationRepository } from './repository' @@ -39,6 +39,7 @@ export class RecipientModule { private mediationRecipientService: MediationRecipientService private connectionService: ConnectionService private messageSender: MessageSender + private messageReceiver: MessageReceiver private eventEmitter: EventEmitter private logger: Logger private discoverFeaturesModule: DiscoverFeaturesModule @@ -50,6 +51,7 @@ export class RecipientModule { mediationRecipientService: MediationRecipientService, connectionService: ConnectionService, messageSender: MessageSender, + messageReceiver: MessageReceiver, eventEmitter: EventEmitter, discoverFeaturesModule: DiscoverFeaturesModule, mediationRepository: MediationRepository @@ -58,6 +60,7 @@ export class RecipientModule { this.connectionService = connectionService this.mediationRecipientService = mediationRecipientService this.messageSender = messageSender + this.messageReceiver = messageReceiver this.eventEmitter = eventEmitter this.logger = agentConfig.logger this.discoverFeaturesModule = discoverFeaturesModule @@ -116,24 +119,20 @@ export class RecipientModule { throw new AriesFrameworkError('Cannot open websocket to connection without websocket service endpoint') } - try { - await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), { - transportPriority: { - schemes: websocketSchemes, - restrictive: true, - // TODO: add keepAlive: true to enforce through the public api - // we need to keep the socket alive. It already works this way, but would - // be good to make more explicit from the public facing API. - // This would also make it easier to change the internal API later on. - // keepAlive: true, - }, - }) - } catch (error) { - this.logger.warn('Unable to open websocket connection to mediator', { error }) - } + await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), { + transportPriority: { + schemes: websocketSchemes, + restrictive: true, + // TODO: add keepAlive: true to enforce through the public api + // we need to keep the socket alive. It already works this way, but would + // be good to make more explicit from the public facing API. + // This would also make it easier to change the internal API later on. + // keepAlive: true, + }, + }) } - private async initiateImplicitPickup(mediator: MediationRecord) { + private async openWebSocketAndPickUp(mediator: MediationRecord) { let interval = 50 // Listens to Outbound websocket closed events and will reopen the websocket connection @@ -157,10 +156,21 @@ export class RecipientModule { this.logger.warn( `Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...` ) - await this.openMediationWebSocket(mediator) + try { + await this.openMediationWebSocket(mediator) + if (mediator.pickupStrategy === MediatorPickupStrategy.PickUpV2) { + // Start Pickup v2 protocol to receive messages received while websocket offline + await this.mediationRecipientService.requestStatus({ mediatorId: mediator.id }) + } + } catch (error) { + this.logger.warn('Unable to re-open websocket connection to mediator', { error }) + } }) - - await this.openMediationWebSocket(mediator) + try { + await this.openMediationWebSocket(mediator) + } catch (error) { + this.logger.warn('Unable to open websocket connection to mediator', { error }) + } } public async initiateMessagePickup(mediator: MediationRecord) { @@ -168,27 +178,32 @@ export class RecipientModule { const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator) const mediatorConnection = await this.connectionService.getById(mediator.connectionId) - // Explicit means polling every X seconds with batch message - if (mediatorPickupStrategy === MediatorPickupStrategy.Explicit) { - this.agentConfig.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediator.id}'`) - const subscription = interval(mediatorPollingInterval) - .pipe(takeUntil(this.agentConfig.stop$)) - .subscribe(async () => { - await this.pickupMessages(mediatorConnection) - }) - - return subscription - } - - // Implicit means sending ping once and keeping connection open. This requires a long-lived transport - // such as WebSockets to work - else if (mediatorPickupStrategy === MediatorPickupStrategy.Implicit) { - this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`) - await this.initiateImplicitPickup(mediator) - } else { - this.agentConfig.logger.info( - `Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none` - ) + switch (mediatorPickupStrategy) { + case MediatorPickupStrategy.PickUpV2: + this.agentConfig.logger.info(`Starting pickup of messages from mediator '${mediator.id}'`) + await this.openWebSocketAndPickUp(mediator) + await this.mediationRecipientService.requestStatus({ mediatorId: mediator.id }) + break + case MediatorPickupStrategy.PickUpV1: { + // Explicit means polling every X seconds with batch message + this.agentConfig.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediator.id}'`) + const subscription = interval(mediatorPollingInterval) + .pipe(takeUntil(this.agentConfig.stop$)) + .subscribe(async () => { + await this.pickupMessages(mediatorConnection) + }) + return subscription + } + case MediatorPickupStrategy.Implicit: + // Implicit means sending ping once and keeping connection open. This requires a long-lived transport + // such as WebSockets to work + this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`) + await this.openWebSocketAndPickUp(mediator) + break + default: + this.agentConfig.logger.info( + `Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none` + ) } } @@ -198,12 +213,23 @@ export class RecipientModule { // If mediator pickup strategy is not configured we try to query if batch pickup // is supported through the discover features protocol if (!mediatorPickupStrategy) { - const isBatchPickupSupported = await this.isBatchPickupSupportedByMediator(mediator) + const isPickUpV2Supported = await this.discoverFeaturesModule.isProtocolSupported( + mediator.connectionId, + StatusRequestMessage + ) + if (isPickUpV2Supported) { + mediatorPickupStrategy = MediatorPickupStrategy.PickUpV2 + } else { + const isBatchPickupSupported = await this.discoverFeaturesModule.isProtocolSupported( + mediator.connectionId, + BatchPickupMessage + ) - // Use explicit pickup strategy - mediatorPickupStrategy = isBatchPickupSupported - ? MediatorPickupStrategy.Explicit - : MediatorPickupStrategy.Implicit + // Use explicit pickup strategy + mediatorPickupStrategy = isBatchPickupSupported + ? MediatorPickupStrategy.PickUpV1 + : MediatorPickupStrategy.Implicit + } // Store the result so it can be reused next time mediator.pickupStrategy = mediatorPickupStrategy @@ -213,42 +239,6 @@ export class RecipientModule { return mediatorPickupStrategy } - private async isBatchPickupSupportedByMediator(mediator: MediationRecord) { - const { protocolUri } = parseMessageType(BatchPickupMessage.type) - - // Listen for response to our feature query - const replaySubject = new ReplaySubject(1) - this.eventEmitter - .observable(AgentEventTypes.AgentMessageProcessed) - .pipe( - // Stop when the agent shuts down - takeUntil(this.agentConfig.stop$), - // filter by mediator connection id and query disclose message type - filter( - (e) => e.payload.connection?.id === mediator.connectionId && e.payload.message.type === DiscloseMessage.type - ), - // Return whether the protocol is supported - map((e) => { - const message = e.payload.message as DiscloseMessage - return message.protocols.map((p) => p.protocolId).includes(protocolUri) - }), - // TODO: make configurable - // If we don't have an answer in 7 seconds (no response, not supported, etc...) error - timeout(7000), - // We want to return false if an error occurred - catchError(() => of(false)) - ) - .subscribe(replaySubject) - - await this.discoverFeaturesModule.queryFeatures(mediator.connectionId, { - query: protocolUri, - comment: 'Detect if batch pickup is supported to determine pickup strategy for messages', - }) - - const isBatchPickupSupported = await firstValueFrom(replaySubject) - return isBatchPickupSupported - } - public async discoverMediation() { return this.mediationRecipientService.discoverMediation() } @@ -396,6 +386,8 @@ export class RecipientModule { dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService)) dispatcher.registerHandler(new MediationGrantHandler(this.mediationRecipientService)) dispatcher.registerHandler(new MediationDenyHandler(this.mediationRecipientService)) + dispatcher.registerHandler(new StatusHandler(this.mediationRecipientService)) + dispatcher.registerHandler(new MessageDeliveryHandler(this.mediationRecipientService)) //dispatcher.registerHandler(new KeylistListHandler(this.mediationRecipientService)) // TODO: write this } } diff --git a/packages/core/src/modules/routing/__tests__/mediation.test.ts b/packages/core/src/modules/routing/__tests__/mediation.test.ts index b12a618506..a267352a87 100644 --- a/packages/core/src/modules/routing/__tests__/mediation.test.ts +++ b/packages/core/src/modules/routing/__tests__/mediation.test.ts @@ -72,6 +72,7 @@ describe('mediator establishment', () => { mediatorConnectionsInvite: mediatorInvitation.toUrl({ domain: 'https://example.com/ssi', }), + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }, recipientConfig.agentDependencies ) @@ -94,7 +95,6 @@ describe('mediator establishment', () => { expect(recipientMediatorConnection).toBeConnectedWith(mediatorRecipientConnection) expect(recipientMediator?.state).toBe(MediationState.Granted) - expect(recipientMediator?.pickupStrategy).toBe(MediatorPickupStrategy.Explicit) // Initialize sender agent senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies) @@ -172,6 +172,7 @@ describe('mediator establishment', () => { { ...recipientConfig.config, mediatorConnectionsInvite: mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' }), + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }, recipientConfig.agentDependencies ) @@ -203,6 +204,7 @@ describe('mediator establishment', () => { mediatorConnectionsInvite: mediatorInvitation.toUrl({ domain: 'https://example.com/ssi', }), + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }, recipientConfig.agentDependencies ) diff --git a/packages/core/src/modules/routing/__tests__/mediationRecipient.test.ts b/packages/core/src/modules/routing/__tests__/mediationRecipient.test.ts new file mode 100644 index 0000000000..0a519c5491 --- /dev/null +++ b/packages/core/src/modules/routing/__tests__/mediationRecipient.test.ts @@ -0,0 +1,116 @@ +import type { Wallet } from '../../../wallet/Wallet' + +import { getAgentConfig } from '../../../../tests/helpers' +import { EventEmitter } from '../../../agent/EventEmitter' +import { MessageReceiver } from '../../../agent/MessageReceiver' +import { MessageSender } from '../../../agent/MessageSender' +import { Attachment } from '../../../decorators/attachment/Attachment' +import { AriesFrameworkError } from '../../../error' +import { IndyWallet } from '../../../wallet/IndyWallet' +import { ConnectionRepository } from '../../connections' +import { ConnectionService } from '../../connections/services/ConnectionService' +import { DeliveryRequestMessage, MessageDeliveryMessage, MessagesReceivedMessage, StatusMessage } from '../messages' +import { MediationRepository } from '../repository' +import { MediationRecipientService } from '../services' + +jest.mock('../repository/MediationRepository') +const MediationRepositoryMock = MediationRepository as jest.Mock + +jest.mock('../../connections/repository/ConnectionRepository') +const ConnectionRepositoryMock = ConnectionRepository as jest.Mock + +jest.mock('../../../agent/MessageSender') +const MessageSenderMock = MessageSender as jest.Mock + +jest.mock('../../../agent/MessageReceiver') +const MessageReceiverMock = MessageReceiver as jest.Mock + +const connectionImageUrl = 'https://example.com/image.png' + +describe('MediationRecipientService', () => { + const config = getAgentConfig('MediationRecipientServiceTest', { + endpoints: ['http://agent.com:8080'], + connectionImageUrl, + }) + + let wallet: Wallet + let mediationRepository: MediationRepository + let eventEmitter: EventEmitter + let connectionService: ConnectionService + let connectionRepository: ConnectionRepository + let messageSender: MessageSender + let mediationRecipientService: MediationRecipientService + let messageReceiver: MessageReceiver + + beforeAll(async () => { + wallet = new IndyWallet(config) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + await wallet.createAndOpen(config.walletConfig!) + }) + + afterAll(async () => { + await wallet.delete() + }) + + beforeEach(async () => { + eventEmitter = new EventEmitter(config) + connectionRepository = new ConnectionRepositoryMock() + connectionService = new ConnectionService(wallet, config, connectionRepository, eventEmitter) + mediationRepository = new MediationRepositoryMock() + messageSender = new MessageSenderMock() + messageReceiver = new MessageReceiverMock() + mediationRecipientService = new MediationRecipientService( + wallet, + connectionService, + messageSender, + config, + mediationRepository, + eventEmitter, + messageReceiver + ) + }) + + describe('processStatus', () => { + it('if status request has a message count of zero returns nothing', async () => { + const status = new StatusMessage({ + messageCount: 0, + }) + const deliveryRequestMessage = await mediationRecipientService.processStatus(status) + expect(deliveryRequestMessage).toBeNull() + }) + + it('if it has a message count greater than zero return a valid delivery request', async () => { + const status = new StatusMessage({ + messageCount: 1, + }) + const deliveryRequestMessage = await mediationRecipientService.processStatus(status) + expect(deliveryRequestMessage) + expect(deliveryRequestMessage).toEqual(new DeliveryRequestMessage({ id: deliveryRequestMessage?.id, limit: 1 })) + }) + }) + + describe('processDelivery', () => { + it('if the delivery has no attachments expect an error', async () => { + expect(mediationRecipientService.processDelivery({} as MessageDeliveryMessage)).rejects.toThrowError( + new AriesFrameworkError('Error processing attachments') + ) + }) + it('other we should expect a message recieved with an message id list in it', async () => { + const messageDeliveryMessage = new MessageDeliveryMessage({ + attachments: [ + new Attachment({ + id: '1', + data: {}, + }), + ], + }) + const messagesReceivedMessage = await mediationRecipientService.processDelivery(messageDeliveryMessage) + expect(messagesReceivedMessage).toEqual( + new MessagesReceivedMessage({ + id: messagesReceivedMessage.id, + messageIdList: ['1'], + }) + ) + }) + }) +}) diff --git a/packages/core/src/modules/routing/error/RoutingProblemReportReason.ts b/packages/core/src/modules/routing/error/RoutingProblemReportReason.ts new file mode 100644 index 0000000000..be5b373257 --- /dev/null +++ b/packages/core/src/modules/routing/error/RoutingProblemReportReason.ts @@ -0,0 +1,3 @@ +export enum RoutingProblemReportReason { + ErrorProcessingAttachments = 'error-processing-attachments', +} diff --git a/packages/core/src/modules/routing/error/index.ts b/packages/core/src/modules/routing/error/index.ts new file mode 100644 index 0000000000..d117e1d699 --- /dev/null +++ b/packages/core/src/modules/routing/error/index.ts @@ -0,0 +1 @@ +export * from './RoutingProblemReportReason' diff --git a/packages/core/src/modules/routing/handlers/MessageDeliveryHandler.ts b/packages/core/src/modules/routing/handlers/MessageDeliveryHandler.ts new file mode 100644 index 0000000000..3d7702a77b --- /dev/null +++ b/packages/core/src/modules/routing/handlers/MessageDeliveryHandler.ts @@ -0,0 +1,24 @@ +import type { Handler } from '../../../agent/Handler' +import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' +import type { MediationRecipientService } from '../services' + +import { createOutboundMessage } from '../../../agent/helpers' +import { MessageDeliveryMessage } from '../messages' + +export class MessageDeliveryHandler implements Handler { + public supportedMessages = [MessageDeliveryMessage] + private mediationRecipientService: MediationRecipientService + + public constructor(mediationRecipientService: MediationRecipientService) { + this.mediationRecipientService = mediationRecipientService + } + + public async handle(messageContext: InboundMessageContext) { + const connection = messageContext.assertReadyConnection() + const deliveryReceivedMessage = await this.mediationRecipientService.processDelivery(messageContext.message) + + if (deliveryReceivedMessage) { + return createOutboundMessage(connection, deliveryReceivedMessage) + } + } +} diff --git a/packages/core/src/modules/routing/handlers/StatusHandler.ts b/packages/core/src/modules/routing/handlers/StatusHandler.ts new file mode 100644 index 0000000000..c281430afd --- /dev/null +++ b/packages/core/src/modules/routing/handlers/StatusHandler.ts @@ -0,0 +1,24 @@ +import type { Handler } from '../../../agent/Handler' +import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' +import type { MediationRecipientService } from '../services' + +import { createOutboundMessage } from '../../../agent/helpers' +import { StatusMessage } from '../messages/StatusMessage' + +export class StatusHandler implements Handler { + public supportedMessages = [StatusMessage] + private mediatorRecipientService: MediationRecipientService + + public constructor(mediatorRecipientService: MediationRecipientService) { + this.mediatorRecipientService = mediatorRecipientService + } + + public async handle(messageContext: InboundMessageContext) { + const connection = messageContext.assertReadyConnection() + const deliveryRequestMessage = this.mediatorRecipientService.processStatus(messageContext.message) + + if (deliveryRequestMessage) { + return createOutboundMessage(connection, deliveryRequestMessage) + } + } +} diff --git a/packages/core/src/modules/routing/handlers/index.ts b/packages/core/src/modules/routing/handlers/index.ts index 52096e760b..0c8f48dc41 100644 --- a/packages/core/src/modules/routing/handlers/index.ts +++ b/packages/core/src/modules/routing/handlers/index.ts @@ -3,3 +3,5 @@ export * from './KeylistUpdateHandler' export * from './BatchHandler' export * from './BatchPickupHandler' export * from './KeylistUpdateResponseHandler' +export * from './StatusHandler' +export * from './MessageDeliveryHandler' diff --git a/packages/core/src/modules/routing/messages/DeliveryRequestMessage.ts b/packages/core/src/modules/routing/messages/DeliveryRequestMessage.ts new file mode 100644 index 0000000000..bff4cf05c3 --- /dev/null +++ b/packages/core/src/modules/routing/messages/DeliveryRequestMessage.ts @@ -0,0 +1,36 @@ +import { Expose } from 'class-transformer' +import { Equals, IsInt, IsOptional, IsString } from 'class-validator' + +import { AgentMessage } from '../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' + +export interface DeliveryRequestMessageOptions { + id?: string + recipientKey?: string + limit: number +} + +export class DeliveryRequestMessage extends AgentMessage { + public constructor(options: DeliveryRequestMessageOptions) { + super() + + if (options) { + this.id = options.id || this.generateId() + this.recipientKey = options.recipientKey + this.limit = options.limit + } + this.setReturnRouting(ReturnRouteTypes.all) + } + + @Equals(DeliveryRequestMessage.type) + public readonly type = DeliveryRequestMessage.type + public static readonly type = 'https://didcomm.org/messagepickup/2.0/delivery-request' + + @IsString() + @IsOptional() + @Expose({ name: 'recipient_key' }) + public recipientKey?: string + + @IsInt() + public limit!: number +} diff --git a/packages/core/src/modules/routing/messages/MessageDeliveryMessage.ts b/packages/core/src/modules/routing/messages/MessageDeliveryMessage.ts new file mode 100644 index 0000000000..a5dbf8f57f --- /dev/null +++ b/packages/core/src/modules/routing/messages/MessageDeliveryMessage.ts @@ -0,0 +1,35 @@ +import type { Attachment } from '../../../decorators/attachment/Attachment' + +import { Expose } from 'class-transformer' +import { Equals, IsOptional, IsString } from 'class-validator' + +import { AgentMessage } from '../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' + +export interface MessageDeliveryMessageOptions { + id?: string + recipientKey?: string + attachments: Attachment[] +} + +export class MessageDeliveryMessage extends AgentMessage { + public constructor(options: MessageDeliveryMessageOptions) { + super() + + if (options) { + this.id = options.id || this.generateId() + this.recipientKey = options.recipientKey + this.attachments = options.attachments + } + this.setReturnRouting(ReturnRouteTypes.all) + } + + @Equals(MessageDeliveryMessage.type) + public readonly type = MessageDeliveryMessage.type + public static readonly type = 'https://didcomm.org/messagepickup/2.0/delivery' + + @IsString() + @IsOptional() + @Expose({ name: 'recipient_key' }) + public recipientKey?: string +} diff --git a/packages/core/src/modules/routing/messages/MessagesReceivedMessage.ts b/packages/core/src/modules/routing/messages/MessagesReceivedMessage.ts new file mode 100644 index 0000000000..3f946b5c78 --- /dev/null +++ b/packages/core/src/modules/routing/messages/MessagesReceivedMessage.ts @@ -0,0 +1,31 @@ +import { Expose } from 'class-transformer' +import { Equals, IsArray, IsOptional } from 'class-validator' + +import { AgentMessage } from '../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' + +export interface MessagesReceivedMessageOptions { + id?: string + messageIdList: string[] +} + +export class MessagesReceivedMessage extends AgentMessage { + public constructor(options: MessagesReceivedMessageOptions) { + super() + + if (options) { + this.id = options.id || this.generateId() + this.messageIdList = options.messageIdList + } + this.setReturnRouting(ReturnRouteTypes.all) + } + + @Equals(MessagesReceivedMessage.type) + public readonly type = MessagesReceivedMessage.type + public static readonly type = 'https://didcomm.org/messagepickup/2.0/messages-received' + + @IsArray() + @IsOptional() + @Expose({ name: 'message_id_list' }) + public messageIdList?: string[] +} diff --git a/packages/core/src/modules/routing/messages/StatusMessage.ts b/packages/core/src/modules/routing/messages/StatusMessage.ts new file mode 100644 index 0000000000..5cfc356f6d --- /dev/null +++ b/packages/core/src/modules/routing/messages/StatusMessage.ts @@ -0,0 +1,74 @@ +import { Expose, Transform } from 'class-transformer' +import { Equals, IsBoolean, IsDate, IsInt, IsOptional, IsString } from 'class-validator' + +import { AgentMessage } from '../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' +import { DateParser } from '../../../utils/transformers' + +export interface StatusMessageOptions { + id?: string + recipientKey?: string + messageCount: number + longestWaitedSeconds?: number + newestReceivedTime?: Date + oldestReceivedTime?: Date + totalBytes?: number + liveDelivery?: boolean +} + +export class StatusMessage extends AgentMessage { + public constructor(options: StatusMessageOptions) { + super() + if (options) { + this.id = options.id || this.generateId() + this.recipientKey = options.recipientKey + this.messageCount = options.messageCount + this.longestWaitedSeconds = options.longestWaitedSeconds + this.newestReceivedTime = options.newestReceivedTime + this.oldestReceivedTime = options.oldestReceivedTime + this.totalBytes = options.totalBytes + this.liveDelivery = options.liveDelivery + } + this.setReturnRouting(ReturnRouteTypes.all) + } + + @Equals(StatusMessage.type) + public readonly type = StatusMessage.type + public static readonly type = 'https://didcomm.org/messagepickup/2.0/status' + + @IsString() + @IsOptional() + @Expose({ name: 'recipient_key' }) + public recipientKey?: string + + @IsInt() + @Expose({ name: 'message_count' }) + public messageCount!: number + + @IsInt() + @IsOptional() + @Expose({ name: 'longest_waited_seconds' }) + public longestWaitedSeconds?: number + + @Expose({ name: 'newest_received_time' }) + @Transform(({ value }) => DateParser(value)) + @IsDate() + @IsOptional() + public newestReceivedTime?: Date + + @IsOptional() + @Transform(({ value }) => DateParser(value)) + @IsDate() + @Expose({ name: 'oldest_received_time' }) + public oldestReceivedTime?: Date + + @IsOptional() + @IsInt() + @Expose({ name: 'total_bytes' }) + public totalBytes?: number + + @IsOptional() + @IsBoolean() + @Expose({ name: 'live_delivery' }) + public liveDelivery?: boolean +} diff --git a/packages/core/src/modules/routing/messages/StatusRequestMessage.ts b/packages/core/src/modules/routing/messages/StatusRequestMessage.ts new file mode 100644 index 0000000000..0af5494d79 --- /dev/null +++ b/packages/core/src/modules/routing/messages/StatusRequestMessage.ts @@ -0,0 +1,29 @@ +import { Expose } from 'class-transformer' +import { Equals, IsOptional, IsString } from 'class-validator' + +import { AgentMessage } from '../../../agent/AgentMessage' + +export interface StatusRequestMessageOptions { + id?: string + recipientKey?: string +} + +export class StatusRequestMessage extends AgentMessage { + public constructor(options: StatusRequestMessageOptions) { + super() + + if (options) { + this.id = options.id || this.generateId() + this.recipientKey = options.recipientKey + } + } + + @Equals(StatusRequestMessage.type) + public readonly type = StatusRequestMessage.type + public static readonly type = 'https://didcomm.org/messagepickup/2.0/status-request' + + @IsString() + @IsOptional() + @Expose({ name: 'recipient_key' }) + public recipientKey?: string +} diff --git a/packages/core/src/modules/routing/messages/index.ts b/packages/core/src/modules/routing/messages/index.ts index b267aeacf1..5859a18cd5 100644 --- a/packages/core/src/modules/routing/messages/index.ts +++ b/packages/core/src/modules/routing/messages/index.ts @@ -6,3 +6,8 @@ export * from './KeylistUpdateResponseMessage' export * from './MediationGrantMessage' export * from './MediationDenyMessage' export * from './MediationRequestMessage' +export * from './DeliveryRequestMessage' +export * from './StatusMessage' +export * from './StatusRequestMessage' +export * from './MessageDeliveryMessage' +export * from './MessagesReceivedMessage' diff --git a/packages/core/src/modules/routing/repository/MediationRecord.ts b/packages/core/src/modules/routing/repository/MediationRecord.ts index dda677b85f..24115007b8 100644 --- a/packages/core/src/modules/routing/repository/MediationRecord.ts +++ b/packages/core/src/modules/routing/repository/MediationRecord.ts @@ -1,9 +1,11 @@ -import type { MediatorPickupStrategy } from '../MediatorPickupStrategy' import type { MediationRole } from '../models/MediationRole' +import { Transform } from 'class-transformer' + import { AriesFrameworkError } from '../../../error' import { BaseRecord } from '../../../storage/BaseRecord' import { uuid } from '../../../utils/uuid' +import { MediatorPickupStrategy } from '../MediatorPickupStrategy' import { MediationState } from '../models/MediationState' export interface MediationRecordProps { @@ -42,6 +44,14 @@ export class MediationRecord public endpoint?: string public recipientKeys!: string[] public routingKeys!: string[] + + @Transform(({ value }) => { + if (value === 'Explicit') { + return MediatorPickupStrategy.PickUpV1 + } else { + return value + } + }) public pickupStrategy?: MediatorPickupStrategy public static readonly type = 'MediationRecord' diff --git a/packages/core/src/modules/routing/services/MediationRecipientService.ts b/packages/core/src/modules/routing/services/MediationRecipientService.ts index dd63b39827..268e7a4e78 100644 --- a/packages/core/src/modules/routing/services/MediationRecipientService.ts +++ b/packages/core/src/modules/routing/services/MediationRecipientService.ts @@ -1,9 +1,17 @@ import type { AgentMessage } from '../../../agent/AgentMessage' import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' +import type { Logger } from '../../../logger' +import type { EncryptedMessage } from '../../../types' import type { ConnectionRecord } from '../../connections' import type { Routing } from '../../connections/services/ConnectionService' import type { MediationStateChangedEvent, KeylistUpdatedEvent } from '../RoutingEvents' -import type { MediationGrantMessage, MediationDenyMessage, KeylistUpdateResponseMessage } from '../messages' +import type { + KeylistUpdateResponseMessage, + MediationDenyMessage, + MediationGrantMessage, + MessageDeliveryMessage, + StatusMessage, +} from '../messages' import { firstValueFrom, ReplaySubject } from 'rxjs' import { filter, first, timeout } from 'rxjs/operators' @@ -11,14 +19,23 @@ import { inject, Lifecycle, scoped } from 'tsyringe' import { AgentConfig } from '../../../agent/AgentConfig' import { EventEmitter } from '../../../agent/EventEmitter' +import { MessageReceiver } from '../../../agent/MessageReceiver' import { MessageSender } from '../../../agent/MessageSender' import { createOutboundMessage } from '../../../agent/helpers' import { InjectionSymbols } from '../../../constants' import { AriesFrameworkError } from '../../../error' import { Wallet } from '../../../wallet/Wallet' import { ConnectionService } from '../../connections/services/ConnectionService' +import { ProblemReportError } from '../../problem-reports' import { RoutingEventTypes } from '../RoutingEvents' -import { KeylistUpdateAction, MediationRequestMessage } from '../messages' +import { RoutingProblemReportReason } from '../error' +import { + StatusRequestMessage, + DeliveryRequestMessage, + MessagesReceivedMessage, + KeylistUpdateAction, + MediationRequestMessage, +} from '../messages' import { KeylistUpdate, KeylistUpdateMessage } from '../messages/KeylistUpdateMessage' import { MediationRole, MediationState } from '../models' import { MediationRecord } from '../repository/MediationRecord' @@ -32,6 +49,8 @@ export class MediationRecipientService { private connectionService: ConnectionService private messageSender: MessageSender private config: AgentConfig + private logger: Logger + private messageReceiver: MessageReceiver public constructor( @inject(InjectionSymbols.Wallet) wallet: Wallet, @@ -39,7 +58,8 @@ export class MediationRecipientService { messageSender: MessageSender, config: AgentConfig, mediatorRepository: MediationRepository, - eventEmitter: EventEmitter + eventEmitter: EventEmitter, + messageReveiver: MessageReceiver ) { this.config = config this.wallet = wallet @@ -47,6 +67,33 @@ export class MediationRecipientService { this.eventEmitter = eventEmitter this.connectionService = connectionService this.messageSender = messageSender + this.logger = config.logger + this.messageReceiver = messageReveiver + } + + public async requestStatus( + config: { + mediatorId?: string + recipientKey?: string + } = {} + ) { + let mediator + let mediatorRecord + + if (config.mediatorId) { + const record = await this.getById(config.mediatorId) + mediator = await this.connectionService.findById(record.id) + } else { + mediatorRecord = await this.findDefaultMediator() + if (mediatorRecord) mediator = await this.connectionService.getById(mediatorRecord.connectionId) + } + + const { recipientKey } = config + const statusRequest = new StatusRequestMessage({ + recipientKey, + }) + if (!mediator) throw new AriesFrameworkError('Could not find mediator connection') + return this.messageSender.sendMessage(createOutboundMessage(mediator, statusRequest)) } public async createRequest( @@ -213,6 +260,46 @@ export class MediationRecipientService { return mediationRecord } + public processStatus(statusMessage: StatusMessage) { + const { messageCount, recipientKey } = statusMessage + + //No messages to be sent + if (messageCount === 0) return null + + const { maximumMessagePickup } = this.config + const limit = messageCount < maximumMessagePickup ? messageCount : maximumMessagePickup + + const deliveryRequestMessage = new DeliveryRequestMessage({ + limit, + recipientKey, + }) + + return deliveryRequestMessage + } + + public async processDelivery(messageDeliveryMessage: MessageDeliveryMessage) { + const { attachments } = messageDeliveryMessage + + if (!attachments) + throw new ProblemReportError('Error processing attachments', { + problemCode: RoutingProblemReportReason.ErrorProcessingAttachments, + }) + + const ids: string[] = [] + for (const attachment of attachments) { + ids.push(attachment.id) + try { + await this.messageReceiver.receiveMessage(attachment.getDataAsJson()) + } catch (error) { + this.logger.error(`Failed to process message id: ${attachment.id}`, { error, attachment }) + } + } + + return new MessagesReceivedMessage({ + messageIdList: ids, + }) + } + /** * Update the record to a new state and emit an state changed event. Also updates the record * in storage. diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index f7067aa451..6910eabd1d 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -68,6 +68,7 @@ export interface InitConfig { clearDefaultMediator?: boolean mediatorPollingInterval?: number mediatorPickupStrategy?: MediatorPickupStrategy + maximumMessagePickup?: number useLegacyDidSovPrefix?: boolean connectionImageUrl?: string diff --git a/packages/core/tests/connectionless-proofs.test.ts b/packages/core/tests/connectionless-proofs.test.ts index 8f77c7dff3..ebddd35ea7 100644 --- a/packages/core/tests/connectionless-proofs.test.ts +++ b/packages/core/tests/connectionless-proofs.test.ts @@ -17,6 +17,7 @@ import { AutoAcceptProof, ProofEventTypes, } from '../src/modules/proofs' +import { MediatorPickupStrategy } from '../src/modules/routing' import { LinkedAttachment } from '../src/utils/LinkedAttachment' import { uuid } from '../src/utils/uuid' @@ -209,12 +210,14 @@ describe('Present Proof', () => { const faberConfig = getBaseConfig(`Connectionless proofs with mediator Faber-${unique}`, { autoAcceptProofs: AutoAcceptProof.Always, mediatorConnectionsInvite: faberMediationInvitation.invitation.toUrl({ domain: 'https://example.com' }), + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) const aliceConfig = getBaseConfig(`Connectionless proofs with mediator Alice-${unique}`, { autoAcceptProofs: AutoAcceptProof.Always, // logger: new TestLogger(LogLevel.test), mediatorConnectionsInvite: aliceMediationInvitation.invitation.toUrl({ domain: 'https://example.com' }), + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) const faberAgent = new Agent(faberConfig.config, faberConfig.agentDependencies) diff --git a/tests/e2e-http.test.ts b/tests/e2e-http.test.ts index ac9805df51..fa140f4220 100644 --- a/tests/e2e-http.test.ts +++ b/tests/e2e-http.test.ts @@ -2,11 +2,12 @@ import { getBaseConfig } from '../packages/core/tests/helpers' import { e2eTest } from './e2e-test' -import { HttpOutboundTransport, Agent, AutoAcceptCredential } from '@aries-framework/core' +import { HttpOutboundTransport, Agent, AutoAcceptCredential, MediatorPickupStrategy } from '@aries-framework/core' import { HttpInboundTransport } from '@aries-framework/node' const recipientConfig = getBaseConfig('E2E HTTP Recipient', { autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) const mediatorPort = 3000 @@ -20,6 +21,7 @@ const senderConfig = getBaseConfig('E2E HTTP Sender', { endpoints: [`http://localhost:${senderPort}`], mediatorPollingInterval: 1000, autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) describe('E2E HTTP tests', () => { diff --git a/tests/e2e-subject.test.ts b/tests/e2e-subject.test.ts index 0f820c6885..51945cf1ed 100644 --- a/tests/e2e-subject.test.ts +++ b/tests/e2e-subject.test.ts @@ -8,10 +8,11 @@ import { e2eTest } from './e2e-test' import { SubjectInboundTransport } from './transport/SubjectInboundTransport' import { SubjectOutboundTransport } from './transport/SubjectOutboundTransport' -import { Agent, AutoAcceptCredential } from '@aries-framework/core' +import { Agent, AutoAcceptCredential, MediatorPickupStrategy } from '@aries-framework/core' const recipientConfig = getBaseConfig('E2E Subject Recipient', { autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) const mediatorConfig = getBaseConfig('E2E Subject Mediator', { endpoints: ['rxjs:mediator'], @@ -21,6 +22,7 @@ const senderConfig = getBaseConfig('E2E Subject Sender', { endpoints: ['rxjs:sender'], mediatorPollingInterval: 1000, autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) describe('E2E Subject tests', () => { diff --git a/tests/e2e-ws.test.ts b/tests/e2e-ws.test.ts index b66f528103..f8452eb484 100644 --- a/tests/e2e-ws.test.ts +++ b/tests/e2e-ws.test.ts @@ -2,11 +2,12 @@ import { getBaseConfig } from '../packages/core/tests/helpers' import { e2eTest } from './e2e-test' -import { Agent, WsOutboundTransport, AutoAcceptCredential } from '@aries-framework/core' +import { Agent, WsOutboundTransport, AutoAcceptCredential, MediatorPickupStrategy } from '@aries-framework/core' import { WsInboundTransport } from '@aries-framework/node' const recipientConfig = getBaseConfig('E2E WS Recipient ', { autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) const mediatorPort = 4000 @@ -20,6 +21,7 @@ const senderConfig = getBaseConfig('E2E WS Sender', { endpoints: [`ws://localhost:${senderPort}`], mediatorPollingInterval: 1000, autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV1, }) describe('E2E WS tests', () => {