diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index bf88bd3a9b..7a65ed915e 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -108,7 +108,7 @@ jobs: run: yarn install - name: Run tests - run: TEST_AGENT_PUBLIC_DID_SEED=${TEST_AGENT_PUBLIC_DID_SEED} GENESIS_TXN_PATH=${GENESIS_TXN_PATH} yarn test --coverage --detectOpenHandles + run: TEST_AGENT_PUBLIC_DID_SEED=${TEST_AGENT_PUBLIC_DID_SEED} GENESIS_TXN_PATH=${GENESIS_TXN_PATH} yarn test --coverage - name: Export logs if: always() diff --git a/package.json b/package.json index 0875070577..3a602643c9 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "object-inspect": "^1.10.3", "react-native-fs": "^2.18.0", "reflect-metadata": "^0.1.13", + "rxjs": "^7.1.0", "tsyringe": "^4.5.0", "uuid": "^8.3.0", "ws": "^7.4.5" @@ -71,7 +72,6 @@ "npm-run-all": "^4.1.5", "prettier": "^2.2.1", "release-it": "^14.6.1", - "rxjs": "^6.6.6", "ts-jest": "^26.5.3", "ts-node-dev": "^1.1.6", "tslog": "^3.1.2", diff --git a/src/agent/Agent.ts b/src/agent/Agent.ts index 6833208231..413bf5312d 100644 --- a/src/agent/Agent.ts +++ b/src/agent/Agent.ts @@ -6,8 +6,10 @@ import type { InitConfig } from '../types' import type { Wallet } from '../wallet/Wallet' import type { AgentMessageReceivedEvent } from './Events' import type { TransportSession } from './TransportService' +import type { Subscription } from 'rxjs' import type { DependencyContainer } from 'tsyringe' +import { concatMap } from 'rxjs/operators' import { container as baseContainer } from 'tsyringe' import { InjectionSymbols } from '../constants' @@ -37,6 +39,7 @@ export class Agent { protected messageSender: MessageSender public inboundTransporter?: InboundTransporter private _isInitialized = false + public messageSubscription: Subscription public readonly connections!: ConnectionsModule public readonly proofs!: ProofsModule @@ -94,13 +97,10 @@ export class Agent { this.ledger = this.container.resolve(LedgerModule) // Listen for new messages (either from transports or somewhere else in the framework / extensions) - this.listenForMessages() - } - - private listenForMessages() { - this.eventEmitter.on(AgentEventTypes.AgentMessageReceived, async (event) => { - await this.receiveMessage(event.payload.message) - }) + this.messageSubscription = this.eventEmitter + .observable(AgentEventTypes.AgentMessageReceived) + .pipe(concatMap((e) => this.messageReceiver.receiveMessage(e.payload.message))) + .subscribe() } public setInboundTransporter(inboundTransporter: InboundTransporter) { diff --git a/src/agent/EventEmitter.ts b/src/agent/EventEmitter.ts index 36e8a953df..c33d42b09f 100644 --- a/src/agent/EventEmitter.ts +++ b/src/agent/EventEmitter.ts @@ -1,6 +1,7 @@ import type { BaseEvent } from './Events' import { EventEmitter as NativeEventEmitter } from 'events' +import { fromEventPattern } from 'rxjs' import { Lifecycle, scoped } from 'tsyringe' @scoped(Lifecycle.ContainerScoped) @@ -18,4 +19,11 @@ export class EventEmitter { public off(event: T['type'], listener: (data: T) => void | Promise) { this.eventEmitter.off(event, listener) } + + public observable(event: T['type']) { + return fromEventPattern( + (handler) => this.on(event, handler), + (handler) => this.off(event, handler) + ) + } } diff --git a/src/modules/credentials/services/CredentialService.ts b/src/modules/credentials/services/CredentialService.ts index a7179d6571..8cbe93789f 100644 --- a/src/modules/credentials/services/CredentialService.ts +++ b/src/modules/credentials/services/CredentialService.ts @@ -234,7 +234,7 @@ export class CredentialService { /** * Create a {@link OfferCredentialMessage} not bound to an existing credential exchange. - * To create an offer as response to an existing credential exchange, use {@link ProofService#createOfferAsResponse}. + * To create an offer as response to an existing credential exchange, use {@link CredentialService#createOfferAsResponse}. * * @param connectionRecord The connection for which to create the credential offer * @param credentialTemplate The credential template to use for the offer diff --git a/yarn.lock b/yarn.lock index b1c5a8a67b..a3c4401c9b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5871,6 +5871,13 @@ rxjs@^6.6.6: dependencies: tslib "^1.9.0" +rxjs@^7.1.0: + version "7.1.0" + resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-7.1.0.tgz#94202d27b19305ef7b1a4f330277b2065df7039e" + integrity sha512-gCFO5iHIbRPwznl6hAYuwNFld8W4S2shtSJIqG27ReWXo9IWrCyEICxUA+6vJHwSR/OakoenC4QsDxq50tzYmw== + dependencies: + tslib "~2.1.0" + safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1: version "5.1.2" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" @@ -6622,6 +6629,11 @@ tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3: resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== +tslib@~2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.1.0.tgz#da60860f1c2ecaa5703ab7d39bc05b6bf988b97a" + integrity sha512-hcVC3wYEziELGGmEEXue7D75zbwIIVUMWAVbHItGPx0ziyXxrOMQx4rQEVEV45Ut/1IotuEvwqPopzIOkDMf0A== + tslog@^3.1.2: version "3.2.0" resolved "https://registry.yarnpkg.com/tslog/-/tslog-3.2.0.tgz#4982c289a8948670d6a1c49c29977ae9f861adfa"