From 0b4b8dd42117eb8e92fcc4be695ff149b49a06c7 Mon Sep 17 00:00:00 2001 From: Ariel Gentile Date: Wed, 10 Apr 2024 12:18:53 -0300 Subject: [PATCH] feat: queued messages reception time (#1824) Signed-off-by: Ariel Gentile --- packages/core/src/agent/Agent.ts | 1 + packages/core/src/agent/Dispatcher.ts | 1 + packages/core/src/agent/Events.ts | 2 ++ packages/core/src/agent/MessageReceiver.ts | 21 +++++++++++++------ .../src/agent/models/InboundMessageContext.ts | 3 +++ .../protocol/v2/V2MessagePickupProtocol.ts | 3 +++ .../message-pickup/storage/QueuedMessage.ts | 7 +++++++ 7 files changed, 32 insertions(+), 6 deletions(-) diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index eb7d87cbaf..8179d7bef5 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -157,6 +157,7 @@ export class Agent extends BaseAge .receiveMessage(e.payload.message, { connection: e.payload.connection, contextCorrelationId: e.payload.contextCorrelationId, + receivedAt: e.payload.receivedAt, }) .catch((error) => { this.logger.error('Failed to process message', { error }) diff --git a/packages/core/src/agent/Dispatcher.ts b/packages/core/src/agent/Dispatcher.ts index bea9e99422..2c3c86769e 100644 --- a/packages/core/src/agent/Dispatcher.ts +++ b/packages/core/src/agent/Dispatcher.ts @@ -97,6 +97,7 @@ class Dispatcher { payload: { message, connection, + receivedAt: messageContext.receivedAt, }, }) } diff --git a/packages/core/src/agent/Events.ts b/packages/core/src/agent/Events.ts index 0eecc21fe4..8a889a237c 100644 --- a/packages/core/src/agent/Events.ts +++ b/packages/core/src/agent/Events.ts @@ -33,6 +33,7 @@ export interface AgentMessageReceivedEvent extends BaseEvent { message: unknown connection?: ConnectionRecord contextCorrelationId?: string + receivedAt?: Date } } @@ -41,6 +42,7 @@ export interface AgentMessageProcessedEvent extends BaseEvent { payload: { message: AgentMessage connection?: ConnectionRecord + receivedAt?: Date } } diff --git a/packages/core/src/agent/MessageReceiver.ts b/packages/core/src/agent/MessageReceiver.ts index 65c0721873..19fca86a8b 100644 --- a/packages/core/src/agent/MessageReceiver.ts +++ b/packages/core/src/agent/MessageReceiver.ts @@ -82,7 +82,13 @@ export class MessageReceiver { session, connection, contextCorrelationId, - }: { session?: TransportSession; connection?: ConnectionRecord; contextCorrelationId?: string } = {} + receivedAt, + }: { + session?: TransportSession + connection?: ConnectionRecord + contextCorrelationId?: string + receivedAt?: Date + } = {} ) { this.logger.debug(`Agent received message`) @@ -93,9 +99,9 @@ export class MessageReceiver { try { if (this.isEncryptedMessage(inboundMessage)) { - await this.receiveEncryptedMessage(agentContext, inboundMessage as EncryptedMessage, session) + await this.receiveEncryptedMessage(agentContext, inboundMessage as EncryptedMessage, session, receivedAt) } else if (this.isPlaintextMessage(inboundMessage)) { - await this.receivePlaintextMessage(agentContext, inboundMessage, connection) + await this.receivePlaintextMessage(agentContext, inboundMessage, connection, receivedAt) } else { throw new CredoError('Unable to parse incoming message: unrecognized format') } @@ -108,17 +114,19 @@ export class MessageReceiver { private async receivePlaintextMessage( agentContext: AgentContext, plaintextMessage: PlaintextMessage, - connection?: ConnectionRecord + connection?: ConnectionRecord, + receivedAt?: Date ) { const message = await this.transformAndValidate(agentContext, plaintextMessage) - const messageContext = new InboundMessageContext(message, { connection, agentContext }) + const messageContext = new InboundMessageContext(message, { connection, agentContext, receivedAt }) await this.dispatcher.dispatch(messageContext) } private async receiveEncryptedMessage( agentContext: AgentContext, encryptedMessage: EncryptedMessage, - session?: TransportSession + session?: TransportSession, + receivedAt?: Date ) { const decryptedMessage = await this.decryptMessage(agentContext, encryptedMessage) const { plaintextMessage, senderKey, recipientKey } = decryptedMessage @@ -140,6 +148,7 @@ export class MessageReceiver { senderKey, recipientKey, agentContext, + receivedAt, }) // We want to save a session if there is a chance of returning outbound message via inbound transport. diff --git a/packages/core/src/agent/models/InboundMessageContext.ts b/packages/core/src/agent/models/InboundMessageContext.ts index c205091fd9..03bfef54f3 100644 --- a/packages/core/src/agent/models/InboundMessageContext.ts +++ b/packages/core/src/agent/models/InboundMessageContext.ts @@ -11,6 +11,7 @@ export interface MessageContextParams { senderKey?: Key recipientKey?: Key agentContext: AgentContext + receivedAt?: Date } export class InboundMessageContext { @@ -19,6 +20,7 @@ export class InboundMessageContext { public sessionId?: string public senderKey?: Key public recipientKey?: Key + public receivedAt: Date public readonly agentContext: AgentContext public constructor(message: T, context: MessageContextParams) { @@ -28,6 +30,7 @@ export class InboundMessageContext { this.connection = context.connection this.sessionId = context.sessionId this.agentContext = context.agentContext + this.receivedAt = context.receivedAt ?? new Date() } /** diff --git a/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts index f42fd5d126..3c2470c34a 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts @@ -118,6 +118,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { (msg) => new Attachment({ id: msg.id, + lastmodTime: msg.receivedAt, data: { json: msg.encryptedMessage, }, @@ -190,6 +191,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { (msg) => new Attachment({ id: msg.id, + lastmodTime: msg.receivedAt, data: { json: msg.encryptedMessage, }, @@ -323,6 +325,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { payload: { message: attachment.getDataAsJson(), contextCorrelationId: messageContext.agentContext.contextCorrelationId, + receivedAt: attachment.lastmodTime, }, }) } diff --git a/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts b/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts index b554e08184..1c22dfdf69 100644 --- a/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts +++ b/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts @@ -1,6 +1,13 @@ import type { EncryptedMessage } from '../../../types' +/** + * Basic representation of an encrypted message in a Message Pickup Queue + * - id: Message Pickup repository's specific queued message id (unrelated to DIDComm message id) + * - receivedAt: reception time (i.e. time when the message has been added to the queue) + * - encryptedMessage: packed message + */ export type QueuedMessage = { id: string + receivedAt?: Date encryptedMessage: EncryptedMessage }