Skip to content

Commit

Permalink
fix: unsubscribe from emitter after pickup completion (#1806)
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <[email protected]>
  • Loading branch information
genaris authored Mar 28, 2024
1 parent 48b31ae commit 9fb6ae0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { DiscoverFeaturesService } from './services'
import type { Feature } from '../../agent/models'

import { firstValueFrom, of, ReplaySubject, Subject } from 'rxjs'
import { catchError, filter, map, takeUntil, timeout } from 'rxjs/operators'
import { catchError, filter, first, map, takeUntil, timeout } from 'rxjs/operators'

import { AgentContext } from '../../agent'
import { EventEmitter } from '../../agent/EventEmitter'
Expand Down Expand Up @@ -120,6 +120,8 @@ export class DiscoverFeaturesApi<
filter((e) => e.payload.connection?.id === connection.id),
// Return disclosures
map((e) => e.payload.disclosures),
// Only wait for first event that matches the criteria
first(),
// If we don't have an answer in timeoutMs miliseconds (no response, not supported, etc...) error
timeout({
first: options.awaitDisclosuresTimeoutMs ?? 7000,
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/modules/message-pickup/MessagePickupApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protoco
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol'
import type { MessagePickupRepository } from './storage/MessagePickupRepository'

import { ReplaySubject, Subject, filter, firstValueFrom, takeUntil, timeout } from 'rxjs'
import { ReplaySubject, Subject, filter, first, firstValueFrom, takeUntil, timeout } from 'rxjs'

import { AgentContext } from '../../agent'
import { EventEmitter } from '../../agent/EventEmitter'
Expand Down Expand Up @@ -222,14 +222,15 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
const replaySubject = new ReplaySubject(1)

if (options.awaitCompletion) {
// Listen for response to our feature query
this.eventEmitter
.observable<MessagePickupCompletedEvent>(MessagePickupEventTypes.MessagePickupCompleted)
.pipe(
// Stop when the agent shuts down
takeUntil(this.stop$),
// filter by connection id
filter((e) => e.payload.connection.id === connectionRecord.id),
// Only wait for first event that matches the criteria
first(),
// If we don't receive all messages within timeoutMs miliseconds (no response, not supported, etc...) error
timeout({
first: options.awaitCompletionTimeoutMs ?? 10000,
Expand Down

0 comments on commit 9fb6ae0

Please sign in to comment.