From a98955666853471d504f8a5c8c4623e18ba8c8ed Mon Sep 17 00:00:00 2001 From: Ariel Gentile Date: Thu, 11 Aug 2022 10:30:22 -0300 Subject: [PATCH] feat(routing): pickup v2 mediator role basic implementation (#975) Signed-off-by: Ariel Gentile --- .../src/modules/routing/MediatorModule.ts | 10 +- .../src/modules/routing/RecipientModule.ts | 21 +- .../modules/routing/__tests__/pickup.test.ts | 141 ++++++++++ .../src/modules/routing/handlers/index.ts | 7 +- packages/core/src/modules/routing/index.ts | 1 + .../src/modules/routing/messages/index.ts | 7 - .../src/modules/routing/protocol/index.ts | 1 + .../modules/routing/protocol/pickup/index.ts | 2 + .../pickup/v1/MessagePickupService.ts | 64 +++++ .../pickup/v1}/handlers/BatchHandler.ts | 10 +- .../pickup/v1}/handlers/BatchPickupHandler.ts | 6 +- .../protocol/pickup/v1/handlers/index.ts | 2 + .../routing/protocol/pickup/v1/index.ts | 2 + .../pickup/v1}/messages/BatchMessage.ts | 10 +- .../pickup/v1}/messages/BatchPickupMessage.ts | 4 +- .../protocol/pickup/v1/messages/index.ts | 2 + .../pickup/v2/V2MessagePickupService.ts | 126 +++++++++ .../v2/handlers/DeliveryRequestHandler.ts | 19 ++ .../v2}/handlers/MessageDeliveryHandler.ts | 10 +- .../v2/handlers/MessagesReceivedHandler.ts | 19 ++ .../pickup/v2}/handlers/StatusHandler.ts | 10 +- .../v2/handlers/StatusRequestHandler.ts | 19 ++ .../protocol/pickup/v2/handlers/index.ts | 5 + .../routing/protocol/pickup/v2/index.ts | 2 + .../v2}/messages/DeliveryRequestMessage.ts | 6 +- .../v2}/messages/MessageDeliveryMessage.ts | 12 +- .../v2}/messages/MessagesReceivedMessage.ts | 6 +- .../pickup/v2}/messages/StatusMessage.ts | 12 +- .../v2}/messages/StatusRequestMessage.ts | 4 +- .../protocol/pickup/v2/messages/index.ts | 5 + .../services/MediationRecipientService.ts | 30 +-- .../routing/services/MessagePickupService.ts | 45 ---- .../MediationRecipientService.test.ts | 61 +---- .../__tests__/V2MessagePickupService.test.ts | 249 ++++++++++++++++++ .../src/modules/routing/services/index.ts | 1 - .../src/storage/InMemoryMessageRepository.ts | 10 +- .../core/src/storage/MessageRepository.ts | 7 +- tests/e2e-ws-pickup-v2.test.ts | 69 +++++ 38 files changed, 822 insertions(+), 195 deletions(-) create mode 100644 packages/core/src/modules/routing/__tests__/pickup.test.ts create mode 100644 packages/core/src/modules/routing/protocol/index.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/index.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts rename packages/core/src/modules/routing/{ => protocol/pickup/v1}/handlers/BatchHandler.ts (69%) rename packages/core/src/modules/routing/{ => protocol/pickup/v1}/handlers/BatchPickupHandler.ts (75%) create mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/index.ts rename packages/core/src/modules/routing/{ => protocol/pickup/v1}/messages/BatchMessage.ts (79%) rename packages/core/src/modules/routing/{ => protocol/pickup/v1}/messages/BatchPickupMessage.ts (86%) create mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/handlers/MessageDeliveryHandler.ts (64%) create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/handlers/StatusHandler.ts (65%) create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/index.ts rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/messages/DeliveryRequestMessage.ts (78%) rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/messages/MessageDeliveryMessage.ts (67%) rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/messages/MessagesReceivedMessage.ts (76%) rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/messages/StatusMessage.ts (83%) rename packages/core/src/modules/routing/{ => protocol/pickup/v2}/messages/StatusRequestMessage.ts (82%) create mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts delete mode 100644 packages/core/src/modules/routing/services/MessagePickupService.ts create mode 100644 packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts create mode 100644 tests/e2e-ws-pickup-v2.test.ts diff --git a/packages/core/src/modules/routing/MediatorModule.ts b/packages/core/src/modules/routing/MediatorModule.ts index 9d3c614a84..a4a933329f 100644 --- a/packages/core/src/modules/routing/MediatorModule.ts +++ b/packages/core/src/modules/routing/MediatorModule.ts @@ -11,10 +11,10 @@ import { createOutboundMessage } from '../../agent/helpers' import { injectable, module } from '../../plugins' import { ConnectionService } from '../connections/services' -import { KeylistUpdateHandler, ForwardHandler, BatchPickupHandler, BatchHandler } from './handlers' +import { KeylistUpdateHandler, ForwardHandler } from './handlers' import { MediationRequestHandler } from './handlers/MediationRequestHandler' +import { MessagePickupService, V2MessagePickupService } from './protocol' import { MediatorService } from './services/MediatorService' -import { MessagePickupService } from './services/MessagePickupService' @module() @injectable() @@ -64,8 +64,6 @@ export class MediatorModule { private registerHandlers(dispatcher: Dispatcher) { dispatcher.registerHandler(new KeylistUpdateHandler(this.mediatorService)) dispatcher.registerHandler(new ForwardHandler(this.mediatorService, this.connectionService, this.messageSender)) - dispatcher.registerHandler(new BatchPickupHandler(this.messagePickupService)) - dispatcher.registerHandler(new BatchHandler(this.eventEmitter)) dispatcher.registerHandler(new MediationRequestHandler(this.mediatorService, this.agentConfig)) } @@ -79,5 +77,9 @@ export class MediatorModule { // Services dependencyManager.registerSingleton(MediatorService) dependencyManager.registerSingleton(MessagePickupService) + dependencyManager.registerSingleton(V2MessagePickupService) + + // FIXME: Inject in constructor + dependencyManager.resolve(V2MessagePickupService) } } diff --git a/packages/core/src/modules/routing/RecipientModule.ts b/packages/core/src/modules/routing/RecipientModule.ts index 078a5eb8c6..c7801b26c1 100644 --- a/packages/core/src/modules/routing/RecipientModule.ts +++ b/packages/core/src/modules/routing/RecipientModule.ts @@ -24,13 +24,11 @@ import { DiscoverFeaturesModule } from '../discover-features' import { MediatorPickupStrategy } from './MediatorPickupStrategy' import { RoutingEventTypes } from './RoutingEvents' -import { MessageDeliveryHandler, StatusHandler } from './handlers' import { KeylistUpdateResponseHandler } from './handlers/KeylistUpdateResponseHandler' import { MediationDenyHandler } from './handlers/MediationDenyHandler' import { MediationGrantHandler } from './handlers/MediationGrantHandler' -import { StatusRequestMessage } from './messages' -import { BatchPickupMessage } from './messages/BatchPickupMessage' import { MediationState } from './models/MediationState' +import { BatchPickupMessage, StatusRequestMessage } from './protocol' import { MediationRepository, MediatorRoutingRepository } from './repository' import { MediationRecipientService } from './services/MediationRecipientService' import { RoutingService } from './services/RoutingService' @@ -94,8 +92,8 @@ export class RecipientModule { } } - private async sendMessage(outboundMessage: OutboundMessage) { - const { mediatorPickupStrategy } = this.agentConfig + private async sendMessage(outboundMessage: OutboundMessage, pickupStrategy?: MediatorPickupStrategy) { + const mediatorPickupStrategy = pickupStrategy ?? this.agentConfig.mediatorPickupStrategy const transportPriority = mediatorPickupStrategy === MediatorPickupStrategy.Implicit ? { schemes: ['wss', 'ws'], restrictive: true } @@ -264,12 +262,15 @@ export class RecipientModule { return this.mediationRecipientService.discoverMediation() } - public async pickupMessages(mediatorConnection: ConnectionRecord) { + public async pickupMessages(mediatorConnection: ConnectionRecord, pickupStrategy?: MediatorPickupStrategy) { mediatorConnection.assertReady() - const batchPickupMessage = new BatchPickupMessage({ batchSize: 10 }) - const outboundMessage = createOutboundMessage(mediatorConnection, batchPickupMessage) - await this.sendMessage(outboundMessage) + const pickupMessage = + pickupStrategy === MediatorPickupStrategy.PickUpV2 + ? new StatusRequestMessage({}) + : new BatchPickupMessage({ batchSize: 10 }) + const outboundMessage = createOutboundMessage(mediatorConnection, pickupMessage) + await this.sendMessage(outboundMessage, pickupStrategy) } public async setDefaultMediator(mediatorRecord: MediationRecord) { @@ -376,8 +377,6 @@ export class RecipientModule { dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService)) dispatcher.registerHandler(new MediationGrantHandler(this.mediationRecipientService)) dispatcher.registerHandler(new MediationDenyHandler(this.mediationRecipientService)) - dispatcher.registerHandler(new StatusHandler(this.mediationRecipientService)) - dispatcher.registerHandler(new MessageDeliveryHandler(this.mediationRecipientService)) //dispatcher.registerHandler(new KeylistListHandler(this.mediationRecipientService)) // TODO: write this } diff --git a/packages/core/src/modules/routing/__tests__/pickup.test.ts b/packages/core/src/modules/routing/__tests__/pickup.test.ts new file mode 100644 index 0000000000..0869fd5c53 --- /dev/null +++ b/packages/core/src/modules/routing/__tests__/pickup.test.ts @@ -0,0 +1,141 @@ +/* eslint-disable @typescript-eslint/no-non-null-assertion */ +import type { SubjectMessage } from '../../../../../../tests/transport/SubjectInboundTransport' + +import { Subject } from 'rxjs' + +import { SubjectInboundTransport } from '../../../../../../tests/transport/SubjectInboundTransport' +import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport' +import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers' +import { Agent } from '../../../agent/Agent' +import { ConsoleLogger, LogLevel } from '../../../logger' +import { HandshakeProtocol } from '../../connections' +import { MediatorPickupStrategy } from '../MediatorPickupStrategy' + +const logger = new ConsoleLogger(LogLevel.info) +const recipientConfig = getBaseConfig('Mediation: Recipient', { + autoAcceptConnections: true, + indyLedgers: [], + logger, +}) +const mediatorConfig = getBaseConfig('Mediation: Mediator', { + autoAcceptConnections: true, + endpoints: ['rxjs:mediator'], + indyLedgers: [], + logger, +}) + +describe('E2E Pick Up protocol', () => { + let recipientAgent: Agent + let mediatorAgent: Agent + + afterEach(async () => { + await recipientAgent?.shutdown() + await recipientAgent?.wallet.delete() + await mediatorAgent?.shutdown() + await mediatorAgent?.wallet.delete() + }) + + test('E2E Pick Up V1 protocol', async () => { + const mediatorMessages = new Subject() + + const subjectMap = { + 'rxjs:mediator': mediatorMessages, + } + + // Initialize mediatorReceived message + mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies) + mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages)) + await mediatorAgent.initialize() + + // Create connection to use for recipient + const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({ + label: 'mediator invitation', + handshake: true, + handshakeProtocols: [HandshakeProtocol.DidExchange], + }) + + // Initialize recipient + recipientAgent = new Agent(recipientConfig.config, recipientConfig.agentDependencies) + recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + await recipientAgent.initialize() + + // Connect + const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation + + let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl( + mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' }) + ) + + recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected( + recipientMediatorConnection!.id + ) + + let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id) + + mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id) + + const message = 'hello pickup V1' + await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message) + + await recipientAgent.mediationRecipient.pickupMessages(recipientMediatorConnection) + + const basicMessage = await waitForBasicMessage(recipientAgent, { + content: message, + }) + + expect(basicMessage.content).toBe(message) + }) + + test('E2E Pick Up V2 protocol', async () => { + const mediatorMessages = new Subject() + + const subjectMap = { + 'rxjs:mediator': mediatorMessages, + } + + // Initialize mediatorReceived message + mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies) + mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages)) + await mediatorAgent.initialize() + + // Create connection to use for recipient + const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({ + label: 'mediator invitation', + handshake: true, + handshakeProtocols: [HandshakeProtocol.DidExchange], + }) + + // Initialize recipient + recipientAgent = new Agent(recipientConfig.config, recipientConfig.agentDependencies) + recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + await recipientAgent.initialize() + + // Connect + const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation + + let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl( + mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' }) + ) + + recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected( + recipientMediatorConnection!.id + ) + + let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id) + + mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id) + + const message = 'hello pickup V2' + await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message) + + await recipientAgent.mediationRecipient.pickupMessages(recipientMediatorConnection, MediatorPickupStrategy.PickUpV2) + + const basicMessage = await waitForBasicMessage(recipientAgent, { + content: message, + }) + + expect(basicMessage.content).toBe(message) + }) +}) diff --git a/packages/core/src/modules/routing/handlers/index.ts b/packages/core/src/modules/routing/handlers/index.ts index 0c8f48dc41..6ac04444f5 100644 --- a/packages/core/src/modules/routing/handlers/index.ts +++ b/packages/core/src/modules/routing/handlers/index.ts @@ -1,7 +1,6 @@ export * from './ForwardHandler' export * from './KeylistUpdateHandler' -export * from './BatchHandler' -export * from './BatchPickupHandler' export * from './KeylistUpdateResponseHandler' -export * from './StatusHandler' -export * from './MessageDeliveryHandler' +export * from './MediationDenyHandler' +export * from './MediationGrantHandler' +export * from './MediationRequestHandler' diff --git a/packages/core/src/modules/routing/index.ts b/packages/core/src/modules/routing/index.ts index f0655fe116..f3bf782a20 100644 --- a/packages/core/src/modules/routing/index.ts +++ b/packages/core/src/modules/routing/index.ts @@ -1,5 +1,6 @@ export * from './messages' export * from './services' +export * from './protocol' export * from './repository' export * from './models' export * from './RoutingEvents' diff --git a/packages/core/src/modules/routing/messages/index.ts b/packages/core/src/modules/routing/messages/index.ts index 5859a18cd5..06af8aeb93 100644 --- a/packages/core/src/modules/routing/messages/index.ts +++ b/packages/core/src/modules/routing/messages/index.ts @@ -1,13 +1,6 @@ -export * from './BatchMessage' -export * from './BatchPickupMessage' export * from './ForwardMessage' export * from './KeylistUpdateMessage' export * from './KeylistUpdateResponseMessage' export * from './MediationGrantMessage' export * from './MediationDenyMessage' export * from './MediationRequestMessage' -export * from './DeliveryRequestMessage' -export * from './StatusMessage' -export * from './StatusRequestMessage' -export * from './MessageDeliveryMessage' -export * from './MessagesReceivedMessage' diff --git a/packages/core/src/modules/routing/protocol/index.ts b/packages/core/src/modules/routing/protocol/index.ts new file mode 100644 index 0000000000..c18db7326d --- /dev/null +++ b/packages/core/src/modules/routing/protocol/index.ts @@ -0,0 +1 @@ +export * from './pickup' diff --git a/packages/core/src/modules/routing/protocol/pickup/index.ts b/packages/core/src/modules/routing/protocol/pickup/index.ts new file mode 100644 index 0000000000..4d9da63573 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/index.ts @@ -0,0 +1,2 @@ +export * from './v1' +export * from './v2' diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts b/packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts new file mode 100644 index 0000000000..a906a7357f --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts @@ -0,0 +1,64 @@ +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { EncryptedMessage } from '../../../../../types' +import type { BatchPickupMessage } from './messages' + +import { Dispatcher } from '../../../../../agent/Dispatcher' +import { EventEmitter } from '../../../../../agent/EventEmitter' +import { createOutboundMessage } from '../../../../../agent/helpers' +import { InjectionSymbols } from '../../../../../constants' +import { inject, injectable } from '../../../../../plugins' +import { MessageRepository } from '../../../../../storage/MessageRepository' + +import { BatchHandler, BatchPickupHandler } from './handlers' +import { BatchMessage, BatchMessageMessage } from './messages' + +@injectable() +export class MessagePickupService { + private messageRepository: MessageRepository + private dispatcher: Dispatcher + private eventEmitter: EventEmitter + + public constructor( + @inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository, + dispatcher: Dispatcher, + eventEmitter: EventEmitter + ) { + this.messageRepository = messageRepository + this.dispatcher = dispatcher + this.eventEmitter = eventEmitter + + this.registerHandlers() + } + + public async batch(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + const { message } = messageContext + const messages = await this.messageRepository.takeFromQueue(connection.id, message.batchSize) + + // TODO: each message should be stored with an id. to be able to conform to the id property + // of batch message + const batchMessages = messages.map( + (msg) => + new BatchMessageMessage({ + message: msg, + }) + ) + + const batchMessage = new BatchMessage({ + messages: batchMessages, + }) + + return createOutboundMessage(connection, batchMessage) + } + + public async queueMessage(connectionId: string, message: EncryptedMessage) { + await this.messageRepository.add(connectionId, message) + } + + protected registerHandlers() { + this.dispatcher.registerHandler(new BatchPickupHandler(this)) + this.dispatcher.registerHandler(new BatchHandler(this.eventEmitter)) + } +} diff --git a/packages/core/src/modules/routing/handlers/BatchHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts similarity index 69% rename from packages/core/src/modules/routing/handlers/BatchHandler.ts rename to packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts index c18861a673..53791f4335 100644 --- a/packages/core/src/modules/routing/handlers/BatchHandler.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts @@ -1,9 +1,9 @@ -import type { EventEmitter } from '../../../agent/EventEmitter' -import type { AgentMessageReceivedEvent } from '../../../agent/Events' -import type { Handler, HandlerInboundMessage } from '../../../agent/Handler' +import type { EventEmitter } from '../../../../../../agent/EventEmitter' +import type { AgentMessageReceivedEvent } from '../../../../../../agent/Events' +import type { Handler, HandlerInboundMessage } from '../../../../../../agent/Handler' -import { AgentEventTypes } from '../../../agent/Events' -import { AriesFrameworkError } from '../../../error' +import { AgentEventTypes } from '../../../../../../agent/Events' +import { AriesFrameworkError } from '../../../../../../error' import { BatchMessage } from '../messages' export class BatchHandler implements Handler { diff --git a/packages/core/src/modules/routing/handlers/BatchPickupHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts similarity index 75% rename from packages/core/src/modules/routing/handlers/BatchPickupHandler.ts rename to packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts index 841ba039e8..f5d8839ece 100644 --- a/packages/core/src/modules/routing/handlers/BatchPickupHandler.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts @@ -1,7 +1,7 @@ -import type { Handler, HandlerInboundMessage } from '../../../agent/Handler' -import type { MessagePickupService } from '../services' +import type { Handler, HandlerInboundMessage } from '../../../../../../agent/Handler' +import type { MessagePickupService } from '../MessagePickupService' -import { AriesFrameworkError } from '../../../error' +import { AriesFrameworkError } from '../../../../../../error' import { BatchPickupMessage } from '../messages' export class BatchPickupHandler implements Handler { diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts new file mode 100644 index 0000000000..d7a709a49d --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts @@ -0,0 +1,2 @@ +export * from './BatchHandler' +export * from './BatchPickupHandler' diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/index.ts b/packages/core/src/modules/routing/protocol/pickup/v1/index.ts new file mode 100644 index 0000000000..9174e24a93 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v1/index.ts @@ -0,0 +1,2 @@ +export * from './MessagePickupService' +export * from './messages' diff --git a/packages/core/src/modules/routing/messages/BatchMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts similarity index 79% rename from packages/core/src/modules/routing/messages/BatchMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts index 44f021edc0..fae8f4d3d6 100644 --- a/packages/core/src/modules/routing/messages/BatchMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts @@ -1,11 +1,11 @@ import { Type, Expose } from 'class-transformer' import { Matches, IsArray, ValidateNested, IsObject, IsInstance } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { MessageIdRegExp } from '../../../agent/BaseMessage' -import { EncryptedMessage } from '../../../types' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' -import { uuid } from '../../../utils/uuid' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { MessageIdRegExp } from '../../../../../../agent/BaseMessage' +import { EncryptedMessage } from '../../../../../../types' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { uuid } from '../../../../../../utils/uuid' export class BatchMessageMessage { public constructor(options: { id?: string; message: EncryptedMessage }) { diff --git a/packages/core/src/modules/routing/messages/BatchPickupMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts similarity index 86% rename from packages/core/src/modules/routing/messages/BatchPickupMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts index 83952cedeb..4756bc4416 100644 --- a/packages/core/src/modules/routing/messages/BatchPickupMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts @@ -1,8 +1,8 @@ import { Expose } from 'class-transformer' import { IsInt } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' export interface BatchPickupMessageOptions { id?: string diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts b/packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts new file mode 100644 index 0000000000..8e32f97f68 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts @@ -0,0 +1,2 @@ +export * from './BatchMessage' +export * from './BatchPickupMessage' diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts b/packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts new file mode 100644 index 0000000000..dbd1a84e42 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts @@ -0,0 +1,126 @@ +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { EncryptedMessage } from '../../../../../types' +import type { DeliveryRequestMessage, MessagesReceivedMessage, StatusRequestMessage } from './messages' + +import { Dispatcher } from '../../../../../agent/Dispatcher' +import { createOutboundMessage } from '../../../../../agent/helpers' +import { InjectionSymbols } from '../../../../../constants' +import { Attachment } from '../../../../../decorators/attachment/Attachment' +import { AriesFrameworkError } from '../../../../../error' +import { inject, injectable } from '../../../../../plugins' +import { MessageRepository } from '../../../../../storage/MessageRepository' +import { MediationRecipientService } from '../../../services' + +import { + DeliveryRequestHandler, + MessageDeliveryHandler, + MessagesReceivedHandler, + StatusHandler, + StatusRequestHandler, +} from './handlers' +import { MessageDeliveryMessage, StatusMessage } from './messages' + +@injectable() +export class V2MessagePickupService { + private messageRepository: MessageRepository + private dispatcher: Dispatcher + private mediationRecipientService: MediationRecipientService + + public constructor( + @inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository, + dispatcher: Dispatcher, + mediationRecipientService: MediationRecipientService + ) { + this.messageRepository = messageRepository + this.dispatcher = dispatcher + this.mediationRecipientService = mediationRecipientService + + this.registerHandlers() + } + + public async processStatusRequest(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + if (messageContext.message.recipientKey) { + throw new AriesFrameworkError('recipient_key parameter not supported') + } + + const statusMessage = new StatusMessage({ + threadId: messageContext.message.threadId, + messageCount: await this.messageRepository.getAvailableMessageCount(connection.id), + }) + + return createOutboundMessage(connection, statusMessage) + } + + public async queueMessage(connectionId: string, message: EncryptedMessage) { + await this.messageRepository.add(connectionId, message) + } + + public async processDeliveryRequest(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + if (messageContext.message.recipientKey) { + throw new AriesFrameworkError('recipient_key parameter not supported') + } + + const { message } = messageContext + + // Get available messages from queue, but don't delete them + const messages = await this.messageRepository.takeFromQueue(connection.id, message.limit, true) + + // TODO: each message should be stored with an id. to be able to conform to the id property + // of delivery message + const attachments = messages.map( + (msg) => + new Attachment({ + data: { + json: msg, + }, + }) + ) + + const outboundMessage = + messages.length > 0 + ? new MessageDeliveryMessage({ + threadId: messageContext.message.threadId, + attachments, + }) + : new StatusMessage({ + threadId: messageContext.message.threadId, + messageCount: 0, + }) + + return createOutboundMessage(connection, outboundMessage) + } + + public async processMessagesReceived(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + const { message } = messageContext + + // TODO: Add Queued Message ID + await this.messageRepository.takeFromQueue( + connection.id, + message.messageIdList ? message.messageIdList.length : undefined + ) + + const statusMessage = new StatusMessage({ + threadId: messageContext.message.threadId, + messageCount: await this.messageRepository.getAvailableMessageCount(connection.id), + }) + + return createOutboundMessage(connection, statusMessage) + } + + protected registerHandlers() { + this.dispatcher.registerHandler(new StatusRequestHandler(this)) + this.dispatcher.registerHandler(new DeliveryRequestHandler(this)) + this.dispatcher.registerHandler(new MessagesReceivedHandler(this)) + this.dispatcher.registerHandler(new StatusHandler(this.mediationRecipientService)) + this.dispatcher.registerHandler(new MessageDeliveryHandler(this.mediationRecipientService)) + } +} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts new file mode 100644 index 0000000000..a992cc990d --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts @@ -0,0 +1,19 @@ +import type { Handler } from '../../../../../../agent/Handler' +import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupService } from '../V2MessagePickupService' + +import { DeliveryRequestMessage } from '../messages' + +export class DeliveryRequestHandler implements Handler { + public supportedMessages = [DeliveryRequestMessage] + private messagePickupService: V2MessagePickupService + + public constructor(messagePickupService: V2MessagePickupService) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processDeliveryRequest(messageContext) + } +} diff --git a/packages/core/src/modules/routing/handlers/MessageDeliveryHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts similarity index 64% rename from packages/core/src/modules/routing/handlers/MessageDeliveryHandler.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts index 1eb11ed0ed..07b01a3d7f 100644 --- a/packages/core/src/modules/routing/handlers/MessageDeliveryHandler.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts @@ -1,9 +1,9 @@ -import type { Handler } from '../../../agent/Handler' -import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' -import type { MediationRecipientService } from '../services' +import type { Handler } from '../../../../../../agent/Handler' +import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' +import type { MediationRecipientService } from '../../../../services' -import { createOutboundMessage } from '../../../agent/helpers' -import { MessageDeliveryMessage } from '../messages' +import { createOutboundMessage } from '../../../../../../agent/helpers' +import { MessageDeliveryMessage } from '../messages/MessageDeliveryMessage' export class MessageDeliveryHandler implements Handler { public supportedMessages = [MessageDeliveryMessage] diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts new file mode 100644 index 0000000000..079762881e --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts @@ -0,0 +1,19 @@ +import type { Handler } from '../../../../../../agent/Handler' +import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupService } from '../V2MessagePickupService' + +import { MessagesReceivedMessage } from '../messages' + +export class MessagesReceivedHandler implements Handler { + public supportedMessages = [MessagesReceivedMessage] + private messagePickupService: V2MessagePickupService + + public constructor(messagePickupService: V2MessagePickupService) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processMessagesReceived(messageContext) + } +} diff --git a/packages/core/src/modules/routing/handlers/StatusHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts similarity index 65% rename from packages/core/src/modules/routing/handlers/StatusHandler.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts index b3ea61fe3d..08e4278c61 100644 --- a/packages/core/src/modules/routing/handlers/StatusHandler.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts @@ -1,9 +1,9 @@ -import type { Handler } from '../../../agent/Handler' -import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' -import type { MediationRecipientService } from '../services' +import type { Handler } from '../../../../../../agent/Handler' +import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' +import type { MediationRecipientService } from '../../../../services' -import { createOutboundMessage } from '../../../agent/helpers' -import { StatusMessage } from '../messages/StatusMessage' +import { createOutboundMessage } from '../../../../../../agent/helpers' +import { StatusMessage } from '../messages' export class StatusHandler implements Handler { public supportedMessages = [StatusMessage] diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts new file mode 100644 index 0000000000..7d069ddcd2 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts @@ -0,0 +1,19 @@ +import type { Handler } from '../../../../../../agent/Handler' +import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupService } from '../V2MessagePickupService' + +import { StatusRequestMessage } from '../messages' + +export class StatusRequestHandler implements Handler { + public supportedMessages = [StatusRequestMessage] + private messagePickupService: V2MessagePickupService + + public constructor(messagePickupService: V2MessagePickupService) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processStatusRequest(messageContext) + } +} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts new file mode 100644 index 0000000000..c8f4456634 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts @@ -0,0 +1,5 @@ +export * from './DeliveryRequestHandler' +export * from './MessageDeliveryHandler' +export * from './MessagesReceivedHandler' +export * from './StatusHandler' +export * from './StatusRequestHandler' diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/index.ts b/packages/core/src/modules/routing/protocol/pickup/v2/index.ts new file mode 100644 index 0000000000..b6a5eb72c5 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/index.ts @@ -0,0 +1,2 @@ +export * from './V2MessagePickupService' +export * from './messages' diff --git a/packages/core/src/modules/routing/messages/DeliveryRequestMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts similarity index 78% rename from packages/core/src/modules/routing/messages/DeliveryRequestMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts index fc190a8603..21e044309a 100644 --- a/packages/core/src/modules/routing/messages/DeliveryRequestMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts @@ -1,9 +1,9 @@ import { Expose } from 'class-transformer' import { IsInt, IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' export interface DeliveryRequestMessageOptions { id?: string diff --git a/packages/core/src/modules/routing/messages/MessageDeliveryMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts similarity index 67% rename from packages/core/src/modules/routing/messages/MessageDeliveryMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts index c3ebf34a16..fc3e215720 100644 --- a/packages/core/src/modules/routing/messages/MessageDeliveryMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts @@ -1,15 +1,16 @@ -import type { Attachment } from '../../../decorators/attachment/Attachment' +import type { Attachment } from '../../../../../../decorators/attachment/Attachment' import { Expose } from 'class-transformer' import { IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' export interface MessageDeliveryMessageOptions { id?: string recipientKey?: string + threadId: string attachments: Attachment[] } @@ -21,6 +22,9 @@ export class MessageDeliveryMessage extends AgentMessage { this.id = options.id || this.generateId() this.recipientKey = options.recipientKey this.appendedAttachments = options.attachments + this.setThread({ + threadId: options.threadId, + }) } this.setReturnRouting(ReturnRouteTypes.all) } diff --git a/packages/core/src/modules/routing/messages/MessagesReceivedMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts similarity index 76% rename from packages/core/src/modules/routing/messages/MessagesReceivedMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts index 84edf6f920..be59ba7639 100644 --- a/packages/core/src/modules/routing/messages/MessagesReceivedMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts @@ -1,9 +1,9 @@ import { Expose } from 'class-transformer' import { IsArray, IsOptional } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' export interface MessagesReceivedMessageOptions { id?: string diff --git a/packages/core/src/modules/routing/messages/StatusMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts similarity index 83% rename from packages/core/src/modules/routing/messages/StatusMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts index 467806767d..8e2851ba6c 100644 --- a/packages/core/src/modules/routing/messages/StatusMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts @@ -1,14 +1,15 @@ import { Expose, Transform } from 'class-transformer' import { IsBoolean, IsDate, IsInt, IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' -import { DateParser } from '../../../utils/transformers' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { DateParser } from '../../../../../../utils/transformers' export interface StatusMessageOptions { id?: string recipientKey?: string + threadId: string messageCount: number longestWaitedSeconds?: number newestReceivedTime?: Date @@ -29,6 +30,9 @@ export class StatusMessage extends AgentMessage { this.oldestReceivedTime = options.oldestReceivedTime this.totalBytes = options.totalBytes this.liveDelivery = options.liveDelivery + this.setThread({ + threadId: options.threadId, + }) } this.setReturnRouting(ReturnRouteTypes.all) } diff --git a/packages/core/src/modules/routing/messages/StatusRequestMessage.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts similarity index 82% rename from packages/core/src/modules/routing/messages/StatusRequestMessage.ts rename to packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts index 1ab1ffbe89..c25c1d8c4a 100644 --- a/packages/core/src/modules/routing/messages/StatusRequestMessage.ts +++ b/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts @@ -1,8 +1,8 @@ import { Expose } from 'class-transformer' import { IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../agent/AgentMessage' -import { IsValidMessageType, parseMessageType } from '../../../utils/messageType' +import { AgentMessage } from '../../../../../../agent/AgentMessage' +import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' export interface StatusRequestMessageOptions { id?: string diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts new file mode 100644 index 0000000000..fa807e7249 --- /dev/null +++ b/packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts @@ -0,0 +1,5 @@ +export * from './DeliveryRequestMessage' +export * from './MessageDeliveryMessage' +export * from './MessagesReceivedMessage' +export * from './StatusMessage' +export * from './StatusRequestMessage' diff --git a/packages/core/src/modules/routing/services/MediationRecipientService.ts b/packages/core/src/modules/routing/services/MediationRecipientService.ts index 94f4cbfa09..5d5073689e 100644 --- a/packages/core/src/modules/routing/services/MediationRecipientService.ts +++ b/packages/core/src/modules/routing/services/MediationRecipientService.ts @@ -5,13 +5,8 @@ import type { EncryptedMessage } from '../../../types' import type { ConnectionRecord } from '../../connections' import type { Routing } from '../../connections/services/ConnectionService' import type { MediationStateChangedEvent, KeylistUpdatedEvent } from '../RoutingEvents' -import type { - KeylistUpdateResponseMessage, - MediationDenyMessage, - MediationGrantMessage, - MessageDeliveryMessage, -} from '../messages' -import type { StatusMessage } from '../messages/StatusMessage' +import type { KeylistUpdateResponseMessage, MediationDenyMessage, MediationGrantMessage } from '../messages' +import type { StatusMessage, MessageDeliveryMessage } from '../protocol' import type { GetRoutingOptions } from './RoutingService' import { firstValueFrom, ReplaySubject } from 'rxjs' @@ -32,15 +27,10 @@ import { didKeyToVerkey } from '../../dids/helpers' import { ProblemReportError } from '../../problem-reports' import { RoutingEventTypes } from '../RoutingEvents' import { RoutingProblemReportReason } from '../error' -import { - StatusRequestMessage, - DeliveryRequestMessage, - MessagesReceivedMessage, - KeylistUpdateAction, - MediationRequestMessage, -} from '../messages' +import { KeylistUpdateAction, MediationRequestMessage } from '../messages' import { KeylistUpdate, KeylistUpdateMessage } from '../messages/KeylistUpdateMessage' import { MediationRole, MediationState } from '../models' +import { DeliveryRequestMessage, MessagesReceivedMessage, StatusRequestMessage } from '../protocol/pickup/v2/messages' import { MediationRecord } from '../repository/MediationRecord' import { MediationRepository } from '../repository/MediationRepository' @@ -249,11 +239,6 @@ export class MediationRecipientService { const { message: statusMessage } = messageContext const { messageCount, recipientKey } = statusMessage - const mediationRecord = await this.mediationRepository.getByConnectionId(connection.id) - - mediationRecord.assertReady() - mediationRecord.assertRole(MediationRole.Recipient) - //No messages to be sent if (messageCount === 0) { const { message, connectionRecord } = await this.connectionService.createTrustPing(connection, { @@ -287,15 +272,10 @@ export class MediationRecipientService { } public async processDelivery(messageContext: InboundMessageContext) { - const connection = messageContext.assertReadyConnection() + messageContext.assertReadyConnection() const { appendedAttachments } = messageContext.message - const mediationRecord = await this.mediationRepository.getByConnectionId(connection.id) - - mediationRecord.assertReady() - mediationRecord.assertRole(MediationRole.Recipient) - if (!appendedAttachments) throw new ProblemReportError('Error processing attachments', { problemCode: RoutingProblemReportReason.ErrorProcessingAttachments, diff --git a/packages/core/src/modules/routing/services/MessagePickupService.ts b/packages/core/src/modules/routing/services/MessagePickupService.ts deleted file mode 100644 index cc02a2e3cc..0000000000 --- a/packages/core/src/modules/routing/services/MessagePickupService.ts +++ /dev/null @@ -1,45 +0,0 @@ -import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' -import type { EncryptedMessage } from '../../../types' -import type { BatchPickupMessage } from '../messages' - -import { createOutboundMessage } from '../../../agent/helpers' -import { InjectionSymbols } from '../../../constants' -import { inject, injectable } from '../../../plugins' -import { MessageRepository } from '../../../storage/MessageRepository' -import { BatchMessage, BatchMessageMessage } from '../messages' - -@injectable() -export class MessagePickupService { - private messageRepository: MessageRepository - - public constructor(@inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository) { - this.messageRepository = messageRepository - } - - public async batch(messageContext: InboundMessageContext) { - // Assert ready connection - const connection = messageContext.assertReadyConnection() - - const { message } = messageContext - const messages = await this.messageRepository.takeFromQueue(connection.id, message.batchSize) - - // TODO: each message should be stored with an id. to be able to conform to the id property - // of batch message - const batchMessages = messages.map( - (msg) => - new BatchMessageMessage({ - message: msg, - }) - ) - - const batchMessage = new BatchMessage({ - messages: batchMessages, - }) - - return createOutboundMessage(connection, batchMessage) - } - - public async queueMessage(connectionId: string, message: EncryptedMessage) { - await this.messageRepository.add(connectionId, message) - } -} diff --git a/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts b/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts index 73748ce2d7..4f1253a5c2 100644 --- a/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts +++ b/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts @@ -8,20 +8,16 @@ import { MessageSender } from '../../../../agent/MessageSender' import { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import { Attachment } from '../../../../decorators/attachment/Attachment' import { AriesFrameworkError } from '../../../../error' +import { uuid } from '../../../../utils/uuid' import { IndyWallet } from '../../../../wallet/IndyWallet' import { DidExchangeState } from '../../../connections' import { ConnectionRepository } from '../../../connections/repository/ConnectionRepository' import { ConnectionService } from '../../../connections/services/ConnectionService' import { Key } from '../../../dids' import { DidRepository } from '../../../dids/repository/DidRepository' -import { - DeliveryRequestMessage, - MediationGrantMessage, - MessageDeliveryMessage, - MessagesReceivedMessage, - StatusMessage, -} from '../../messages' +import { MediationGrantMessage } from '../../messages' import { MediationRole, MediationState } from '../../models' +import { DeliveryRequestMessage, MessageDeliveryMessage, MessagesReceivedMessage, StatusMessage } from '../../protocol' import { MediationRecord } from '../../repository/MediationRecord' import { MediationRepository } from '../../repository/MediationRepository' import { MediationRecipientService } from '../MediationRecipientService' @@ -161,6 +157,7 @@ describe('MediationRecipientService', () => { describe('processStatus', () => { it('if status request has a message count of zero returns nothing', async () => { const status = new StatusMessage({ + threadId: uuid(), messageCount: 0, }) @@ -171,6 +168,7 @@ describe('MediationRecipientService', () => { it('if it has a message count greater than zero return a valid delivery request', async () => { const status = new StatusMessage({ + threadId: uuid(), messageCount: 1, }) const messageContext = new InboundMessageContext(status, { connection: mockConnection }) @@ -179,25 +177,6 @@ describe('MediationRecipientService', () => { expect(deliveryRequestMessage) expect(deliveryRequestMessage).toEqual(new DeliveryRequestMessage({ id: deliveryRequestMessage?.id, limit: 1 })) }) - - it('it throws an error when the mediation record has incorrect role or state', async () => { - const status = new StatusMessage({ - messageCount: 1, - }) - const messageContext = new InboundMessageContext(status, { connection: mockConnection }) - - mediationRecord.role = MediationRole.Mediator - await expect(mediationRecipientService.processStatus(messageContext)).rejects.toThrowError( - 'Mediation record has invalid role MEDIATOR. Expected role RECIPIENT.' - ) - - mediationRecord.role = MediationRole.Recipient - mediationRecord.state = MediationState.Requested - - await expect(mediationRecipientService.processStatus(messageContext)).rejects.toThrowError( - 'Mediation record is not ready to be used. Expected granted, found invalid state requested' - ) - }) }) describe('processDelivery', () => { @@ -211,6 +190,7 @@ describe('MediationRecipientService', () => { it('should return a message received with an message id list in it', async () => { const messageDeliveryMessage = new MessageDeliveryMessage({ + threadId: uuid(), attachments: [ new Attachment({ id: '1', @@ -236,6 +216,7 @@ describe('MediationRecipientService', () => { it('calls the event emitter for each message', async () => { const messageDeliveryMessage = new MessageDeliveryMessage({ + threadId: uuid(), attachments: [ new Attachment({ id: '1', @@ -273,34 +254,6 @@ describe('MediationRecipientService', () => { }, }) }) - - it('it throws an error when the mediation record has incorrect role or state', async () => { - const messageDeliveryMessage = new MessageDeliveryMessage({ - attachments: [ - new Attachment({ - id: '1', - data: { - json: { - a: 'value', - }, - }, - }), - ], - }) - const messageContext = new InboundMessageContext(messageDeliveryMessage, { connection: mockConnection }) - - mediationRecord.role = MediationRole.Mediator - await expect(mediationRecipientService.processDelivery(messageContext)).rejects.toThrowError( - 'Mediation record has invalid role MEDIATOR. Expected role RECIPIENT.' - ) - - mediationRecord.role = MediationRole.Recipient - mediationRecord.state = MediationState.Requested - - await expect(mediationRecipientService.processDelivery(messageContext)).rejects.toThrowError( - 'Mediation record is not ready to be used. Expected granted, found invalid state requested' - ) - }) }) describe('addMediationRouting', () => { diff --git a/packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts b/packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts new file mode 100644 index 0000000000..c88bb7e7b2 --- /dev/null +++ b/packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts @@ -0,0 +1,249 @@ +import type { MessageRepository } from '../../../../storage/MessageRepository' +import type { EncryptedMessage } from '../../../../types' + +import { getMockConnection, mockFunction } from '../../../../../tests/helpers' +import { Dispatcher } from '../../../../agent/Dispatcher' +import { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' +import { InMemoryMessageRepository } from '../../../../storage/InMemoryMessageRepository' +import { DidExchangeState } from '../../../connections' +import { + DeliveryRequestMessage, + MessageDeliveryMessage, + MessagesReceivedMessage, + StatusMessage, + StatusRequestMessage, + V2MessagePickupService, +} from '../../protocol' +import { MediationRecipientService } from '../MediationRecipientService' + +const mockConnection = getMockConnection({ + state: DidExchangeState.Completed, +}) + +// Mock classes +jest.mock('../MediationRecipientService') +jest.mock('../../../../storage/InMemoryMessageRepository') +jest.mock('../../../../agent/Dispatcher') + +// Mock typed object +const MediationRecipientServiceMock = MediationRecipientService as jest.Mock +const DispatcherMock = Dispatcher as jest.Mock +const InMessageRepositoryMock = InMemoryMessageRepository as jest.Mock + +const encryptedMessage: EncryptedMessage = { + protected: 'base64url', + iv: 'base64url', + ciphertext: 'base64url', + tag: 'base64url', +} +const queuedMessages = [encryptedMessage, encryptedMessage, encryptedMessage] + +describe('V2MessagePickupService', () => { + let pickupService: V2MessagePickupService + let messageRepository: MessageRepository + + beforeEach(async () => { + const dispatcher = new DispatcherMock() + const mediationRecipientService = new MediationRecipientServiceMock() + + messageRepository = new InMessageRepositoryMock() + pickupService = new V2MessagePickupService(messageRepository, dispatcher, mediationRecipientService) + }) + + describe('processStatusRequest', () => { + test('no available messages in queue', async () => { + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) + + const statusRequest = new StatusRequestMessage({}) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processStatusRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toEqual( + new StatusMessage({ + id: payload.id, + threadId: statusRequest.threadId, + messageCount: 0, + }) + ) + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(connection.id) + }) + + test('multiple messages in queue', async () => { + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(5) + const statusRequest = new StatusRequestMessage({}) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processStatusRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toEqual( + new StatusMessage({ + id: payload.id, + threadId: statusRequest.threadId, + messageCount: 5, + }) + ) + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(connection.id) + }) + + test('status request specifying recipient key', async () => { + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(10) + + const statusRequest = new StatusRequestMessage({ + recipientKey: 'recipientKey', + }) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection }) + + await expect(pickupService.processStatusRequest(messageContext)).rejects.toThrowError( + 'recipient_key parameter not supported' + ) + }) + }) + + describe('processDeliveryRequest', () => { + test('no available messages in queue', async () => { + mockFunction(messageRepository.takeFromQueue).mockResolvedValue([]) + + const deliveryRequest = new DeliveryRequestMessage({ limit: 10 }) + + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processDeliveryRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toEqual( + new StatusMessage({ + id: payload.id, + threadId: deliveryRequest.threadId, + messageCount: 0, + }) + ) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(connection.id, 10, true) + }) + + test('less messages in queue than limit', async () => { + mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) + + const deliveryRequest = new DeliveryRequestMessage({ limit: 10 }) + + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processDeliveryRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toBeInstanceOf(MessageDeliveryMessage) + expect(payload.threadId).toEqual(deliveryRequest.threadId) + expect(payload.appendedAttachments?.length).toEqual(3) + expect(payload.appendedAttachments).toEqual( + expect.arrayContaining( + queuedMessages.map((msg) => + expect.objectContaining({ + data: { + json: msg, + }, + }) + ) + ) + ) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(connection.id, 10, true) + }) + + test('more messages in queue than limit', async () => { + mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages.slice(0, 2)) + + const deliveryRequest = new DeliveryRequestMessage({ limit: 2 }) + + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processDeliveryRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toBeInstanceOf(MessageDeliveryMessage) + expect(payload.threadId).toEqual(deliveryRequest.threadId) + expect(payload.appendedAttachments?.length).toEqual(2) + expect(payload.appendedAttachments).toEqual( + expect.arrayContaining( + queuedMessages.slice(0, 2).map((msg) => + expect.objectContaining({ + data: { + json: msg, + }, + }) + ) + ) + ) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(connection.id, 2, true) + }) + + test('delivery request specifying recipient key', async () => { + mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) + + const statusRequest = new DeliveryRequestMessage({ + limit: 10, + recipientKey: 'recipientKey', + }) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection }) + + await expect(pickupService.processStatusRequest(messageContext)).rejects.toThrowError( + 'recipient_key parameter not supported' + ) + }) + }) + + describe('processMessagesReceived', () => { + test('messages received partially', async () => { + mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(4) + + const messagesReceived = new MessagesReceivedMessage({ + messageIdList: ['1', '2'], + }) + + const messageContext = new InboundMessageContext(messagesReceived, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processMessagesReceived(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toEqual( + new StatusMessage({ + id: payload.id, + threadId: messagesReceived.threadId, + messageCount: 4, + }) + ) + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(connection.id) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(connection.id, 2) + }) + + test('all messages have been received', async () => { + mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) + + const messagesReceived = new MessagesReceivedMessage({ + messageIdList: ['1', '2'], + }) + + const messageContext = new InboundMessageContext(messagesReceived, { connection: mockConnection }) + + const { connection, payload } = await pickupService.processMessagesReceived(messageContext) + + expect(connection).toEqual(mockConnection) + expect(payload).toEqual( + new StatusMessage({ + id: payload.id, + threadId: messagesReceived.threadId, + messageCount: 0, + }) + ) + + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(connection.id) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(connection.id, 2) + }) + }) +}) diff --git a/packages/core/src/modules/routing/services/index.ts b/packages/core/src/modules/routing/services/index.ts index 4026cfae8e..804d2eb3f9 100644 --- a/packages/core/src/modules/routing/services/index.ts +++ b/packages/core/src/modules/routing/services/index.ts @@ -1,4 +1,3 @@ -export * from './MessagePickupService' export * from './MediationRecipientService' export * from './MediatorService' export * from './RoutingService' diff --git a/packages/core/src/storage/InMemoryMessageRepository.ts b/packages/core/src/storage/InMemoryMessageRepository.ts index d2e404d40b..c73a873476 100644 --- a/packages/core/src/storage/InMemoryMessageRepository.ts +++ b/packages/core/src/storage/InMemoryMessageRepository.ts @@ -14,7 +14,11 @@ export class InMemoryMessageRepository implements MessageRepository { this.logger = agentConfig.logger } - public takeFromQueue(connectionId: string, limit?: number) { + public getAvailableMessageCount(connectionId: string): number | Promise { + return this.messages[connectionId] ? this.messages[connectionId].length : 0 + } + + public takeFromQueue(connectionId: string, limit?: number, keepMessages?: boolean) { if (!this.messages[connectionId]) { return [] } @@ -22,7 +26,9 @@ export class InMemoryMessageRepository implements MessageRepository { const messagesToTake = limit ?? this.messages[connectionId].length this.logger.debug(`Taking ${messagesToTake} messages from queue for connection ${connectionId}`) - return this.messages[connectionId].splice(0, messagesToTake) + return keepMessages + ? this.messages[connectionId].slice(0, messagesToTake) + : this.messages[connectionId].splice(0, messagesToTake) } public add(connectionId: string, payload: EncryptedMessage) { diff --git a/packages/core/src/storage/MessageRepository.ts b/packages/core/src/storage/MessageRepository.ts index 75fcd0cbd1..d12c7b6c07 100644 --- a/packages/core/src/storage/MessageRepository.ts +++ b/packages/core/src/storage/MessageRepository.ts @@ -1,6 +1,11 @@ import type { EncryptedMessage } from '../types' export interface MessageRepository { - takeFromQueue(connectionId: string, limit?: number): EncryptedMessage[] | Promise + getAvailableMessageCount(connectionId: string): number | Promise + takeFromQueue( + connectionId: string, + limit?: number, + keepMessages?: boolean + ): EncryptedMessage[] | Promise add(connectionId: string, payload: EncryptedMessage): void | Promise } diff --git a/tests/e2e-ws-pickup-v2.test.ts b/tests/e2e-ws-pickup-v2.test.ts new file mode 100644 index 0000000000..8acb7bb96b --- /dev/null +++ b/tests/e2e-ws-pickup-v2.test.ts @@ -0,0 +1,69 @@ +import { getBaseConfig } from '../packages/core/tests/helpers' + +import { e2eTest } from './e2e-test' + +import { Agent, WsOutboundTransport, AutoAcceptCredential, MediatorPickupStrategy } from '@aries-framework/core' +import { WsInboundTransport } from '@aries-framework/node' + +const recipientConfig = getBaseConfig('E2E WS Pickup V2 Recipient ', { + autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV2, +}) + +// FIXME: port numbers should not depend on availability from other test suites that use web sockets +const mediatorPort = 4100 +const mediatorConfig = getBaseConfig('E2E WS Pickup V2 Mediator', { + endpoints: [`ws://localhost:${mediatorPort}`], + autoAcceptMediationRequests: true, +}) + +const senderPort = 4101 +const senderConfig = getBaseConfig('E2E WS Pickup V2 Sender', { + endpoints: [`ws://localhost:${senderPort}`], + mediatorPollingInterval: 1000, + autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV2, +}) + +describe('E2E WS Pickup V2 tests', () => { + let recipientAgent: Agent + let mediatorAgent: Agent + let senderAgent: Agent + + beforeEach(async () => { + recipientAgent = new Agent(recipientConfig.config, recipientConfig.agentDependencies) + mediatorAgent = new Agent(mediatorConfig.config, mediatorConfig.agentDependencies) + senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies) + }) + + afterEach(async () => { + await recipientAgent.shutdown() + await recipientAgent.wallet.delete() + await mediatorAgent.shutdown() + await mediatorAgent.wallet.delete() + await senderAgent.shutdown() + await senderAgent.wallet.delete() + }) + + test('Full WS flow (connect, request mediation, issue, verify) using Message Pickup V2', async () => { + // Recipient Setup + recipientAgent.registerOutboundTransport(new WsOutboundTransport()) + await recipientAgent.initialize() + + // Mediator Setup + mediatorAgent.registerInboundTransport(new WsInboundTransport({ port: mediatorPort })) + mediatorAgent.registerOutboundTransport(new WsOutboundTransport()) + await mediatorAgent.initialize() + + // Sender Setup + senderAgent.registerInboundTransport(new WsInboundTransport({ port: senderPort })) + senderAgent.registerOutboundTransport(new WsOutboundTransport()) + await senderAgent.initialize() + + await e2eTest({ + mediatorAgent, + senderAgent, + recipientAgent, + }) + }) +})