Skip to content

Commit

Permalink
feat: add inbound message queue (#339)
Browse files Browse the repository at this point in the history
* feat: add inbound message queue
* refactor: use observables for serial processing

Signed-off-by: Timo Glastra <[email protected]>
  • Loading branch information
TimoGlastra authored Jun 30, 2021
1 parent e1a4adf commit 93893b7
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/continuous-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
run: yarn install

- name: Run tests
run: TEST_AGENT_PUBLIC_DID_SEED=${TEST_AGENT_PUBLIC_DID_SEED} GENESIS_TXN_PATH=${GENESIS_TXN_PATH} yarn test --coverage --detectOpenHandles
run: TEST_AGENT_PUBLIC_DID_SEED=${TEST_AGENT_PUBLIC_DID_SEED} GENESIS_TXN_PATH=${GENESIS_TXN_PATH} yarn test --coverage

- name: Export logs
if: always()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"object-inspect": "^1.10.3",
"react-native-fs": "^2.18.0",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.1.0",
"tsyringe": "^4.5.0",
"uuid": "^8.3.0",
"ws": "^7.4.5"
Expand Down Expand Up @@ -71,7 +72,6 @@
"npm-run-all": "^4.1.5",
"prettier": "^2.2.1",
"release-it": "^14.6.1",
"rxjs": "^6.6.6",
"ts-jest": "^26.5.3",
"ts-node-dev": "^1.1.6",
"tslog": "^3.1.2",
Expand Down
14 changes: 7 additions & 7 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import type { InitConfig } from '../types'
import type { Wallet } from '../wallet/Wallet'
import type { AgentMessageReceivedEvent } from './Events'
import type { TransportSession } from './TransportService'
import type { Subscription } from 'rxjs'
import type { DependencyContainer } from 'tsyringe'

import { concatMap } from 'rxjs/operators'
import { container as baseContainer } from 'tsyringe'

import { InjectionSymbols } from '../constants'
Expand Down Expand Up @@ -37,6 +39,7 @@ export class Agent {
protected messageSender: MessageSender
public inboundTransporter?: InboundTransporter
private _isInitialized = false
public messageSubscription: Subscription

public readonly connections!: ConnectionsModule
public readonly proofs!: ProofsModule
Expand Down Expand Up @@ -94,13 +97,10 @@ export class Agent {
this.ledger = this.container.resolve(LedgerModule)

// Listen for new messages (either from transports or somewhere else in the framework / extensions)
this.listenForMessages()
}

private listenForMessages() {
this.eventEmitter.on<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived, async (event) => {
await this.receiveMessage(event.payload.message)
})
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(concatMap((e) => this.messageReceiver.receiveMessage(e.payload.message)))
.subscribe()
}

public setInboundTransporter(inboundTransporter: InboundTransporter) {
Expand Down
8 changes: 8 additions & 0 deletions src/agent/EventEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { BaseEvent } from './Events'

import { EventEmitter as NativeEventEmitter } from 'events'
import { fromEventPattern } from 'rxjs'
import { Lifecycle, scoped } from 'tsyringe'

@scoped(Lifecycle.ContainerScoped)
Expand All @@ -18,4 +19,11 @@ export class EventEmitter {
public off<T extends BaseEvent>(event: T['type'], listener: (data: T) => void | Promise<void>) {
this.eventEmitter.off(event, listener)
}

public observable<T extends BaseEvent>(event: T['type']) {
return fromEventPattern<T>(
(handler) => this.on(event, handler),
(handler) => this.off(event, handler)
)
}
}
2 changes: 1 addition & 1 deletion src/modules/credentials/services/CredentialService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class CredentialService {

/**
* Create a {@link OfferCredentialMessage} not bound to an existing credential exchange.
* To create an offer as response to an existing credential exchange, use {@link ProofService#createOfferAsResponse}.
* To create an offer as response to an existing credential exchange, use {@link CredentialService#createOfferAsResponse}.
*
* @param connectionRecord The connection for which to create the credential offer
* @param credentialTemplate The credential template to use for the offer
Expand Down
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5871,6 +5871,13 @@ rxjs@^6.6.6:
dependencies:
tslib "^1.9.0"

rxjs@^7.1.0:
version "7.1.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-7.1.0.tgz#94202d27b19305ef7b1a4f330277b2065df7039e"
integrity sha512-gCFO5iHIbRPwznl6hAYuwNFld8W4S2shtSJIqG27ReWXo9IWrCyEICxUA+6vJHwSR/OakoenC4QsDxq50tzYmw==
dependencies:
tslib "~2.1.0"

[email protected], safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
Expand Down Expand Up @@ -6622,6 +6629,11 @@ tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==

tslib@~2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.1.0.tgz#da60860f1c2ecaa5703ab7d39bc05b6bf988b97a"
integrity sha512-hcVC3wYEziELGGmEEXue7D75zbwIIVUMWAVbHItGPx0ziyXxrOMQx4rQEVEV45Ut/1IotuEvwqPopzIOkDMf0A==

tslog@^3.1.2:
version "3.2.0"
resolved "https://registry.yarnpkg.com/tslog/-/tslog-3.2.0.tgz#4982c289a8948670d6a1c49c29977ae9f861adfa"
Expand Down

0 comments on commit 93893b7

Please sign in to comment.