Skip to content

Commit

Permalink
feat(routing): manual mediator pickup lifecycle management (openwalle…
Browse files Browse the repository at this point in the history
…t-foundation#989)

Signed-off-by: Ariel Gentile <[email protected]>
  • Loading branch information
genaris committed Sep 16, 2022
1 parent 21c63ba commit c63e6a8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 51 deletions.
90 changes: 62 additions & 28 deletions packages/core/src/modules/routing/RecipientApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './repository'
import type { GetRoutingOptions } from './services/RoutingService'

import { firstValueFrom, interval, ReplaySubject, Subject, timer } from 'rxjs'
import { firstValueFrom, interval, merge, ReplaySubject, Subject, timer } from 'rxjs'
import { delayWhen, filter, first, takeUntil, tap, throttleTime, timeout } from 'rxjs/operators'

import { AgentContext } from '../../agent'
Expand Down Expand Up @@ -52,6 +52,9 @@ export class RecipientApi {
private agentContext: AgentContext
private stop$: Subject<boolean>

// stopMessagePickup$ is used for stop message pickup signal
private readonly stopMessagePickup$ = new Subject<boolean>()

public constructor(
dispatcher: Dispatcher,
mediationRecipientService: MediationRecipientService,
Expand Down Expand Up @@ -159,11 +162,13 @@ export class RecipientApi {
// in a recursive back off strategy if it matches the following criteria:
// - Agent is not shutdown
// - Socket was for current mediator connection id

const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe()
this.eventEmitter
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent)
.pipe(
// Stop when the agent shuts down
takeUntil(this.stop$),
takeUntil(stopConditions$),
filter((e) => e.payload.connectionId === mediator.connectionId),
// Make sure we're not reconnecting multiple times
throttleTime(interval),
Expand All @@ -172,20 +177,23 @@ export class RecipientApi {
// Wait for interval time before reconnecting
delayWhen(() => timer(interval))
)
.subscribe(async () => {
this.logger.debug(
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...`
)
try {
if (pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.sendStatusRequest({ mediatorId: mediator.id })
} else {
await this.openMediationWebSocket(mediator)
.subscribe({
next: async () => {
this.logger.debug(
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...`
)
try {
if (pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.sendStatusRequest({ mediatorId: mediator.id })
} else {
await this.openMediationWebSocket(mediator)
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
}
},
complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediator.id}'`),
})
try {
if (pickupStrategy === MediatorPickupStrategy.Implicit) {
Expand All @@ -196,38 +204,64 @@ export class RecipientApi {
}
}

public async initiateMessagePickup(mediator: MediationRecord) {
/**
* Start a Message Pickup flow with a registered Mediator.
*
* @param mediator optional {MediationRecord} corresponding to the mediator to pick messages from. It will use
* default mediator otherwise
* @param pickupStrategy optional {MediatorPickupStrategy} to use in the loop. It will use Agent's default
* strategy or attempt to find it by Discover Features otherwise
* @returns
*/
public async initiateMessagePickup(mediator?: MediationRecord, pickupStrategy?: MediatorPickupStrategy) {
const { mediatorPollingInterval } = this.config
const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator)
const mediatorConnection = await this.connectionService.getById(this.agentContext, mediator.connectionId)

const mediatorRecord = mediator ?? (await this.findDefaultMediator())
if (!mediatorRecord) {
throw new AriesFrameworkError('There is no mediator to pickup messages from')
}

const mediatorPickupStrategy = pickupStrategy ?? (await this.getPickupStrategyForMediator(mediatorRecord))
const mediatorConnection = await this.connectionService.getById(this.agentContext, mediatorRecord.connectionId)

switch (mediatorPickupStrategy) {
case MediatorPickupStrategy.PickUpV2:
this.logger.info(`Starting pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy)
await this.sendStatusRequest({ mediatorId: mediator.id })
this.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`)
await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy)
await this.sendStatusRequest({ mediatorId: mediatorRecord.id })
break
case MediatorPickupStrategy.PickUpV1: {
const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe()
// Explicit means polling every X seconds with batch message
this.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediator.id}'`)
this.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediatorRecord.id}'`)
const subscription = interval(mediatorPollingInterval)
.pipe(takeUntil(this.stop$))
.subscribe(async () => {
await this.pickupMessages(mediatorConnection)
.pipe(takeUntil(stopConditions$))
.subscribe({
next: async () => {
await this.pickupMessages(mediatorConnection)
},
complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`),
})
return subscription
}
case MediatorPickupStrategy.Implicit:
// Implicit means sending ping once and keeping connection open. This requires a long-lived transport
// such as WebSockets to work
this.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy)
this.logger.info(`Starting implicit pickup of messages from mediator '${mediatorRecord.id}'`)
await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy)
break
default:
this.logger.info(`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none`)
this.logger.info(`Skipping pickup of messages from mediator '${mediatorRecord.id}' due to pickup strategy none`)
}
}

/**
* Terminate all ongoing Message Pickup loops
*/
public async stopMessagePickup() {
this.stopMessagePickup$.next(true)
}

private async sendStatusRequest(config: { mediatorId: string; recipientKey?: string }) {
const mediationRecord = await this.mediationRecipientService.getById(this.agentContext, config.mediatorId)

Expand Down
9 changes: 0 additions & 9 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import { SubjectInboundTransport } from '../../../../../../tests/transport/Subje
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getAgentOptions, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { InjectionSymbols } from '../../../constants'
import { sleep } from '../../../utils/sleep'
import { ConnectionRecord, HandshakeProtocol } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import { MediationState } from '../models/MediationState'
Expand All @@ -33,13 +31,6 @@ describe('mediator establishment', () => {
let senderAgent: Agent

afterEach(async () => {
// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
const stop$ = recipientAgent.injectionContainer.resolve<Subject<boolean>>(InjectionSymbols.Stop$)
stop$.next(true)
await sleep(1000)

await recipientAgent?.shutdown()
await recipientAgent?.wallet.delete()
await mediatorAgent?.shutdown()
Expand Down
12 changes: 2 additions & 10 deletions packages/core/tests/v1-connectionless-proofs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { Subject, ReplaySubject } from 'rxjs'

import { SubjectInboundTransport } from '../../../tests/transport/SubjectInboundTransport'
import { SubjectOutboundTransport } from '../../../tests/transport/SubjectOutboundTransport'
import { InjectionSymbols } from '../src'
import { Agent } from '../src/agent/Agent'
import { Attachment, AttachmentData } from '../src/decorators/attachment/Attachment'
import { HandshakeProtocol } from '../src/modules/connections'
Expand All @@ -25,7 +24,6 @@ import {
} from '../src/modules/proofs'
import { MediatorPickupStrategy } from '../src/modules/routing'
import { LinkedAttachment } from '../src/utils/LinkedAttachment'
import { sleep } from '../src/utils/sleep'
import { uuid } from '../src/utils/uuid'

import {
Expand Down Expand Up @@ -383,13 +381,7 @@ describe('Present Proof', () => {

await faberProofRecordPromise

// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
const faberStop$ = faberAgent.injectionContainer.resolve<Subject<boolean>>(InjectionSymbols.Stop$)
const aliceStop$ = aliceAgent.injectionContainer.resolve<Subject<boolean>>(InjectionSymbols.Stop$)
faberStop$.next(true)
aliceStop$.next(true)
await sleep(2000)
await aliceAgent.mediationRecipient.stopMessagePickup()
await faberAgent.mediationRecipient.stopMessagePickup()
})
})
5 changes: 1 addition & 4 deletions tests/e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ export async function e2eTest({
expect(verifierProof.state).toBe(ProofState.Done)

// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
const recipientStop$ = recipientAgent.injectionContainer.resolve<Subject<boolean>>(InjectionSymbols.Stop$)
recipientStop$.next(true)
await recipientAgent.mediationRecipient.stopMessagePickup()
await sleep(2000)
}

0 comments on commit c63e6a8

Please sign in to comment.