From 00212eaa8ef610073df9dfc589c234252e7bf239 Mon Sep 17 00:00:00 2001 From: YuShifan <894402575bt@gmail.com> Date: Wed, 16 Oct 2024 18:22:14 +0800 Subject: [PATCH 1/6] feat(desktop): sync tree data to message table --- src/utils/messageQueue.ts | 66 ++++++++++++++++++++++++++++++++++ src/utils/topicTree.ts | 1 - src/views/viewer/TopicTree.vue | 31 ++++++++++++---- 3 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 src/utils/messageQueue.ts diff --git a/src/utils/messageQueue.ts b/src/utils/messageQueue.ts new file mode 100644 index 000000000..6fb764f40 --- /dev/null +++ b/src/utils/messageQueue.ts @@ -0,0 +1,66 @@ +import { Subject, timer, Observable } from 'rxjs' +import { buffer, filter, mergeMap, share } from 'rxjs/operators' +import { IPublishPacket } from 'mqtt-packet/types' +import { getNowDate } from '@/utils/time' +import { getMessageId } from '@/utils/idGenerator' + +export class MessageQueue { + private messageSubject = new Subject<{ packet: IPublishPacket; id: string }>() + private messageObservable: Observable<{ messages: MessageModel[]; connectionId: string }> + private processedMessages = new Set() + + constructor(private bufferTime: number = 1000, private maxBufferSize: number = 100) { + this.messageObservable = this.initMessageProcessing() + } + + queueMessage(packet: IPublishPacket, id: string) { + this.messageSubject.next({ packet, id }) + } + + public getMessageObservable(): Observable<{ messages: MessageModel[]; connectionId: string }> { + return this.messageObservable + } + + private initMessageProcessing(): Observable<{ messages: MessageModel[]; connectionId: string }> { + return this.messageSubject.pipe( + buffer(timer(0, this.bufferTime)), + filter((messages) => messages.length > 0), + mergeMap((messages) => { + const groupedMessages = new Map() + + messages.slice(0, this.maxBufferSize).forEach(({ packet, id }) => { + const messageKey = `${id}-${packet.topic}-${packet.payload}` + if (!this.processedMessages.has(messageKey)) { + if (!groupedMessages.has(id)) { + groupedMessages.set(id, []) + } + groupedMessages.get(id)!.push({ + id: getMessageId(), + topic: packet.topic, + payload: packet.payload.toString(), + qos: packet.qos, + retain: packet.retain, + out: false, + createAt: getNowDate(), + properties: packet.properties, + }) + this.processedMessages.add(messageKey) + } + }) + + if (this.processedMessages.size > 10000) { + const oldestEntries = Array.from(this.processedMessages).slice(0, 5000) + oldestEntries.forEach((entry) => this.processedMessages.delete(entry)) + } + + return Array.from(groupedMessages.entries()).map(([connectionId, messages]) => ({ + messages, + connectionId, + })) + }), + share(), + ) + } +} + +export const messageQueue = new MessageQueue() diff --git a/src/utils/topicTree.ts b/src/utils/topicTree.ts index bebcc54a4..71574b2a0 100644 --- a/src/utils/topicTree.ts +++ b/src/utils/topicTree.ts @@ -11,7 +11,6 @@ import time from './time' * @param {ConnectionModel} rawData.connectionInfo - The connection information. * @returns {TopicTreeData[]} The updated topic tree data structure. */ - export function updateTopicTreeData( currentTree: TopicTreeData[], rawData: { diff --git a/src/views/viewer/TopicTree.vue b/src/views/viewer/TopicTree.vue index 2b8c86df0..345fd9422 100644 --- a/src/views/viewer/TopicTree.vue +++ b/src/views/viewer/TopicTree.vue @@ -23,6 +23,9 @@ import TreeView from '@/components/widgets/TreeView.vue' import { updateTopicTreeData } from '@/utils/topicTree' import { IPublishPacket } from 'mqtt-packet/types' import TreeNodeInfo from '@/components/widgets/TreeNodeInfo.vue' +import { ignoreQoS0Message } from '@/utils/mqttUtils' +import { messageQueue } from '@/utils/messageQueue' +import useServices from '@/database/useServices' @Component({ components: { @@ -40,19 +43,35 @@ export default class TopicTree extends Vue { packet, connectionInfo, }) + this.queueMessage(packet, connectionInfo.id as string) } - created() { - globalEventBus.on('packetReceive', this.handlePacketReceive) - } - - beforeDestroy() { - globalEventBus.off('packetReceive', this.handlePacketReceive) + private queueMessage(packet: IPublishPacket, id: string) { + if (packet.cmd !== 'publish' || ignoreQoS0Message(packet.qos)) return + messageQueue.queueMessage(packet, id) } private handleNodeClick(data: TopicTreeData) { this.selectedNode = data } + + private created() { + globalEventBus.on('packetReceive', this.handlePacketReceive) + const { messageService } = useServices() + messageQueue.getMessageObservable().subscribe(async ({ messages, connectionId }) => { + if (messages.length === 0) return + try { + await messageService.importMsgsToConnection(messages, connectionId) + this.$log.info(`Topic Tree: Processed ${messages.length} messages for connection ${connectionId}`) + } catch (error) { + this.$log.error(`Topic Tree: Error processing messages: ${(error as Error).toString()}`) + } + }) + } + + private beforeDestroy() { + globalEventBus.off('packetReceive', this.handlePacketReceive) + } } From c7ba20a7ff081e449cad1178102504048fccc22a Mon Sep 17 00:00:00 2001 From: YuShifan <894402575bt@gmail.com> Date: Thu, 17 Oct 2024 01:04:44 +0800 Subject: [PATCH 2/6] feat(desktop): update mqtt message queue --- src/types/global.d.ts | 7 +++- src/utils/messageQueue.ts | 66 ------------------------------- src/utils/mqttMessageQueue.ts | 71 ++++++++++++++++++++++++++++++++++ src/views/viewer/TopicTree.vue | 43 ++++++++++++++++---- 4 files changed, 113 insertions(+), 74 deletions(-) delete mode 100644 src/utils/messageQueue.ts create mode 100644 src/utils/mqttMessageQueue.ts diff --git a/src/types/global.d.ts b/src/types/global.d.ts index 34a862998..5e12456b7 100644 --- a/src/types/global.d.ts +++ b/src/types/global.d.ts @@ -1,6 +1,6 @@ import Vue from 'vue' import { TranslateResult } from 'vue-i18n' -import { MqttClient } from 'mqtt' +import { IPublishPacket, MqttClient } from 'mqtt' declare global { type $TSFixed = any @@ -460,4 +460,9 @@ declare global { connectionInfo?: ConnectionModel children?: TopicTreeData[] } + interface QueuedMessage { + connectionId: string + packet: IPublishPacket + timestamp: number + } } diff --git a/src/utils/messageQueue.ts b/src/utils/messageQueue.ts deleted file mode 100644 index 6fb764f40..000000000 --- a/src/utils/messageQueue.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { Subject, timer, Observable } from 'rxjs' -import { buffer, filter, mergeMap, share } from 'rxjs/operators' -import { IPublishPacket } from 'mqtt-packet/types' -import { getNowDate } from '@/utils/time' -import { getMessageId } from '@/utils/idGenerator' - -export class MessageQueue { - private messageSubject = new Subject<{ packet: IPublishPacket; id: string }>() - private messageObservable: Observable<{ messages: MessageModel[]; connectionId: string }> - private processedMessages = new Set() - - constructor(private bufferTime: number = 1000, private maxBufferSize: number = 100) { - this.messageObservable = this.initMessageProcessing() - } - - queueMessage(packet: IPublishPacket, id: string) { - this.messageSubject.next({ packet, id }) - } - - public getMessageObservable(): Observable<{ messages: MessageModel[]; connectionId: string }> { - return this.messageObservable - } - - private initMessageProcessing(): Observable<{ messages: MessageModel[]; connectionId: string }> { - return this.messageSubject.pipe( - buffer(timer(0, this.bufferTime)), - filter((messages) => messages.length > 0), - mergeMap((messages) => { - const groupedMessages = new Map() - - messages.slice(0, this.maxBufferSize).forEach(({ packet, id }) => { - const messageKey = `${id}-${packet.topic}-${packet.payload}` - if (!this.processedMessages.has(messageKey)) { - if (!groupedMessages.has(id)) { - groupedMessages.set(id, []) - } - groupedMessages.get(id)!.push({ - id: getMessageId(), - topic: packet.topic, - payload: packet.payload.toString(), - qos: packet.qos, - retain: packet.retain, - out: false, - createAt: getNowDate(), - properties: packet.properties, - }) - this.processedMessages.add(messageKey) - } - }) - - if (this.processedMessages.size > 10000) { - const oldestEntries = Array.from(this.processedMessages).slice(0, 5000) - oldestEntries.forEach((entry) => this.processedMessages.delete(entry)) - } - - return Array.from(groupedMessages.entries()).map(([connectionId, messages]) => ({ - messages, - connectionId, - })) - }), - share(), - ) - } -} - -export const messageQueue = new MessageQueue() diff --git a/src/utils/mqttMessageQueue.ts b/src/utils/mqttMessageQueue.ts new file mode 100644 index 000000000..d392d597b --- /dev/null +++ b/src/utils/mqttMessageQueue.ts @@ -0,0 +1,71 @@ +import { Subject, Observable } from 'rxjs' +import { bufferTime, filter, mergeMap, share } from 'rxjs/operators' +import { IPublishPacket } from 'mqtt-packet/types' + +/** + * MessageQueue class for handling MQTT message buffering and processing. + * + * This class provides functionality to queue MQTT messages, buffer them for a specified time, + * and process them in batches. It uses RxJS observables for efficient message handling. + */ +export class MessageQueue { + private messageSubject = new Subject() + private messageObservable: Observable<{ messages: QueuedMessage[]; connectionId: string }> + + /** + * Creates a new MessageQueue instance. + * @param bufferTime The time in milliseconds to buffer messages before processing. Default is 500ms. + */ + constructor(private bufferTime: number = 500) { + this.messageObservable = this.initMessageProcessing() + } + + /** + * Queues a new MQTT message. + * @param packet The MQTT publish packet to queue. + * @param connectionId The ID of the connection associated with this message. + */ + queueMessage(packet: IPublishPacket, connectionId: string) { + this.messageSubject.next({ + connectionId, + packet, + timestamp: Date.now(), + }) + } + + /** + * Returns the observable for processed message batches. + * @returns An Observable that emits batches of messages grouped by connectionId. + */ + getMessageObservable(): Observable<{ messages: QueuedMessage[]; connectionId: string }> { + return this.messageObservable + } + + /** + * Initializes the message processing pipeline. + * @returns An Observable that emits batches of messages grouped by connectionId. + */ + private initMessageProcessing(): Observable<{ messages: QueuedMessage[]; connectionId: string }> { + return this.messageSubject.pipe( + bufferTime(this.bufferTime), + filter((messages) => messages.length > 0), + mergeMap((messages) => { + const groupedMessages = messages.reduce((acc, message) => { + if (!acc[message.connectionId]) { + acc[message.connectionId] = [] + } + acc[message.connectionId].push(message) + return acc + }, {} as Record) + + return Object.entries(groupedMessages).map(([connectionId, messages]) => ({ + messages, + connectionId, + })) + }), + share(), + ) + } +} + +export const messageQueue = new MessageQueue() diff --git a/src/views/viewer/TopicTree.vue b/src/views/viewer/TopicTree.vue index 345fd9422..d170c3c63 100644 --- a/src/views/viewer/TopicTree.vue +++ b/src/views/viewer/TopicTree.vue @@ -24,8 +24,11 @@ import { updateTopicTreeData } from '@/utils/topicTree' import { IPublishPacket } from 'mqtt-packet/types' import TreeNodeInfo from '@/components/widgets/TreeNodeInfo.vue' import { ignoreQoS0Message } from '@/utils/mqttUtils' -import { messageQueue } from '@/utils/messageQueue' +import { messageQueue } from '@/utils/mqttMessageQueue' import useServices from '@/database/useServices' +import { getMessageId } from '@/utils/idGenerator' +import { Subscription } from 'rxjs' +import { getNowDate } from '@/utils/time' @Component({ components: { @@ -38,6 +41,12 @@ export default class TopicTree extends Vue { private selectedNode: TopicTreeData | null = null + private subscription: Subscription | null = null + + private handleNodeClick(data: TopicTreeData) { + this.selectedNode = data + } + private handlePacketReceive(packet: IPublishPacket, connectionInfo: ConnectionModel) { this.data = updateTopicTreeData(this.data, { packet, @@ -51,17 +60,26 @@ export default class TopicTree extends Vue { messageQueue.queueMessage(packet, id) } - private handleNodeClick(data: TopicTreeData) { - this.selectedNode = data + private generateMessage(m: QueuedMessage): MessageModel { + return { + id: getMessageId(), + topic: m.packet.topic, + payload: m.packet.payload.toString(), + qos: m.packet.qos, + retain: m.packet.retain, + out: false, + createAt: getNowDate(), + properties: m.packet.properties, + } } - private created() { - globalEventBus.on('packetReceive', this.handlePacketReceive) + private async storeMessages() { const { messageService } = useServices() - messageQueue.getMessageObservable().subscribe(async ({ messages, connectionId }) => { + this.subscription = messageQueue.getMessageObservable().subscribe(async ({ messages, connectionId }) => { if (messages.length === 0) return try { - await messageService.importMsgsToConnection(messages, connectionId) + const processedMessages = messages.map((m) => this.generateMessage(m)) + await messageService.importMsgsToConnection(processedMessages, connectionId) this.$log.info(`Topic Tree: Processed ${messages.length} messages for connection ${connectionId}`) } catch (error) { this.$log.error(`Topic Tree: Error processing messages: ${(error as Error).toString()}`) @@ -69,8 +87,19 @@ export default class TopicTree extends Vue { }) } + private created() { + globalEventBus.on('packetReceive', this.handlePacketReceive) + if (!this.subscription) { + this.storeMessages() + } + } + private beforeDestroy() { globalEventBus.off('packetReceive', this.handlePacketReceive) + if (this.subscription) { + this.subscription.unsubscribe() + this.subscription = null + } } } From fe820fe30c5dd10fede4070cdd1b31be5f1b3a1d Mon Sep 17 00:00:00 2001 From: YuShifan <894402575bt@gmail.com> Date: Thu, 17 Oct 2024 01:18:39 +0800 Subject: [PATCH 3/6] feat(desktop): log info improvement --- src/components/widgets/TreeView.vue | 6 ++-- src/views/connections/ConnectionsDetail.vue | 34 ++++++++++++--------- src/views/viewer/TopicTree.vue | 7 +++-- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/components/widgets/TreeView.vue b/src/components/widgets/TreeView.vue index a5978627b..3c6b54161 100644 --- a/src/components/widgets/TreeView.vue +++ b/src/components/widgets/TreeView.vue @@ -36,9 +36,9 @@