Skip to content

Commit

Permalink
feat(routing): allow to discover mediator pickup strategy (#669)
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Glastra <[email protected]>
  • Loading branch information
TimoGlastra authored Mar 28, 2022
1 parent 0dd9a5a commit 5966da1
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 9 deletions.
3 changes: 1 addition & 2 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { AriesFrameworkError } from '../error'
import { ConsoleLogger, LogLevel } from '../logger'
import { AutoAcceptCredential } from '../modules/credentials/CredentialAutoAcceptType'
import { AutoAcceptProof } from '../modules/proofs/ProofAutoAcceptType'
import { MediatorPickupStrategy } from '../modules/routing/MediatorPickupStrategy'
import { DidCommMimeType } from '../types'

export class AgentConfig {
Expand Down Expand Up @@ -77,7 +76,7 @@ export class AgentConfig {
}

public get mediatorPickupStrategy() {
return this.initConfig.mediatorPickupStrategy ?? MediatorPickupStrategy.Explicit
return this.initConfig.mediatorPickupStrategy
}

public get endpoints(): [string, ...string[]] {
Expand Down
78 changes: 73 additions & 5 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import type { AgentMessageProcessedEvent } from '../../agent/Events'
import type { Logger } from '../../logger'
import type { OutboundWebSocketClosedEvent } from '../../transport'
import type { OutboundMessage } from '../../types'
import type { ConnectionRecord } from '../connections'
import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'

import { firstValueFrom, interval, ReplaySubject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen } from 'rxjs/operators'
import { firstValueFrom, interval, of, ReplaySubject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen, catchError, map } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { AgentEventTypes } from '../../agent/Events'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { AriesFrameworkError } from '../../error'
import { TransportEventTypes } from '../../transport'
import { parseMessageType } from '../../utils/messageType'
import { ConnectionInvitationMessage } from '../connections'
import { ConnectionService } from '../connections/services'
import { DiscloseMessage, DiscoverFeaturesModule } from '../discover-features'

import { MediatorPickupStrategy } from './MediatorPickupStrategy'
import { RoutingEventTypes } from './RoutingEvents'
Expand All @@ -26,6 +30,7 @@ import { MediationDenyHandler } from './handlers/MediationDenyHandler'
import { MediationGrantHandler } from './handlers/MediationGrantHandler'
import { BatchPickupMessage } from './messages/BatchPickupMessage'
import { MediationState } from './models/MediationState'
import { MediationRepository } from './repository'
import { MediationRecipientService } from './services/MediationRecipientService'

@scoped(Lifecycle.ContainerScoped)
Expand All @@ -36,21 +41,27 @@ export class RecipientModule {
private messageSender: MessageSender
private eventEmitter: EventEmitter
private logger: Logger
private discoverFeaturesModule: DiscoverFeaturesModule
private mediationRepository: MediationRepository

public constructor(
dispatcher: Dispatcher,
agentConfig: AgentConfig,
mediationRecipientService: MediationRecipientService,
connectionService: ConnectionService,
messageSender: MessageSender,
eventEmitter: EventEmitter
eventEmitter: EventEmitter,
discoverFeaturesModule: DiscoverFeaturesModule,
mediationRepository: MediationRepository
) {
this.agentConfig = agentConfig
this.connectionService = connectionService
this.mediationRecipientService = mediationRecipientService
this.messageSender = messageSender
this.eventEmitter = eventEmitter
this.logger = agentConfig.logger
this.discoverFeaturesModule = discoverFeaturesModule
this.mediationRepository = mediationRepository
this.registerHandlers(dispatcher)
}

Expand Down Expand Up @@ -153,8 +164,8 @@ export class RecipientModule {
}

public async initiateMessagePickup(mediator: MediationRecord) {
const { mediatorPickupStrategy, mediatorPollingInterval } = this.agentConfig

const { mediatorPollingInterval } = this.agentConfig
const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator)
const mediatorConnection = await this.connectionService.getById(mediator.connectionId)

// Explicit means polling every X seconds with batch message
Expand All @@ -181,6 +192,63 @@ export class RecipientModule {
}
}

private async getPickupStrategyForMediator(mediator: MediationRecord) {
let mediatorPickupStrategy = mediator.pickupStrategy ?? this.agentConfig.mediatorPickupStrategy

// If mediator pickup strategy is not configured we try to query if batch pickup
// is supported through the discover features protocol
if (!mediatorPickupStrategy) {
const isBatchPickupSupported = await this.isBatchPickupSupportedByMediator(mediator)

// Use explicit pickup strategy
mediatorPickupStrategy = isBatchPickupSupported
? MediatorPickupStrategy.Explicit
: MediatorPickupStrategy.Implicit

// Store the result so it can be reused next time
mediator.pickupStrategy = mediatorPickupStrategy
await this.mediationRepository.update(mediator)
}

return mediatorPickupStrategy
}

private async isBatchPickupSupportedByMediator(mediator: MediationRecord) {
const { protocolUri } = parseMessageType(BatchPickupMessage.type)

// Listen for response to our feature query
const replaySubject = new ReplaySubject(1)
this.eventEmitter
.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// filter by mediator connection id and query disclose message type
filter(
(e) => e.payload.connection?.id === mediator.connectionId && e.payload.message.type === DiscloseMessage.type
),
// Return whether the protocol is supported
map((e) => {
const message = e.payload.message as DiscloseMessage
return message.protocols.map((p) => p.protocolId).includes(protocolUri)
}),
// TODO: make configurable
// If we don't have an answer in 7 seconds (no response, not supported, etc...) error
timeout(7000),
// We want to return false if an error occurred
catchError(() => of(false))
)
.subscribe(replaySubject)

await this.discoverFeaturesModule.queryFeatures(mediator.connectionId, {
query: protocolUri,
comment: 'Detect if batch pickup is supported to determine pickup strategy for messages',
})

const isBatchPickupSupported = await firstValueFrom(replaySubject)
return isBatchPickupSupported
}

public async discoverMediation() {
return this.mediationRecipientService.discoverMediation()
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { SubjectOutboundTransport } from '../../../../../../tests/transport/Subj
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { ConnectionRecord } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import { MediationState } from '../models/MediationState'

const recipientConfig = getBaseConfig('Mediation: Recipient')
Expand Down Expand Up @@ -93,6 +94,7 @@ describe('mediator establishment', () => {
expect(recipientMediatorConnection).toBeConnectedWith(mediatorRecipientConnection)

expect(recipientMediator?.state).toBe(MediationState.Granted)
expect(recipientMediator?.pickupStrategy).toBe(MediatorPickupStrategy.Explicit)

// Initialize sender agent
senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import type { MediationRole } from '../models/MediationRole'

import { AriesFrameworkError } from '../../../error'
Expand All @@ -15,6 +16,7 @@ export interface MediationRecordProps {
endpoint?: string
recipientKeys?: string[]
routingKeys?: string[]
pickupStrategy?: MediatorPickupStrategy
tags?: CustomMediationTags
}

Expand All @@ -40,6 +42,7 @@ export class MediationRecord
public endpoint?: string
public recipientKeys!: string[]
public routingKeys!: string[]
public pickupStrategy?: MediatorPickupStrategy

public static readonly type = 'MediationRecord'
public readonly type = MediationRecord.type
Expand All @@ -57,6 +60,7 @@ export class MediationRecord
this.state = props.state
this.role = props.role
this.endpoint = props.endpoint ?? undefined
this.pickupStrategy = props.pickupStrategy
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/storage/didcomm/DidCommMessageRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { DidCommMessageRole } from './DidCommMessageRole'

import { AriesFrameworkError } from '../../error'
import { JsonTransformer } from '../../utils/JsonTransformer'
import { rightSplit } from '../../utils/string'
import { parseMessageType } from '../../utils/messageType'
import { isJsonObject } from '../../utils/type'
import { uuid } from '../../utils/uuid'
import { BaseRecord } from '../BaseRecord'
Expand Down Expand Up @@ -61,7 +61,8 @@ export class DidCommMessageRecord extends BaseRecord<DefaultDidCommMessageTags>
public getTags() {
const messageId = this.message['@id'] as string
const messageType = this.message['@type'] as string
const [, protocolName, protocolVersion, messageName] = rightSplit(messageType, '/', 3)

const { protocolName, protocolVersion, messageName } = parseMessageType(messageType)
const [versionMajor, versionMinor] = protocolVersion.split('.')

const thread = this.message['~thread']
Expand Down
21 changes: 21 additions & 0 deletions packages/core/src/utils/__tests__/messageType.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
parseMessageType,
replaceLegacyDidSovPrefix,
replaceLegacyDidSovPrefixOnMessage,
replaceNewDidCommPrefixWithLegacyDidSov,
Expand Down Expand Up @@ -81,4 +82,24 @@ describe('messageType', () => {
)
})
})

describe('parseMessageType()', () => {
test('correctly parses the message type', () => {
expect(parseMessageType('https://didcomm.org/connections/1.0/request')).toEqual({
documentUri: 'https://didcomm.org',
protocolName: 'connections',
protocolVersion: '1.0',
messageName: 'request',
protocolUri: `https://didcomm.org/connections/1.0`,
})

expect(parseMessageType('https://didcomm.org/issue-credential/1.0/propose-credential')).toEqual({
documentUri: 'https://didcomm.org',
protocolName: 'issue-credential',
protocolVersion: '1.0',
messageName: 'propose-credential',
protocolUri: `https://didcomm.org/issue-credential/1.0`,
})
})
})
})
51 changes: 51 additions & 0 deletions packages/core/src/utils/messageType.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,56 @@
import type { PlaintextMessage } from '../types'

import { rightSplit } from './string'

export interface ParsedMessageType {
/**
* Message name
*
* @example request
*/
messageName: string

/**
* Version of the protocol
*
* @example 1.0
*/
protocolVersion: string

/**
* Name of the protocol
*
* @example connections
*/
protocolName: string

/**
* Document uri of the message.
*
* @example https://didcomm.org
*/
documentUri: string

/**
* Uri identifier of the protocol. Includes the
* documentUri, protocolName and protocolVersion.
* Useful when working with feature discovery
*/
protocolUri: string
}

export function parseMessageType(messageType: string): ParsedMessageType {
const [documentUri, protocolName, protocolVersion, messageName] = rightSplit(messageType, '/', 3)

return {
documentUri,
protocolName,
protocolVersion,
messageName,
protocolUri: `${documentUri}/${protocolName}/${protocolVersion}`,
}
}

export function replaceLegacyDidSovPrefixOnMessage(message: PlaintextMessage | Record<string, unknown>) {
message['@type'] = replaceLegacyDidSovPrefix(message['@type'] as string)
}
Expand Down

0 comments on commit 5966da1

Please sign in to comment.