Skip to content

Commit

Permalink
feat: pickup v2 protocol (#711)
Browse files Browse the repository at this point in the history
Signed-off-by: KolbyRKunz <[email protected]>

BREAKING CHANGE: The mediator pickup strategy enum value `MediatorPickupStrategy.Explicit` has been renamed to `MediatorPickupStrategy.PickUpV1` to better align with the naming of the new `MediatorPickupStrategy.PickUpV2`
  • Loading branch information
KolbyRKunz authored May 2, 2022
1 parent e3e2ebf commit b281673
Show file tree
Hide file tree
Showing 27 changed files with 641 additions and 99 deletions.
4 changes: 4 additions & 0 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ export class AgentConfig {
return this.initConfig.mediatorPickupStrategy
}

public get maximumMessagePickup() {
return this.initConfig.maximumMessagePickup ?? 10
}

public get endpoints(): [string, ...string[]] {
// if endpoints is not set, return queue endpoint
// https://github.com/hyperledger/aries-rfcs/issues/405#issuecomment-582612875
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { TransportSession } from './TransportService'
import { Lifecycle, scoped } from 'tsyringe'

import { AriesFrameworkError } from '../error'
import { ConnectionRepository } from '../modules/connections'
import { ConnectionRepository } from '../modules/connections/repository'
import { DidRepository } from '../modules/dids/repository/DidRepository'
import { ProblemReportError, ProblemReportMessage, ProblemReportReason } from '../modules/problem-reports'
import { isValidJweStructure } from '../utils/JWE'
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/agent/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ConnectionRecord } from '../modules/connections'
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { AgentMessage } from './AgentMessage'

import { IndyAgentService } from '../modules/dids'
import { DidCommService } from '../modules/dids/domain/service/DidCommService'

export function createOutboundMessage<T extends AgentMessage = AgentMessage>(
Expand All @@ -25,5 +26,6 @@ export function createOutboundServiceMessage<T extends AgentMessage = AgentMessa
export function isOutboundServiceMessage(
message: OutboundMessage | OutboundServiceMessage
): message is OutboundServiceMessage {
return (message as OutboundServiceMessage).service instanceof DidCommService
const service = (message as OutboundServiceMessage).service
return service instanceof IndyAgentService || service instanceof DidCommService
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,79 @@
import type { BaseMessage } from '../../agent/BaseMessage'
import type { AgentMessageProcessedEvent } from '../../agent/Events'

import { firstValueFrom, of, ReplaySubject } from 'rxjs'
import { filter, takeUntil, timeout, catchError, map } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { AgentEventTypes } from '../../agent/Events'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { parseMessageType } from '../../utils/messageType'
import { ConnectionService } from '../connections/services'

import { DiscloseMessageHandler, QueryMessageHandler } from './handlers'
import { DiscloseMessage } from './messages'
import { DiscoverFeaturesService } from './services'

@scoped(Lifecycle.ContainerScoped)
export class DiscoverFeaturesModule {
private connectionService: ConnectionService
private messageSender: MessageSender
private discoverFeaturesService: DiscoverFeaturesService
private eventEmitter: EventEmitter
private agentConfig: AgentConfig

public constructor(
dispatcher: Dispatcher,
connectionService: ConnectionService,
messageSender: MessageSender,
discoverFeaturesService: DiscoverFeaturesService
discoverFeaturesService: DiscoverFeaturesService,
eventEmitter: EventEmitter,
agentConfig: AgentConfig
) {
this.connectionService = connectionService
this.messageSender = messageSender
this.discoverFeaturesService = discoverFeaturesService
this.registerHandlers(dispatcher)
this.eventEmitter = eventEmitter
this.agentConfig = agentConfig
}

public async isProtocolSupported(connectionId: string, message: BaseMessage) {
const { protocolUri } = parseMessageType(message.type)

// Listen for response to our feature query
const replaySubject = new ReplaySubject(1)
this.eventEmitter
.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// filter by connection id and query disclose message type
filter((e) => e.payload.connection?.id === connectionId && e.payload.message.type === DiscloseMessage.type),
// Return whether the protocol is supported
map((e) => {
const message = e.payload.message as DiscloseMessage
return message.protocols.map((p) => p.protocolId).includes(protocolUri)
}),
// TODO: make configurable
// If we don't have an answer in 7 seconds (no response, not supported, etc...) error
timeout(7000),
// We want to return false if an error occurred
catchError(() => of(false))
)
.subscribe(replaySubject)

await this.queryFeatures(connectionId, {
query: protocolUri,
comment: 'Detect if protocol is supported',
})

const isProtocolSupported = await firstValueFrom(replaySubject)
return isProtocolSupported
}

public async queryFeatures(connectionId: string, options: { query: string; comment?: string }) {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/MediatorModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Lifecycle, scoped } from 'tsyringe'
import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { MessageReceiver } from '../../agent/MessageReceiver'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { ConnectionService } from '../connections/services'
Expand All @@ -29,6 +30,7 @@ export class MediatorModule {
mediationService: MediatorService,
messagePickupService: MessagePickupService,
messageSender: MessageSender,
messageReceiver: MessageReceiver,
eventEmitter: EventEmitter,
agentConfig: AgentConfig,
connectionService: ConnectionService
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/modules/routing/MediatorPickupStrategy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
export enum MediatorPickupStrategy {
// Explicit pickup strategy means picking up messages using the pickup protocol
Explicit = 'Explicit',
PickUpV1 = 'PickUpV1',

// Supports pickup v2
PickUpV2 = 'PickUpV2',

// Implicit pickup strategy means picking up messages only using return route
// decorator. This is what ACA-Py currently uses
Expand Down
Loading

0 comments on commit b281673

Please sign in to comment.