diff --git a/packages/core/src/modules/routing/RecipientApi.ts b/packages/core/src/modules/routing/RecipientApi.ts index 293208a4a5..d7ac52e41c 100644 --- a/packages/core/src/modules/routing/RecipientApi.ts +++ b/packages/core/src/modules/routing/RecipientApi.ts @@ -5,7 +5,7 @@ import type { MediationStateChangedEvent } from './RoutingEvents' import type { MediationRecord } from './repository' import type { GetRoutingOptions } from './services/RoutingService' -import { firstValueFrom, interval, ReplaySubject, Subject, timer } from 'rxjs' +import { firstValueFrom, interval, merge, ReplaySubject, Subject, timer } from 'rxjs' import { delayWhen, filter, first, takeUntil, tap, throttleTime, timeout } from 'rxjs/operators' import { AgentContext } from '../../agent' @@ -52,6 +52,9 @@ export class RecipientApi { private agentContext: AgentContext private stop$: Subject + // stopMessagePickup$ is used for stop message pickup signal + private readonly stopMessagePickup$ = new Subject() + public constructor( dispatcher: Dispatcher, mediationRecipientService: MediationRecipientService, @@ -159,11 +162,13 @@ export class RecipientApi { // in a recursive back off strategy if it matches the following criteria: // - Agent is not shutdown // - Socket was for current mediator connection id + + const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe() this.eventEmitter .observable(TransportEventTypes.OutboundWebSocketClosedEvent) .pipe( // Stop when the agent shuts down - takeUntil(this.stop$), + takeUntil(stopConditions$), filter((e) => e.payload.connectionId === mediator.connectionId), // Make sure we're not reconnecting multiple times throttleTime(interval), @@ -172,20 +177,23 @@ export class RecipientApi { // Wait for interval time before reconnecting delayWhen(() => timer(interval)) ) - .subscribe(async () => { - this.logger.debug( - `Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...` - ) - try { - if (pickupStrategy === MediatorPickupStrategy.PickUpV2) { - // Start Pickup v2 protocol to receive messages received while websocket offline - await this.sendStatusRequest({ mediatorId: mediator.id }) - } else { - await this.openMediationWebSocket(mediator) + .subscribe({ + next: async () => { + this.logger.debug( + `Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...` + ) + try { + if (pickupStrategy === MediatorPickupStrategy.PickUpV2) { + // Start Pickup v2 protocol to receive messages received while websocket offline + await this.sendStatusRequest({ mediatorId: mediator.id }) + } else { + await this.openMediationWebSocket(mediator) + } + } catch (error) { + this.logger.warn('Unable to re-open websocket connection to mediator', { error }) } - } catch (error) { - this.logger.warn('Unable to re-open websocket connection to mediator', { error }) - } + }, + complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediator.id}'`), }) try { if (pickupStrategy === MediatorPickupStrategy.Implicit) { @@ -196,38 +204,64 @@ export class RecipientApi { } } - public async initiateMessagePickup(mediator: MediationRecord) { + /** + * Start a Message Pickup flow with a registered Mediator. + * + * @param mediator optional {MediationRecord} corresponding to the mediator to pick messages from. It will use + * default mediator otherwise + * @param pickupStrategy optional {MediatorPickupStrategy} to use in the loop. It will use Agent's default + * strategy or attempt to find it by Discover Features otherwise + * @returns + */ + public async initiateMessagePickup(mediator?: MediationRecord, pickupStrategy?: MediatorPickupStrategy) { const { mediatorPollingInterval } = this.config - const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator) - const mediatorConnection = await this.connectionService.getById(this.agentContext, mediator.connectionId) + + const mediatorRecord = mediator ?? (await this.findDefaultMediator()) + if (!mediatorRecord) { + throw new AriesFrameworkError('There is no mediator to pickup messages from') + } + + const mediatorPickupStrategy = pickupStrategy ?? (await this.getPickupStrategyForMediator(mediatorRecord)) + const mediatorConnection = await this.connectionService.getById(this.agentContext, mediatorRecord.connectionId) switch (mediatorPickupStrategy) { case MediatorPickupStrategy.PickUpV2: - this.logger.info(`Starting pickup of messages from mediator '${mediator.id}'`) - await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy) - await this.sendStatusRequest({ mediatorId: mediator.id }) + this.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`) + await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy) + await this.sendStatusRequest({ mediatorId: mediatorRecord.id }) break case MediatorPickupStrategy.PickUpV1: { + const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe() // Explicit means polling every X seconds with batch message - this.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediator.id}'`) + this.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediatorRecord.id}'`) const subscription = interval(mediatorPollingInterval) - .pipe(takeUntil(this.stop$)) - .subscribe(async () => { - await this.pickupMessages(mediatorConnection) + .pipe(takeUntil(stopConditions$)) + .subscribe({ + next: async () => { + await this.pickupMessages(mediatorConnection) + }, + complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`), }) return subscription } case MediatorPickupStrategy.Implicit: // Implicit means sending ping once and keeping connection open. This requires a long-lived transport // such as WebSockets to work - this.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`) - await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy) + this.logger.info(`Starting implicit pickup of messages from mediator '${mediatorRecord.id}'`) + await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy) break default: - this.logger.info(`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none`) + this.logger.info(`Skipping pickup of messages from mediator '${mediatorRecord.id}' due to pickup strategy none`) } } + /** + * Terminate all ongoing Message Pickup loops + */ + public async stopMessagePickup() { + this.stopMessagePickup$.next(true) + } + private async sendStatusRequest(config: { mediatorId: string; recipientKey?: string }) { const mediationRecord = await this.mediationRecipientService.getById(this.agentContext, config.mediatorId) diff --git a/packages/core/src/modules/routing/__tests__/mediation.test.ts b/packages/core/src/modules/routing/__tests__/mediation.test.ts index d3814e54b3..19cccf0593 100644 --- a/packages/core/src/modules/routing/__tests__/mediation.test.ts +++ b/packages/core/src/modules/routing/__tests__/mediation.test.ts @@ -7,8 +7,6 @@ import { SubjectInboundTransport } from '../../../../../../tests/transport/Subje import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport' import { getAgentOptions, waitForBasicMessage } from '../../../../tests/helpers' import { Agent } from '../../../agent/Agent' -import { InjectionSymbols } from '../../../constants' -import { sleep } from '../../../utils/sleep' import { ConnectionRecord, HandshakeProtocol } from '../../connections' import { MediatorPickupStrategy } from '../MediatorPickupStrategy' import { MediationState } from '../models/MediationState' @@ -33,13 +31,6 @@ describe('mediator establishment', () => { let senderAgent: Agent afterEach(async () => { - // We want to stop the mediator polling before the agent is shutdown. - // FIXME: add a way to stop mediator polling from the public api, and make sure this is - // being handled in the agent shutdown so we don't get any errors with wallets being closed. - const stop$ = recipientAgent.injectionContainer.resolve>(InjectionSymbols.Stop$) - stop$.next(true) - await sleep(1000) - await recipientAgent?.shutdown() await recipientAgent?.wallet.delete() await mediatorAgent?.shutdown() diff --git a/packages/core/tests/v1-connectionless-proofs.test.ts b/packages/core/tests/v1-connectionless-proofs.test.ts index 684584a97a..af5e49edbc 100644 --- a/packages/core/tests/v1-connectionless-proofs.test.ts +++ b/packages/core/tests/v1-connectionless-proofs.test.ts @@ -8,7 +8,6 @@ import { Subject, ReplaySubject } from 'rxjs' import { SubjectInboundTransport } from '../../../tests/transport/SubjectInboundTransport' import { SubjectOutboundTransport } from '../../../tests/transport/SubjectOutboundTransport' -import { InjectionSymbols } from '../src' import { Agent } from '../src/agent/Agent' import { Attachment, AttachmentData } from '../src/decorators/attachment/Attachment' import { HandshakeProtocol } from '../src/modules/connections' @@ -25,7 +24,6 @@ import { } from '../src/modules/proofs' import { MediatorPickupStrategy } from '../src/modules/routing' import { LinkedAttachment } from '../src/utils/LinkedAttachment' -import { sleep } from '../src/utils/sleep' import { uuid } from '../src/utils/uuid' import { @@ -383,13 +381,7 @@ describe('Present Proof', () => { await faberProofRecordPromise - // We want to stop the mediator polling before the agent is shutdown. - // FIXME: add a way to stop mediator polling from the public api, and make sure this is - // being handled in the agent shutdown so we don't get any errors with wallets being closed. - const faberStop$ = faberAgent.injectionContainer.resolve>(InjectionSymbols.Stop$) - const aliceStop$ = aliceAgent.injectionContainer.resolve>(InjectionSymbols.Stop$) - faberStop$.next(true) - aliceStop$.next(true) - await sleep(2000) + await aliceAgent.mediationRecipient.stopMessagePickup() + await faberAgent.mediationRecipient.stopMessagePickup() }) }) diff --git a/tests/e2e-test.ts b/tests/e2e-test.ts index 7c42b6a13d..f5a1d1bee5 100644 --- a/tests/e2e-test.ts +++ b/tests/e2e-test.ts @@ -95,9 +95,6 @@ export async function e2eTest({ expect(verifierProof.state).toBe(ProofState.Done) // We want to stop the mediator polling before the agent is shutdown. - // FIXME: add a way to stop mediator polling from the public api, and make sure this is - // being handled in the agent shutdown so we don't get any errors with wallets being closed. - const recipientStop$ = recipientAgent.injectionContainer.resolve>(InjectionSymbols.Stop$) - recipientStop$.next(true) + await recipientAgent.mediationRecipient.stopMessagePickup() await sleep(2000) }