Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(routing): manual mediator pickup lifecycle management #989

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 66 additions & 29 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'
import type { GetRoutingOptions } from './services/RoutingService'

import { firstValueFrom, interval, ReplaySubject, timer } from 'rxjs'
import { firstValueFrom, interval, merge, ReplaySubject, Subject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen } from 'rxjs/operators'

import { AgentConfig } from '../../agent/AgentConfig'
Expand Down Expand Up @@ -47,6 +47,9 @@ export class RecipientModule {
private mediationRepository: MediationRepository
private routingService: RoutingService

// stopMessagePickup$ is used for stop message pickup signal
private readonly stopMessagePickup$ = new Subject<boolean>()

public constructor(
dispatcher: Dispatcher,
agentConfig: AgentConfig,
Expand Down Expand Up @@ -144,11 +147,13 @@ export class RecipientModule {
// in a recursive back off strategy if it matches the following criteria:
// - Agent is not shutdown
// - Socket was for current mediator connection id

const stopConditions$ = merge(this.agentConfig.stop$, this.stopMessagePickup$).pipe()
this.eventEmitter
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// Stop when the agent shuts down or stop message pickup signal is received
takeUntil(stopConditions$),
filter((e) => e.payload.connectionId === mediator.connectionId),
// Make sure we're not reconnecting multiple times
throttleTime(interval),
Expand All @@ -157,20 +162,23 @@ export class RecipientModule {
// Wait for interval time before reconnecting
delayWhen(() => timer(interval))
)
.subscribe(async () => {
this.logger.debug(
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...`
)
try {
if (pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.sendStatusRequest({ mediatorId: mediator.id })
} else {
await this.openMediationWebSocket(mediator)
.subscribe({
next: async () => {
this.logger.debug(
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...`
)
try {
if (pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.sendStatusRequest({ mediatorId: mediator.id })
} else {
await this.openMediationWebSocket(mediator)
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
}
},
complete: () => this.agentConfig.logger.info(`Stopping pickup of messages from mediator '${mediator.id}'`),
})
try {
if (pickupStrategy === MediatorPickupStrategy.Implicit) {
Expand All @@ -181,40 +189,69 @@ export class RecipientModule {
}
}

public async initiateMessagePickup(mediator: MediationRecord) {
/**
* Start a Message Pickup flow with a registered Mediator.
*
* @param mediator optional {MediationRecord} corresponding to the mediator to pick messages from. It will use
* default mediator otherwise
* @param pickupStrategy optional {MediatorPickupStrategy} to use in the loop. It will use Agent's default
* strategy or attempt to find it by Discover Features otherwise
* @returns
*/
public async initiateMessagePickup(mediator?: MediationRecord, pickupStrategy?: MediatorPickupStrategy) {
const { mediatorPollingInterval } = this.agentConfig
const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator)
const mediatorConnection = await this.connectionService.getById(mediator.connectionId)

const mediatorRecord = mediator ?? (await this.findDefaultMediator())
if (!mediatorRecord) {
throw new AriesFrameworkError('There is no mediator to pickup messages from')
}

const mediatorPickupStrategy = pickupStrategy ?? (await this.getPickupStrategyForMediator(mediatorRecord))
const mediatorConnection = await this.connectionService.getById(mediatorRecord.connectionId)

switch (mediatorPickupStrategy) {
case MediatorPickupStrategy.PickUpV2:
this.agentConfig.logger.info(`Starting pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy)
await this.sendStatusRequest({ mediatorId: mediator.id })
this.agentConfig.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`)
await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy)
await this.sendStatusRequest({ mediatorId: mediatorRecord.id })
break
case MediatorPickupStrategy.PickUpV1: {
const stopConditions$ = merge(this.agentConfig.stop$, this.stopMessagePickup$).pipe()
// Explicit means polling every X seconds with batch message
this.agentConfig.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediator.id}'`)
this.agentConfig.logger.info(
`Starting explicit (batch) pickup of messages from mediator '${mediatorRecord.id}'`
)
const subscription = interval(mediatorPollingInterval)
.pipe(takeUntil(this.agentConfig.stop$))
.subscribe(async () => {
await this.pickupMessages(mediatorConnection)
.pipe(takeUntil(stopConditions$))
.subscribe({
next: async () => {
await this.pickupMessages(mediatorConnection)
},
complete: () =>
this.agentConfig.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`),
})
return subscription
}
case MediatorPickupStrategy.Implicit:
// Implicit means sending ping once and keeping connection open. This requires a long-lived transport
// such as WebSockets to work
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy)
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediatorRecord.id}'`)
await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy)
break
default:
this.agentConfig.logger.info(
`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none`
`Skipping pickup of messages from mediator '${mediatorRecord.id}' due to pickup strategy none`
)
}
}

/**
* Terminate all ongoing Message Pickup loops
*/
public async stopMessagePickup() {
this.stopMessagePickup$.next(true)
}

private async sendStatusRequest(config: { mediatorId: string; recipientKey?: string }) {
const mediationRecord = await this.mediationRecipientService.getById(config.mediatorId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { SubjectInboundTransport } from '../../../../../../tests/transport/Subje
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { sleep } from '../../../utils/sleep'
import { ConnectionRecord, HandshakeProtocol } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import { MediationState } from '../models/MediationState'
Expand All @@ -32,12 +31,6 @@ describe('mediator establishment', () => {
let senderAgent: Agent

afterEach(async () => {
// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
recipientAgent.config.stop$.next(true)
await sleep(1000)

await recipientAgent?.shutdown()
await recipientAgent?.wallet.delete()
await mediatorAgent?.shutdown()
Expand Down
8 changes: 2 additions & 6 deletions packages/core/src/modules/routing/__tests__/pickup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,17 @@ import { SubjectInboundTransport } from '../../../../../../tests/transport/Subje
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { ConsoleLogger, LogLevel } from '../../../logger'
import { HandshakeProtocol } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'

const logger = new ConsoleLogger(LogLevel.info)
const recipientConfig = getBaseConfig('Mediation: Recipient', {
const recipientConfig = getBaseConfig('Pickup: Recipient', {
autoAcceptConnections: true,
indyLedgers: [],
logger,
})
const mediatorConfig = getBaseConfig('Mediation: Mediator', {
const mediatorConfig = getBaseConfig('Pickup: Mediator', {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also ran into issues with this 😆

autoAcceptConnections: true,
endpoints: ['rxjs:mediator'],
indyLedgers: [],
logger,
})

describe('E2E Pick Up protocol', () => {
Expand Down
9 changes: 2 additions & 7 deletions packages/core/tests/connectionless-proofs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
} from '../src/modules/proofs'
import { MediatorPickupStrategy } from '../src/modules/routing'
import { LinkedAttachment } from '../src/utils/LinkedAttachment'
import { sleep } from '../src/utils/sleep'
import { uuid } from '../src/utils/uuid'

import {
Expand Down Expand Up @@ -342,11 +341,7 @@ describe('Present Proof', () => {
state: ProofState.Done,
})

// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
faberAgent.config.stop$.next(true)
aliceAgent.config.stop$.next(true)
await sleep(2000)
await aliceAgent.mediationRecipient.stopMessagePickup()
await faberAgent.mediationRecipient.stopMessagePickup()
})
Comment on lines +344 to 346
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much better API

})
4 changes: 1 addition & 3 deletions tests/e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ export async function e2eTest({
expect(verifierProof.state).toBe(ProofState.Done)

// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
recipientAgent.config.stop$.next(true)
await recipientAgent.mediationRecipient.stopMessagePickup()
await sleep(2000)
}