Skip to content

Commit

Permalink
feat(routing): pickup v2 mediator role basic implementation (#975)
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <[email protected]>
  • Loading branch information
genaris authored Aug 11, 2022
1 parent 52247d9 commit a989556
Show file tree
Hide file tree
Showing 38 changed files with 822 additions and 195 deletions.
10 changes: 6 additions & 4 deletions packages/core/src/modules/routing/MediatorModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import { createOutboundMessage } from '../../agent/helpers'
import { injectable, module } from '../../plugins'
import { ConnectionService } from '../connections/services'

import { KeylistUpdateHandler, ForwardHandler, BatchPickupHandler, BatchHandler } from './handlers'
import { KeylistUpdateHandler, ForwardHandler } from './handlers'
import { MediationRequestHandler } from './handlers/MediationRequestHandler'
import { MessagePickupService, V2MessagePickupService } from './protocol'
import { MediatorService } from './services/MediatorService'
import { MessagePickupService } from './services/MessagePickupService'

@module()
@injectable()
Expand Down Expand Up @@ -64,8 +64,6 @@ export class MediatorModule {
private registerHandlers(dispatcher: Dispatcher) {
dispatcher.registerHandler(new KeylistUpdateHandler(this.mediatorService))
dispatcher.registerHandler(new ForwardHandler(this.mediatorService, this.connectionService, this.messageSender))
dispatcher.registerHandler(new BatchPickupHandler(this.messagePickupService))
dispatcher.registerHandler(new BatchHandler(this.eventEmitter))
dispatcher.registerHandler(new MediationRequestHandler(this.mediatorService, this.agentConfig))
}

Expand All @@ -79,5 +77,9 @@ export class MediatorModule {
// Services
dependencyManager.registerSingleton(MediatorService)
dependencyManager.registerSingleton(MessagePickupService)
dependencyManager.registerSingleton(V2MessagePickupService)

// FIXME: Inject in constructor
dependencyManager.resolve(V2MessagePickupService)
}
}
21 changes: 10 additions & 11 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ import { DiscoverFeaturesModule } from '../discover-features'

import { MediatorPickupStrategy } from './MediatorPickupStrategy'
import { RoutingEventTypes } from './RoutingEvents'
import { MessageDeliveryHandler, StatusHandler } from './handlers'
import { KeylistUpdateResponseHandler } from './handlers/KeylistUpdateResponseHandler'
import { MediationDenyHandler } from './handlers/MediationDenyHandler'
import { MediationGrantHandler } from './handlers/MediationGrantHandler'
import { StatusRequestMessage } from './messages'
import { BatchPickupMessage } from './messages/BatchPickupMessage'
import { MediationState } from './models/MediationState'
import { BatchPickupMessage, StatusRequestMessage } from './protocol'
import { MediationRepository, MediatorRoutingRepository } from './repository'
import { MediationRecipientService } from './services/MediationRecipientService'
import { RoutingService } from './services/RoutingService'
Expand Down Expand Up @@ -94,8 +92,8 @@ export class RecipientModule {
}
}

private async sendMessage(outboundMessage: OutboundMessage) {
const { mediatorPickupStrategy } = this.agentConfig
private async sendMessage(outboundMessage: OutboundMessage, pickupStrategy?: MediatorPickupStrategy) {
const mediatorPickupStrategy = pickupStrategy ?? this.agentConfig.mediatorPickupStrategy
const transportPriority =
mediatorPickupStrategy === MediatorPickupStrategy.Implicit
? { schemes: ['wss', 'ws'], restrictive: true }
Expand Down Expand Up @@ -264,12 +262,15 @@ export class RecipientModule {
return this.mediationRecipientService.discoverMediation()
}

public async pickupMessages(mediatorConnection: ConnectionRecord) {
public async pickupMessages(mediatorConnection: ConnectionRecord, pickupStrategy?: MediatorPickupStrategy) {
mediatorConnection.assertReady()

const batchPickupMessage = new BatchPickupMessage({ batchSize: 10 })
const outboundMessage = createOutboundMessage(mediatorConnection, batchPickupMessage)
await this.sendMessage(outboundMessage)
const pickupMessage =
pickupStrategy === MediatorPickupStrategy.PickUpV2
? new StatusRequestMessage({})
: new BatchPickupMessage({ batchSize: 10 })
const outboundMessage = createOutboundMessage(mediatorConnection, pickupMessage)
await this.sendMessage(outboundMessage, pickupStrategy)
}

public async setDefaultMediator(mediatorRecord: MediationRecord) {
Expand Down Expand Up @@ -376,8 +377,6 @@ export class RecipientModule {
dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService))
dispatcher.registerHandler(new MediationGrantHandler(this.mediationRecipientService))
dispatcher.registerHandler(new MediationDenyHandler(this.mediationRecipientService))
dispatcher.registerHandler(new StatusHandler(this.mediationRecipientService))
dispatcher.registerHandler(new MessageDeliveryHandler(this.mediationRecipientService))
//dispatcher.registerHandler(new KeylistListHandler(this.mediationRecipientService)) // TODO: write this
}

Expand Down
141 changes: 141 additions & 0 deletions packages/core/src/modules/routing/__tests__/pickup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import type { SubjectMessage } from '../../../../../../tests/transport/SubjectInboundTransport'

import { Subject } from 'rxjs'

import { SubjectInboundTransport } from '../../../../../../tests/transport/SubjectInboundTransport'
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { ConsoleLogger, LogLevel } from '../../../logger'
import { HandshakeProtocol } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'

const logger = new ConsoleLogger(LogLevel.info)
const recipientConfig = getBaseConfig('Mediation: Recipient', {
autoAcceptConnections: true,
indyLedgers: [],
logger,
})
const mediatorConfig = getBaseConfig('Mediation: Mediator', {
autoAcceptConnections: true,
endpoints: ['rxjs:mediator'],
indyLedgers: [],
logger,
})

describe('E2E Pick Up protocol', () => {
let recipientAgent: Agent
let mediatorAgent: Agent

afterEach(async () => {
await recipientAgent?.shutdown()
await recipientAgent?.wallet.delete()
await mediatorAgent?.shutdown()
await mediatorAgent?.wallet.delete()
})

test('E2E Pick Up V1 protocol', async () => {
const mediatorMessages = new Subject<SubjectMessage>()

const subjectMap = {
'rxjs:mediator': mediatorMessages,
}

// Initialize mediatorReceived message
mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies)
mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages))
await mediatorAgent.initialize()

// Create connection to use for recipient
const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({
label: 'mediator invitation',
handshake: true,
handshakeProtocols: [HandshakeProtocol.DidExchange],
})

// Initialize recipient
recipientAgent = new Agent(recipientConfig.config, recipientConfig.agentDependencies)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
await recipientAgent.initialize()

// Connect
const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation

let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl(
mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' })
)

recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected(
recipientMediatorConnection!.id
)

let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id)

mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id)

const message = 'hello pickup V1'
await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message)

await recipientAgent.mediationRecipient.pickupMessages(recipientMediatorConnection)

const basicMessage = await waitForBasicMessage(recipientAgent, {
content: message,
})

expect(basicMessage.content).toBe(message)
})

test('E2E Pick Up V2 protocol', async () => {
const mediatorMessages = new Subject<SubjectMessage>()

const subjectMap = {
'rxjs:mediator': mediatorMessages,
}

// Initialize mediatorReceived message
mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies)
mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages))
await mediatorAgent.initialize()

// Create connection to use for recipient
const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({
label: 'mediator invitation',
handshake: true,
handshakeProtocols: [HandshakeProtocol.DidExchange],
})

// Initialize recipient
recipientAgent = new Agent(recipientConfig.config, recipientConfig.agentDependencies)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
await recipientAgent.initialize()

// Connect
const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation

let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl(
mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' })
)

recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected(
recipientMediatorConnection!.id
)

let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id)

mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id)

const message = 'hello pickup V2'
await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message)

await recipientAgent.mediationRecipient.pickupMessages(recipientMediatorConnection, MediatorPickupStrategy.PickUpV2)

const basicMessage = await waitForBasicMessage(recipientAgent, {
content: message,
})

expect(basicMessage.content).toBe(message)
})
})
7 changes: 3 additions & 4 deletions packages/core/src/modules/routing/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
export * from './ForwardHandler'
export * from './KeylistUpdateHandler'
export * from './BatchHandler'
export * from './BatchPickupHandler'
export * from './KeylistUpdateResponseHandler'
export * from './StatusHandler'
export * from './MessageDeliveryHandler'
export * from './MediationDenyHandler'
export * from './MediationGrantHandler'
export * from './MediationRequestHandler'
1 change: 1 addition & 0 deletions packages/core/src/modules/routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './messages'
export * from './services'
export * from './protocol'
export * from './repository'
export * from './models'
export * from './RoutingEvents'
Expand Down
7 changes: 0 additions & 7 deletions packages/core/src/modules/routing/messages/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
export * from './BatchMessage'
export * from './BatchPickupMessage'
export * from './ForwardMessage'
export * from './KeylistUpdateMessage'
export * from './KeylistUpdateResponseMessage'
export * from './MediationGrantMessage'
export * from './MediationDenyMessage'
export * from './MediationRequestMessage'
export * from './DeliveryRequestMessage'
export * from './StatusMessage'
export * from './StatusRequestMessage'
export * from './MessageDeliveryMessage'
export * from './MessagesReceivedMessage'
1 change: 1 addition & 0 deletions packages/core/src/modules/routing/protocol/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './pickup'
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/protocol/pickup/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './v1'
export * from './v2'
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext'
import type { EncryptedMessage } from '../../../../../types'
import type { BatchPickupMessage } from './messages'

import { Dispatcher } from '../../../../../agent/Dispatcher'
import { EventEmitter } from '../../../../../agent/EventEmitter'
import { createOutboundMessage } from '../../../../../agent/helpers'
import { InjectionSymbols } from '../../../../../constants'
import { inject, injectable } from '../../../../../plugins'
import { MessageRepository } from '../../../../../storage/MessageRepository'

import { BatchHandler, BatchPickupHandler } from './handlers'
import { BatchMessage, BatchMessageMessage } from './messages'

@injectable()
export class MessagePickupService {
private messageRepository: MessageRepository
private dispatcher: Dispatcher
private eventEmitter: EventEmitter

public constructor(
@inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository,
dispatcher: Dispatcher,
eventEmitter: EventEmitter
) {
this.messageRepository = messageRepository
this.dispatcher = dispatcher
this.eventEmitter = eventEmitter

this.registerHandlers()
}

public async batch(messageContext: InboundMessageContext<BatchPickupMessage>) {
// Assert ready connection
const connection = messageContext.assertReadyConnection()

const { message } = messageContext
const messages = await this.messageRepository.takeFromQueue(connection.id, message.batchSize)

// TODO: each message should be stored with an id. to be able to conform to the id property
// of batch message
const batchMessages = messages.map(
(msg) =>
new BatchMessageMessage({
message: msg,
})
)

const batchMessage = new BatchMessage({
messages: batchMessages,
})

return createOutboundMessage(connection, batchMessage)
}

public async queueMessage(connectionId: string, message: EncryptedMessage) {
await this.messageRepository.add(connectionId, message)
}

protected registerHandlers() {
this.dispatcher.registerHandler(new BatchPickupHandler(this))
this.dispatcher.registerHandler(new BatchHandler(this.eventEmitter))
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { EventEmitter } from '../../../agent/EventEmitter'
import type { AgentMessageReceivedEvent } from '../../../agent/Events'
import type { Handler, HandlerInboundMessage } from '../../../agent/Handler'
import type { EventEmitter } from '../../../../../../agent/EventEmitter'
import type { AgentMessageReceivedEvent } from '../../../../../../agent/Events'
import type { Handler, HandlerInboundMessage } from '../../../../../../agent/Handler'

import { AgentEventTypes } from '../../../agent/Events'
import { AriesFrameworkError } from '../../../error'
import { AgentEventTypes } from '../../../../../../agent/Events'
import { AriesFrameworkError } from '../../../../../../error'
import { BatchMessage } from '../messages'

export class BatchHandler implements Handler {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Handler, HandlerInboundMessage } from '../../../agent/Handler'
import type { MessagePickupService } from '../services'
import type { Handler, HandlerInboundMessage } from '../../../../../../agent/Handler'
import type { MessagePickupService } from '../MessagePickupService'

import { AriesFrameworkError } from '../../../error'
import { AriesFrameworkError } from '../../../../../../error'
import { BatchPickupMessage } from '../messages'

export class BatchPickupHandler implements Handler {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './BatchHandler'
export * from './BatchPickupHandler'
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/protocol/pickup/v1/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './MessagePickupService'
export * from './messages'
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Type, Expose } from 'class-transformer'
import { Matches, IsArray, ValidateNested, IsObject, IsInstance } from 'class-validator'

import { AgentMessage } from '../../../agent/AgentMessage'
import { MessageIdRegExp } from '../../../agent/BaseMessage'
import { EncryptedMessage } from '../../../types'
import { IsValidMessageType, parseMessageType } from '../../../utils/messageType'
import { uuid } from '../../../utils/uuid'
import { AgentMessage } from '../../../../../../agent/AgentMessage'
import { MessageIdRegExp } from '../../../../../../agent/BaseMessage'
import { EncryptedMessage } from '../../../../../../types'
import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType'
import { uuid } from '../../../../../../utils/uuid'

export class BatchMessageMessage {
public constructor(options: { id?: string; message: EncryptedMessage }) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Expose } from 'class-transformer'
import { IsInt } from 'class-validator'

import { AgentMessage } from '../../../agent/AgentMessage'
import { IsValidMessageType, parseMessageType } from '../../../utils/messageType'
import { AgentMessage } from '../../../../../../agent/AgentMessage'
import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType'

export interface BatchPickupMessageOptions {
id?: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './BatchMessage'
export * from './BatchPickupMessage'
Loading

0 comments on commit a989556

Please sign in to comment.