From 1cb5af49500359085390508bd7ee00104e2acf15 Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Thu, 21 Apr 2022 17:42:05 +0200 Subject: [PATCH 1/3] fix: close session early if no return route Signed-off-by: Timo Glastra --- packages/core/src/agent/MessageReceiver.ts | 3 +++ packages/core/src/agent/TransportService.ts | 1 + packages/core/src/agent/__tests__/stubs.ts | 4 ++++ packages/node/src/transport/HttpInboundTransport.ts | 6 ++++++ packages/node/src/transport/WsInboundTransport.ts | 6 ++++++ tests/transport/SubjectInboundTransport.ts | 6 ++++++ 6 files changed, 26 insertions(+) diff --git a/packages/core/src/agent/MessageReceiver.ts b/packages/core/src/agent/MessageReceiver.ts index cb201f009e..b7186c227b 100644 --- a/packages/core/src/agent/MessageReceiver.ts +++ b/packages/core/src/agent/MessageReceiver.ts @@ -112,6 +112,9 @@ export class MessageReceiver { // with mediators when you don't have a public endpoint yet. session.connection = connection ?? undefined this.transportService.saveSession(session) + } else if (session) { + // No need to wait for session to stay open if we're not actually going to respond to the message. + await session.close() } const messageContext = new InboundMessageContext(message, { diff --git a/packages/core/src/agent/TransportService.ts b/packages/core/src/agent/TransportService.ts index 230a79a17d..68d3e52139 100644 --- a/packages/core/src/agent/TransportService.ts +++ b/packages/core/src/agent/TransportService.ts @@ -67,4 +67,5 @@ export interface TransportSession { inboundMessage?: AgentMessage connection?: ConnectionRecord send(encryptedMessage: EncryptedMessage): Promise + close(): Promise } diff --git a/packages/core/src/agent/__tests__/stubs.ts b/packages/core/src/agent/__tests__/stubs.ts index 5bdb3b5bb6..49fcaae660 100644 --- a/packages/core/src/agent/__tests__/stubs.ts +++ b/packages/core/src/agent/__tests__/stubs.ts @@ -17,4 +17,8 @@ export class DummyTransportSession implements TransportSession { public send(): Promise { throw new Error('Method not implemented.') } + + public close(): Promise { + throw new Error('Method not implemented.') + } } diff --git a/packages/node/src/transport/HttpInboundTransport.ts b/packages/node/src/transport/HttpInboundTransport.ts index 4ee555a395..d6ecd27910 100644 --- a/packages/node/src/transport/HttpInboundTransport.ts +++ b/packages/node/src/transport/HttpInboundTransport.ts @@ -75,6 +75,12 @@ export class HttpTransportSession implements TransportSession { this.res = res } + public async close(): Promise { + if (!this.res.headersSent) { + this.res.status(200).end() + } + } + public async send(encryptedMessage: EncryptedMessage): Promise { if (this.res.headersSent) { throw new AriesFrameworkError(`${this.type} transport session has been closed.`) diff --git a/packages/node/src/transport/WsInboundTransport.ts b/packages/node/src/transport/WsInboundTransport.ts index 7be023570f..58f21f1557 100644 --- a/packages/node/src/transport/WsInboundTransport.ts +++ b/packages/node/src/transport/WsInboundTransport.ts @@ -88,4 +88,10 @@ export class WebSocketTransportSession implements TransportSession { this.socket.send(JSON.stringify(encryptedMessage)) } + + public async close(): Promise { + if (this.socket.readyState === WebSocket.OPEN) { + this.socket.close() + } + } } diff --git a/tests/transport/SubjectInboundTransport.ts b/tests/transport/SubjectInboundTransport.ts index 45097dbadb..6b2f1ee8ff 100644 --- a/tests/transport/SubjectInboundTransport.ts +++ b/tests/transport/SubjectInboundTransport.ts @@ -55,4 +55,10 @@ export class SubjectTransportSession implements TransportSession { public async send(encryptedMessage: EncryptedMessage): Promise { this.replySubject.next({ message: encryptedMessage }) } + + public async close(): Promise { + // FIXME: we should call complete, but this requires some changes already in this PR: https://github.com/hyperledger/aries-framework-javascript/pull/712 + // We can then create a replySubject for each interaction, which mimics the the behavior of real transports better. + // this.replySubject.complete() + } } From 1fd017de9a91cb8a90555a681511a8af7ec1f60b Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Mon, 2 May 2022 10:12:15 +0200 Subject: [PATCH 2/3] test: subject transport session Signed-off-by: Timo Glastra --- tests/transport/SubjectInboundTransport.ts | 4 +- tests/transport/SubjectOutboundTransport.ts | 41 ++++++++++----------- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/tests/transport/SubjectInboundTransport.ts b/tests/transport/SubjectInboundTransport.ts index 09b8211bed..10c978654d 100644 --- a/tests/transport/SubjectInboundTransport.ts +++ b/tests/transport/SubjectInboundTransport.ts @@ -66,8 +66,6 @@ export class SubjectTransportSession implements TransportSession { } public async close(): Promise { - // FIXME: we should call complete, but this requires some changes already in this PR: https://github.com/hyperledger/aries-framework-javascript/pull/712 - // We can then create a replySubject for each interaction, which mimics the the behavior of real transports better. - // this.replySubject.complete() + this.replySubject.complete() } } diff --git a/tests/transport/SubjectOutboundTransport.ts b/tests/transport/SubjectOutboundTransport.ts index bd9986baf8..e099ccd506 100644 --- a/tests/transport/SubjectOutboundTransport.ts +++ b/tests/transport/SubjectOutboundTransport.ts @@ -1,18 +1,14 @@ -import type { Agent, Logger } from '../../packages/core/src' -import type { OutboundTransport } from '../../packages/core/src/transport/OutboundTransport' -import type { OutboundPackage } from '../../packages/core/src/types' import type { SubjectMessage } from './SubjectInboundTransport' -import type { Subscription } from 'rxjs' +import type { OutboundPackage, OutboundTransport, Agent, Logger } from '@aries-framework/core' -import { Subject } from 'rxjs' +import { takeUntil, Subject } from 'rxjs' -import { InjectionSymbols, AriesFrameworkError } from '../../packages/core/src' +import { InjectionSymbols, AriesFrameworkError } from '@aries-framework/core' export class SubjectOutboundTransport implements OutboundTransport { private logger!: Logger - private ourSubject = new Subject() - private returnRouteMessageSubscription?: Subscription private subjectMap: { [key: string]: Subject | undefined } + private agent!: Agent public supportedSchemes = ['rxjs'] @@ -21,23 +17,13 @@ export class SubjectOutboundTransport implements OutboundTransport { } public async start(agent: Agent): Promise { + this.agent = agent + this.logger = agent.injectionContainer.resolve(InjectionSymbols.Logger) - this.subscribe(agent) } public async stop(): Promise { - this.returnRouteMessageSubscription?.unsubscribe() - this.ourSubject.complete() - } - - private subscribe(agent: Agent) { - this.returnRouteMessageSubscription = this.ourSubject.subscribe({ - next: async ({ message }: SubjectMessage) => { - this.logger.test('Received message') - - await agent.receiveMessage(message) - }, - }) + // No logic needed } public async sendMessage(outboundPackage: OutboundPackage) { @@ -56,6 +42,17 @@ export class SubjectOutboundTransport implements OutboundTransport { throw new AriesFrameworkError(`No subject found for endpoint ${endpoint}`) } - subject.next({ message: payload, replySubject: this.ourSubject }) + // Create a replySubject just for this session. Both ends will be able to close it, + // mimicking a transport like http or websocket. Close session automatically when agent stops + const replySubject = new Subject() + replySubject.pipe(takeUntil(this.agent.config.stop$)).subscribe({ + next: async ({ message }: SubjectMessage) => { + this.logger.test('Received message') + + await this.agent.receiveMessage(message) + }, + }) + + subject.next({ message: payload, replySubject }) } } From a17bff79f311418963535a478641438c7852ce32 Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Mon, 2 May 2022 10:46:12 +0200 Subject: [PATCH 3/3] test: close session on agent shutdown Signed-off-by: Timo Glastra --- tests/transport/SubjectOutboundTransport.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/transport/SubjectOutboundTransport.ts b/tests/transport/SubjectOutboundTransport.ts index e099ccd506..385fcdc08c 100644 --- a/tests/transport/SubjectOutboundTransport.ts +++ b/tests/transport/SubjectOutboundTransport.ts @@ -1,7 +1,7 @@ import type { SubjectMessage } from './SubjectInboundTransport' import type { OutboundPackage, OutboundTransport, Agent, Logger } from '@aries-framework/core' -import { takeUntil, Subject } from 'rxjs' +import { takeUntil, Subject, take } from 'rxjs' import { InjectionSymbols, AriesFrameworkError } from '@aries-framework/core' @@ -45,6 +45,8 @@ export class SubjectOutboundTransport implements OutboundTransport { // Create a replySubject just for this session. Both ends will be able to close it, // mimicking a transport like http or websocket. Close session automatically when agent stops const replySubject = new Subject() + this.agent.config.stop$.pipe(take(1)).subscribe(() => !replySubject.closed && replySubject.complete()) + replySubject.pipe(takeUntil(this.agent.config.stop$)).subscribe({ next: async ({ message }: SubjectMessage) => { this.logger.test('Received message')