Skip to content

Commit

Permalink
feat(mesage-pickup): option for awaiting completion (#1755)
Browse files Browse the repository at this point in the history
  • Loading branch information
genaris authored Feb 13, 2024
1 parent 2cb9ba8 commit faa390f
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 36 deletions.
63 changes: 51 additions & 12 deletions packages/core/src/modules/message-pickup/MessagePickupApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import type {
DeliverMessagesReturnType,
DeliverMessagesFromQueueReturnType,
} from './MessagePickupApiOptions'
import type { MessagePickupCompletedEvent } from './MessagePickupEvents'
import type { MessagePickupSession, MessagePickupSessionRole } from './MessagePickupSession'
import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol'
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol'
import type { MessagePickupRepository } from './storage/MessagePickupRepository'

import { ReplaySubject, Subject, filter, firstValueFrom, takeUntil, timeout } from 'rxjs'

import { AgentContext } from '../../agent'
import { EventEmitter } from '../../agent/EventEmitter'
import { MessageSender } from '../../agent/MessageSender'
import { OutboundMessageContext } from '../../agent/models'
import { InjectionSymbols } from '../../constants'
Expand All @@ -24,6 +28,7 @@ import { Logger } from '../../logger/Logger'
import { inject, injectable } from '../../plugins'
import { ConnectionService } from '../connections/services'

import { MessagePickupEventTypes } from './MessagePickupEvents'
import { MessagePickupModuleConfig } from './MessagePickupModuleConfig'
import { MessagePickupSessionService } from './services/MessagePickupSessionService'

Expand All @@ -47,23 +52,29 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP

private messageSender: MessageSender
private agentContext: AgentContext
private eventEmitter: EventEmitter
private connectionService: ConnectionService
private messagePickupSessionService: MessagePickupSessionService
private logger: Logger
private stop$: Subject<boolean>

public constructor(
messageSender: MessageSender,
agentContext: AgentContext,
connectionService: ConnectionService,
eventEmitter: EventEmitter,
messagePickupSessionService: MessagePickupSessionService,
config: MessagePickupModuleConfig<MPPs>,
@inject(InjectionSymbols.Stop$) stop$: Subject<boolean>,
@inject(InjectionSymbols.Logger) logger: Logger
) {
this.messageSender = messageSender
this.connectionService = connectionService
this.agentContext = agentContext
this.eventEmitter = eventEmitter
this.config = config
this.messagePickupSessionService = messagePickupSessionService
this.stop$ = stop$
this.logger = logger
}

Expand Down Expand Up @@ -123,9 +134,9 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
const session = this.messagePickupSessionService.getLiveSession(this.agentContext, pickupSessionId)

if (!session) {
this.logger.debug(`No active live mode session found with id ${pickupSessionId}`)
return
throw new CredoError(`No active live mode session found with id ${pickupSessionId}`)
}

const connectionRecord = await this.connectionService.getById(this.agentContext, session.connectionId)

const protocol = this.getProtocol(session.protocolVersion)
Expand Down Expand Up @@ -154,7 +165,7 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
*
*/
public async deliverMessagesFromQueue(options: DeliverMessagesFromQueueOptions) {
this.logger.debug('Deliverying queued messages')
this.logger.debug('Delivering queued messages')

const { pickupSessionId, recipientDid: recipientKey, batchSize } = options

Expand Down Expand Up @@ -187,7 +198,11 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
* Pickup queued messages from a message holder. It attempts to retrieve all current messages from the
* queue, receiving up to `batchSize` messages per batch retrieval.
*
* @param options connectionId, protocol version to use and batch size
* By default, this method only waits until the initial pick-up request is sent. Use `options.awaitCompletion`
* if you want to wait until all messages are effectively retrieved.
*
* @param options connectionId, protocol version to use and batch size, awaitCompletion,
* awaitCompletionTimeoutMs
*/
public async pickupMessages(options: PickupMessagesOptions<MPPs>): Promise<PickupMessagesReturnType> {
const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId)
Expand All @@ -199,17 +214,41 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
recipientDid: options.recipientDid,
})

await this.messageSender.sendMessage(
new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection: connectionRecord,
})
)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection: connectionRecord,
})

const replaySubject = new ReplaySubject(1)

if (options.awaitCompletion) {
// Listen for response to our feature query
this.eventEmitter
.observable<MessagePickupCompletedEvent>(MessagePickupEventTypes.MessagePickupCompleted)
.pipe(
// Stop when the agent shuts down
takeUntil(this.stop$),
// filter by connection id
filter((e) => e.payload.connection.id === connectionRecord.id),
// If we don't receive all messages within timeoutMs miliseconds (no response, not supported, etc...) error
timeout({
first: options.awaitCompletionTimeoutMs ?? 10000,
meta: 'MessagePickupApi.pickupMessages',
})
)
.subscribe(replaySubject)
}

await this.messageSender.sendMessage(outboundMessageContext)

if (options.awaitCompletion) {
await firstValueFrom(replaySubject)
}
}

/**
* Enable or disable Live Delivery mode as a recipient. If there were previous queued messages, it will pick-up them
* automatically.
* Enable or disable Live Delivery mode as a recipient. Depending on the message pickup protocol used,
* after receiving a response from the mediator the agent might retrieve any pending message.
*
* @param options connectionId, protocol version to use and boolean to enable/disable Live Mode
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export interface PickupMessagesOptions<MPPs extends MessagePickupProtocol[] = Me
protocolVersion: MessagePickupProtocolVersionType<MPPs>
recipientDid?: string
batchSize?: number
awaitCompletion?: boolean
awaitCompletionTimeoutMs?: number
}

export interface SetLiveDeliveryModeOptions<MPPs extends MessagePickupProtocol[] = MessagePickupProtocol[]> {
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/modules/message-pickup/MessagePickupEvents.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { MessagePickupSession } from './MessagePickupSession'
import type { BaseEvent } from '../../agent/Events'
import type { ConnectionRecord } from '../connections'

export enum MessagePickupEventTypes {
LiveSessionSaved = 'LiveSessionSaved',
LiveSessionRemoved = 'LiveSessionRemoved',
MessagePickupCompleted = 'MessagePickupCompleted',
}

export interface MessagePickupLiveSessionSavedEvent extends BaseEvent {
Expand All @@ -19,3 +21,11 @@ export interface MessagePickupLiveSessionRemovedEvent extends BaseEvent {
session: MessagePickupSession
}
}

export interface MessagePickupCompletedEvent extends BaseEvent {
type: typeof MessagePickupEventTypes.MessagePickupCompleted
payload: {
connection: ConnectionRecord
threadId?: string
}
}
127 changes: 126 additions & 1 deletion packages/core/src/modules/message-pickup/__tests__/pickup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ describe('E2E Pick Up protocol', () => {
let mediatorAgent: Agent

afterEach(async () => {
await recipientAgent.mediationRecipient.stopMessagePickup()

await recipientAgent.shutdown()
await recipientAgent.wallet.delete()
await mediatorAgent.shutdown()
Expand Down Expand Up @@ -106,6 +108,66 @@ describe('E2E Pick Up protocol', () => {
expect(basicMessage.content).toBe(message)
})

test('E2E manual Pick Up V1 loop - waiting for completion', async () => {
const mediatorMessages = new Subject<SubjectMessage>()

const subjectMap = {
'wss://mediator': mediatorMessages,
}

// Initialize mediatorReceived message
mediatorAgent = new Agent(mediatorOptions)
mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages))
await mediatorAgent.initialize()

// Create connection to use for recipient
const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({
label: 'mediator invitation',
handshake: true,
handshakeProtocols: [HandshakeProtocol.DidExchange],
})

// Initialize recipient
recipientAgent = new Agent(recipientOptions)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
await recipientAgent.initialize()

// Connect
const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation

let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl(
mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' })
)

recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected(
recipientMediatorConnection!.id
)

let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id)

mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id)

// Now they are connected, reinitialize recipient agent in order to lose the session (as with SubjectTransport it remains open)
await recipientAgent.shutdown()
await recipientAgent.initialize()

const message = 'hello pickup V1'
await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message)

const basicMessagePromise = waitForBasicMessage(recipientAgent, {
content: message,
})
await recipientAgent.messagePickup.pickupMessages({
connectionId: recipientMediatorConnection.id,
protocolVersion: 'v1',
awaitCompletion: true,
})

const basicMessage = await basicMessagePromise
expect(basicMessage.content).toBe(message)
})

test('E2E manual Pick Up V2 loop', async () => {
const mediatorMessages = new Subject<SubjectMessage>()

Expand Down Expand Up @@ -185,7 +247,70 @@ describe('E2E Pick Up protocol', () => {
})

expect((secondStatusMessage as V2StatusMessage).messageCount).toBe(0)
})

await recipientAgent.mediationRecipient.stopMessagePickup()
test('E2E manual Pick Up V2 loop - waiting for completion', async () => {
const mediatorMessages = new Subject<SubjectMessage>()

// FIXME: we harcoded that pickup of messages MUST be using ws(s) scheme when doing implicit pickup
// For liver delivery we need a duplex transport. however that means we can't test it with the subject transport. Using wss here to 'hack' this. We should
// extend the API to allow custom schemes (or maybe add a `supportsDuplex` transport / `supportMultiReturnMessages`)
// For pickup v2 pickup message (which we're testing here) we could just as well use `http` as it is just request/response.
const subjectMap = {
'wss://mediator': mediatorMessages,
}

// Initialize mediatorReceived message
mediatorAgent = new Agent(mediatorOptions)
mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages))
await mediatorAgent.initialize()

// Create connection to use for recipient
const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({
label: 'mediator invitation',
handshake: true,
handshakeProtocols: [HandshakeProtocol.DidExchange],
})

// Initialize recipient
recipientAgent = new Agent(recipientOptions)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
await recipientAgent.initialize()

// Connect
const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation

let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl(
mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' })
)

recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected(
recipientMediatorConnection!.id
)

let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id)

mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id)

// Now they are connected, reinitialize recipient agent in order to lose the session (as with SubjectTransport it remains open)
await recipientAgent.shutdown()
await recipientAgent.initialize()

const message = 'hello pickup V2'

await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message)

const basicMessagePromise = waitForBasicMessage(recipientAgent, {
content: message,
})
await recipientAgent.messagePickup.pickupMessages({
connectionId: recipientMediatorConnection.id,
protocolVersion: 'v2',
awaitCompletion: true,
})

const basicMessage = await basicMessagePromise
expect(basicMessage.content).toBe(message)
})
})
Loading

0 comments on commit faa390f

Please sign in to comment.