Skip to content

Commit

Permalink
fix: log and throw on WebSocket sending errors (#1573)
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Vergnaud <[email protected]>
  • Loading branch information
ericvergnaud authored Sep 19, 2023
1 parent 9ee2ce7 commit 11050af
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 12 deletions.
3 changes: 2 additions & 1 deletion packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ export class MessageSender {
}

private async sendMessageToSession(agentContext: AgentContext, session: TransportSession, message: AgentMessage) {
this.logger.debug(`Existing ${session.type} transport session has been found.`)
this.logger.debug(`Packing message and sending it via existing session ${session.type}...`)
if (!session.keys) {
throw new AriesFrameworkError(`There are no keys for the given ${session.type} transport session.`)
}
const encryptedMessage = await this.envelopeService.packMessage(agentContext, message, session.keys)
this.logger.debug('Sending message')
await session.send(agentContext, encryptedMessage)
}

Expand Down
9 changes: 7 additions & 2 deletions packages/core/src/modules/message-pìckup/MessagePickupApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { MessageSender } from '../../agent/MessageSender'
import { OutboundMessageContext } from '../../agent/models'
import { InjectionSymbols } from '../../constants'
import { AriesFrameworkError } from '../../error'
import { injectable } from '../../plugins'
import { Logger } from '../../logger/Logger'
import { inject, injectable } from '../../plugins'
import { ConnectionService } from '../connections/services'

import { MessagePickupModuleConfig } from './MessagePickupModuleConfig'
Expand All @@ -32,17 +33,20 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
private messageSender: MessageSender
private agentContext: AgentContext
private connectionService: ConnectionService
private logger: Logger

public constructor(
messageSender: MessageSender,
agentContext: AgentContext,
connectionService: ConnectionService,
config: MessagePickupModuleConfig<MPPs>
config: MessagePickupModuleConfig<MPPs>,
@inject(InjectionSymbols.Logger) logger: Logger
) {
this.messageSender = messageSender
this.connectionService = connectionService
this.agentContext = agentContext
this.config = config
this.logger = logger
}

private getProtocol<MPP extends MPPs[number]['version']>(protocolVersion: MPP): MessagePickupProtocol {
Expand All @@ -61,6 +65,7 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
* @param options: connectionId associated to the message and the encrypted message itself
*/
public async queueMessage(options: QueueMessageOptions): Promise<QueueMessageReturnType> {
this.logger.debug('Queuing message...')
const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId)

const messageRepository = this.agentContext.dependencyManager.resolve<MessageRepository>(
Expand Down
6 changes: 3 additions & 3 deletions packages/core/tests/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class TestLogger extends BaseLogger {
} as const

// Map our log levels to tslog levels
private tsLogLevelNumgerMap = {
private tsLogLevelNumberMap = {
[LogLevel.test]: 0,
[LogLevel.trace]: 1,
[LogLevel.debug]: 2,
Expand All @@ -48,12 +48,12 @@ export class TestLogger extends BaseLogger {
if (logger) {
this.logger = logger.getSubLogger({
name,
minLevel: this.logLevel == LogLevel.off ? undefined : this.tsLogLevelNumgerMap[this.logLevel],
minLevel: this.logLevel == LogLevel.off ? undefined : this.tsLogLevelNumberMap[this.logLevel],
})
} else {
this.logger = new Logger({
name,
minLevel: this.logLevel == LogLevel.off ? undefined : this.tsLogLevelNumgerMap[this.logLevel],
minLevel: this.logLevel == LogLevel.off ? undefined : this.tsLogLevelNumberMap[this.logLevel],
attachedTransports: [logToTransport],
})
}
Expand Down
19 changes: 13 additions & 6 deletions packages/node/src/transport/WsInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class WsInboundTransport implements InboundTransport {
if (!this.socketIds[socketId]) {
this.logger.debug(`Saving new socket with id ${socketId}.`)
this.socketIds[socketId] = socket
const session = new WebSocketTransportSession(socketId, socket)
const session = new WebSocketTransportSession(socketId, socket, this.logger)
this.listenOnWebSocketMessages(agent, socket, session)
socket.on('close', () => {
this.logger.debug('Socket closed.')
Expand All @@ -58,7 +58,6 @@ export class WsInboundTransport implements InboundTransport {
if (error) {
reject(error)
}

resolve()
})
})
Expand All @@ -73,7 +72,7 @@ export class WsInboundTransport implements InboundTransport {
try {
await messageReceiver.receiveMessage(JSON.parse(event.data), { session })
} catch (error) {
this.logger.error('Error processing message')
this.logger.error(`Error processing message: ${error}`)
}
})
}
Expand All @@ -83,18 +82,26 @@ export class WebSocketTransportSession implements TransportSession {
public id: string
public readonly type = 'WebSocket'
public socket: WebSocket
private logger: Logger

public constructor(id: string, socket: WebSocket) {
public constructor(id: string, socket: WebSocket, logger: Logger) {
this.id = id
this.socket = socket
this.logger = logger
}

public async send(agentContext: AgentContext, encryptedMessage: EncryptedMessage): Promise<void> {
if (this.socket.readyState !== WebSocket.OPEN) {
throw new AriesFrameworkError(`${this.type} transport session has been closed.`)
}

this.socket.send(JSON.stringify(encryptedMessage))
this.socket.send(JSON.stringify(encryptedMessage), (error?) => {
if (error != undefined) {
this.logger.debug(`Error sending message: ${error}`)
throw new AriesFrameworkError(`${this.type} send message failed.`, { cause: error })
} else {
this.logger.debug(`${this.type} sent message successfully.`)
}
})
}

public async close(): Promise<void> {
Expand Down

0 comments on commit 11050af

Please sign in to comment.