diff --git a/packages/core/src/agent/MessageReceiver.ts b/packages/core/src/agent/MessageReceiver.ts index 39cd6a2cc2..9ae31bb189 100644 --- a/packages/core/src/agent/MessageReceiver.ts +++ b/packages/core/src/agent/MessageReceiver.ts @@ -122,6 +122,9 @@ export class MessageReceiver { session.connection = connection ?? undefined messageContext.sessionId = session.id this.transportService.saveSession(session) + } else if (session) { + // No need to wait for session to stay open if we're not actually going to respond to the message. + await session.close() } await this.dispatcher.dispatch(messageContext) diff --git a/packages/core/src/agent/TransportService.ts b/packages/core/src/agent/TransportService.ts index cf078dc8da..ccaee2b6f5 100644 --- a/packages/core/src/agent/TransportService.ts +++ b/packages/core/src/agent/TransportService.ts @@ -67,4 +67,5 @@ export interface TransportSession { inboundMessage?: AgentMessage connection?: ConnectionRecord send(encryptedMessage: EncryptedMessage): Promise + close(): Promise } diff --git a/packages/core/src/agent/__tests__/stubs.ts b/packages/core/src/agent/__tests__/stubs.ts index 5bdb3b5bb6..49fcaae660 100644 --- a/packages/core/src/agent/__tests__/stubs.ts +++ b/packages/core/src/agent/__tests__/stubs.ts @@ -17,4 +17,8 @@ export class DummyTransportSession implements TransportSession { public send(): Promise { throw new Error('Method not implemented.') } + + public close(): Promise { + throw new Error('Method not implemented.') + } } diff --git a/packages/node/src/transport/HttpInboundTransport.ts b/packages/node/src/transport/HttpInboundTransport.ts index 4ee555a395..d6ecd27910 100644 --- a/packages/node/src/transport/HttpInboundTransport.ts +++ b/packages/node/src/transport/HttpInboundTransport.ts @@ -75,6 +75,12 @@ export class HttpTransportSession implements TransportSession { this.res = res } + public async close(): Promise { + if (!this.res.headersSent) { + this.res.status(200).end() + } + } + public async send(encryptedMessage: EncryptedMessage): Promise { if (this.res.headersSent) { throw new AriesFrameworkError(`${this.type} transport session has been closed.`) diff --git a/packages/node/src/transport/WsInboundTransport.ts b/packages/node/src/transport/WsInboundTransport.ts index 7be023570f..58f21f1557 100644 --- a/packages/node/src/transport/WsInboundTransport.ts +++ b/packages/node/src/transport/WsInboundTransport.ts @@ -88,4 +88,10 @@ export class WebSocketTransportSession implements TransportSession { this.socket.send(JSON.stringify(encryptedMessage)) } + + public async close(): Promise { + if (this.socket.readyState === WebSocket.OPEN) { + this.socket.close() + } + } } diff --git a/tests/transport/SubjectInboundTransport.ts b/tests/transport/SubjectInboundTransport.ts index 39b288b119..10c978654d 100644 --- a/tests/transport/SubjectInboundTransport.ts +++ b/tests/transport/SubjectInboundTransport.ts @@ -64,4 +64,8 @@ export class SubjectTransportSession implements TransportSession { public async send(encryptedMessage: EncryptedMessage): Promise { this.replySubject.next({ message: encryptedMessage }) } + + public async close(): Promise { + this.replySubject.complete() + } } diff --git a/tests/transport/SubjectOutboundTransport.ts b/tests/transport/SubjectOutboundTransport.ts index bd9986baf8..385fcdc08c 100644 --- a/tests/transport/SubjectOutboundTransport.ts +++ b/tests/transport/SubjectOutboundTransport.ts @@ -1,18 +1,14 @@ -import type { Agent, Logger } from '../../packages/core/src' -import type { OutboundTransport } from '../../packages/core/src/transport/OutboundTransport' -import type { OutboundPackage } from '../../packages/core/src/types' import type { SubjectMessage } from './SubjectInboundTransport' -import type { Subscription } from 'rxjs' +import type { OutboundPackage, OutboundTransport, Agent, Logger } from '@aries-framework/core' -import { Subject } from 'rxjs' +import { takeUntil, Subject, take } from 'rxjs' -import { InjectionSymbols, AriesFrameworkError } from '../../packages/core/src' +import { InjectionSymbols, AriesFrameworkError } from '@aries-framework/core' export class SubjectOutboundTransport implements OutboundTransport { private logger!: Logger - private ourSubject = new Subject() - private returnRouteMessageSubscription?: Subscription private subjectMap: { [key: string]: Subject | undefined } + private agent!: Agent public supportedSchemes = ['rxjs'] @@ -21,23 +17,13 @@ export class SubjectOutboundTransport implements OutboundTransport { } public async start(agent: Agent): Promise { + this.agent = agent + this.logger = agent.injectionContainer.resolve(InjectionSymbols.Logger) - this.subscribe(agent) } public async stop(): Promise { - this.returnRouteMessageSubscription?.unsubscribe() - this.ourSubject.complete() - } - - private subscribe(agent: Agent) { - this.returnRouteMessageSubscription = this.ourSubject.subscribe({ - next: async ({ message }: SubjectMessage) => { - this.logger.test('Received message') - - await agent.receiveMessage(message) - }, - }) + // No logic needed } public async sendMessage(outboundPackage: OutboundPackage) { @@ -56,6 +42,19 @@ export class SubjectOutboundTransport implements OutboundTransport { throw new AriesFrameworkError(`No subject found for endpoint ${endpoint}`) } - subject.next({ message: payload, replySubject: this.ourSubject }) + // Create a replySubject just for this session. Both ends will be able to close it, + // mimicking a transport like http or websocket. Close session automatically when agent stops + const replySubject = new Subject() + this.agent.config.stop$.pipe(take(1)).subscribe(() => !replySubject.closed && replySubject.complete()) + + replySubject.pipe(takeUntil(this.agent.config.stop$)).subscribe({ + next: async ({ message }: SubjectMessage) => { + this.logger.test('Received message') + + await this.agent.receiveMessage(message) + }, + }) + + subject.next({ message: payload, replySubject }) } }