Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(desktop): sync tree data to message table #1777

Merged
merged 6 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/components/widgets/TreeNodeInfo.vue
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ import { Component, Vue, Prop } from 'vue-property-decorator'
import { Getter } from 'vuex-class'
import { findSubTopics, findFullTopicPath, isPayloadEmpty } from '@/utils/topicTree'
import Prism from 'prismjs'
import { jsonStringify, jsonParse } from '@/utils/jsonUtils'

@Component
export default class TreeNodeInfo extends Vue {
@Prop() private node!: TopicTreeData
@Prop() private treeData!: TopicTreeData[]
@Prop() private node!: TopicTreeNode
@Prop() private treeData!: TopicTreeNode[]

@Getter('currentTheme') private currentTheme!: Theme

Expand All @@ -89,7 +90,7 @@ export default class TreeNodeInfo extends Vue {
get latestMessage(): string {
const message = this.node.latestMessage || ''
if (this.payloadFormat === 'json') {
return JSON.stringify(JSON.parse(message.toString()), null, 2)
return jsonStringify(jsonParse(message.toString()), null, 2)
}
return message.toString()
}
Expand Down Expand Up @@ -127,11 +128,11 @@ export default class TreeNodeInfo extends Vue {
return fullHost
}

private getSubTopics(node: TopicTreeData): string[] {
private getSubTopics(node: TopicTreeNode): string[] {
return findSubTopics(node)
}

private getFullTopicPath(node: TopicTreeData): string {
private getFullTopicPath(node: TopicTreeNode): string {
const fullPath = findFullTopicPath(this.treeData, node.id)
if (!fullPath) return node.label
return fullPath
Expand Down
20 changes: 10 additions & 10 deletions src/components/widgets/TreeView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
<template #default="{ node, data }">
<span class="custom-tree-node">
<span class="tree-node-info">
<span>{{ node.label }}</span>
<span v-if="data.connectionInfo && data.connectionInfo.name">
&nbsp;- [{{ data.connectionInfo.name }}]
<span>
<span v-if="data.connectionInfo && data.connectionInfo.name">{{ data.connectionInfo.name }}@</span
>{{ node.label }}
</span>
<el-tag v-if="!checkPayloadEmpty(data.latestMessage)" size="mini" class="value-tag ml-2">
{{ data.latestMessage }}
Expand All @@ -62,7 +62,7 @@ import { getAllIDs, isPayloadEmpty } from '@/utils/topicTree'

@Component
export default class TreeView extends Vue {
@Prop({ default: () => [] }) public data!: TopicTreeData[]
@Prop({ default: () => [] }) public data!: TopicTreeNode[]

@Getter('currentTheme') private currentTheme!: Theme

Expand Down Expand Up @@ -95,26 +95,26 @@ export default class TreeView extends Vue {
label: 'label',
}

private filterNode(value: string, data: TopicTreeData) {
private filterNode(value: string, data: TopicTreeNode) {
if (!value) return true
return data.label.toLowerCase().indexOf(value.toLowerCase()) !== -1
}

private handleNodeClick(data: TopicTreeData) {
private handleNodeClick(data: TopicTreeNode) {
this.$emit('node-click', data)
}

private handleNodeExpand(data: TopicTreeData) {
private handleNodeExpand(data: TopicTreeNode) {
if (!this.expandedKeys.includes(data.id)) {
this.expandedKeys.push(data.id)
}
}

private handleNodeCollapse(data: TopicTreeData) {
private handleNodeCollapse(data: TopicTreeNode) {
this.removeExpandedKeysRecursively(data)
}

private removeExpandedKeysRecursively(node: TopicTreeData) {
private removeExpandedKeysRecursively(node: TopicTreeNode) {
const index = this.expandedKeys.indexOf(node.id)
if (index > -1) {
this.expandedKeys.splice(index, 1)
Expand All @@ -129,7 +129,7 @@ export default class TreeView extends Vue {
}

private collapseAll() {
const collapseNode = (node: TopicTreeData) => {
const collapseNode = (node: TopicTreeNode) => {
if (node.children && node.children.length > 0) {
node.children.forEach(collapseNode)
}
Expand Down
11 changes: 8 additions & 3 deletions src/types/global.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Vue from 'vue'
import { TranslateResult } from 'vue-i18n'
import { MqttClient } from 'mqtt'
import { IPublishPacket, MqttClient } from 'mqtt'

declare global {
type $TSFixed = any
Expand Down Expand Up @@ -448,7 +448,7 @@ declare global {

type LogLevel = 'debug' | 'info' | 'warn' | 'error'

interface TopicTreeData {
interface TopicTreeNode {
id: string
label: string
qos?: QoS
Expand All @@ -458,6 +458,11 @@ declare global {
messageCount: number
subTopicCount: number
connectionInfo?: ConnectionModel
children?: TopicTreeData[]
children?: TopicTreeNode[]
}
interface QueuedMessage {
connectionId: string
packet: IPublishPacket
timestamp: number
}
}
71 changes: 71 additions & 0 deletions src/utils/mqttMessageQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { Subject, Observable } from 'rxjs'
import { bufferTime, filter, mergeMap, share } from 'rxjs/operators'
import { IPublishPacket } from 'mqtt-packet/types'

/**
* MessageQueue class for handling MQTT message buffering and processing.
*
* This class provides functionality to queue MQTT messages, buffer them for a specified time,
* and process them in batches. It uses RxJS observables for efficient message handling.
*/
export class MessageQueue {
private messageSubject = new Subject<QueuedMessage>()
private messageObservable: Observable<{ messages: QueuedMessage[]; connectionId: string }>

/**
* Creates a new MessageQueue instance.
* @param bufferTime The time in milliseconds to buffer messages before processing. Default is 500ms.
*/
constructor(private bufferTime: number = 500) {
this.messageObservable = this.initMessageProcessing()
}

/**
* Queues a new MQTT message.
* @param packet The MQTT publish packet to queue.
* @param connectionId The ID of the connection associated with this message.
*/
queueMessage(packet: IPublishPacket, connectionId: string) {
this.messageSubject.next({
connectionId,
packet,
timestamp: Date.now(),
})
}

/**
* Returns the observable for processed message batches.
* @returns An Observable that emits batches of messages grouped by connectionId.
*/
getMessageObservable(): Observable<{ messages: QueuedMessage[]; connectionId: string }> {
return this.messageObservable
}

/**
* Initializes the message processing pipeline.
* @returns An Observable that emits batches of messages grouped by connectionId.
*/
private initMessageProcessing(): Observable<{ messages: QueuedMessage[]; connectionId: string }> {
return this.messageSubject.pipe(
bufferTime(this.bufferTime),
filter((messages) => messages.length > 0),
mergeMap((messages) => {
const groupedMessages = messages.reduce((acc, message) => {
if (!acc[message.connectionId]) {
acc[message.connectionId] = []
}
acc[message.connectionId].push(message)
return acc
}, {} as Record<string, QueuedMessage[]>)

return Object.entries(groupedMessages).map(([connectionId, messages]) => ({
messages,
connectionId,
}))
}),
share(),
)
}
}

export const messageQueue = new MessageQueue()
25 changes: 12 additions & 13 deletions src/utils/topicTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import time from './time'
/**
* Updates the topic tree data structure based on received MQTT packets.
*
* @param {TopicTreeData[]} currentTree - The current state of the topic tree.
* @param {TopicTreeNode[]} currentTree - The current state of the topic tree.
* @param {Object} rawData - The raw data containing the MQTT packet and connection information.
* @param {IPublishPacket} rawData.packet - The MQTT publish packet.
* @param {ConnectionModel} rawData.connectionInfo - The connection information.
* @returns {TopicTreeData[]} The updated topic tree data structure.
* @returns {TopicTreeNode[]} The updated topic tree data structure.
*/

export function updateTopicTreeData(
currentTree: TopicTreeData[],
export function updateTopicTreeNode(
currentTree: TopicTreeNode[],
rawData: {
packet: IPublishPacket
connectionInfo: ConnectionModel
},
): TopicTreeData[] {
): TopicTreeNode[] {
const { packet, connectionInfo } = rawData
if (packet.cmd !== 'publish') {
return currentTree
Expand Down Expand Up @@ -52,7 +51,7 @@ export function updateTopicTreeData(
for (let i = 0; i < topicLevels.length; i++) {
const level = topicLevels[i]
const childIndex = currentNode.children?.findIndex((n) => n.label === level) ?? -1
let childNode: TopicTreeData
let childNode: TopicTreeNode

if (childIndex === -1) {
currentId = `${currentId}-${currentNode.children?.length ?? 0 + 1}`
Expand Down Expand Up @@ -100,7 +99,7 @@ export function updateTopicTreeData(
* @param node - The current node in the topic tree to update.
* @returns The total number of subtopics for the current node and its children.
*/
export function updateSubTopicCounts(node: TopicTreeData): number {
export function updateSubTopicCounts(node: TopicTreeNode): number {
if (!node.children || node.children.length === 0) {
node.subTopicCount = 0
return 1
Expand All @@ -121,7 +120,7 @@ export function updateSubTopicCounts(node: TopicTreeData): number {
* @param isRoot - A boolean indicating whether the current node is the root node.
* @returns An array of strings representing the subtopics.
*/
export function findSubTopics(node: TopicTreeData, isRoot: boolean = true): string[] {
export function findSubTopics(node: TopicTreeNode, isRoot: boolean = true): string[] {
let subTopics: string[] = []
if (!isRoot && node.label) {
subTopics.push(node.label)
Expand All @@ -141,8 +140,8 @@ export function findSubTopics(node: TopicTreeData, isRoot: boolean = true): stri
* @param targetId - The ID of the node to find the full path for.
* @returns The full topic path as a string, or null if the node is not found.
*/
export function findFullTopicPath(treeData: TopicTreeData[], targetId: string): string | null {
function findPath(node: TopicTreeData, currentPath: string[]): string[] | null {
export function findFullTopicPath(treeData: TopicTreeNode[], targetId: string): string | null {
function findPath(node: TopicTreeNode, currentPath: string[]): string[] | null {
if (node.id === targetId) {
return currentPath
}
Expand Down Expand Up @@ -170,10 +169,10 @@ export function findFullTopicPath(treeData: TopicTreeData[], targetId: string):
/**
* Retrieves all IDs from the given topic tree nodes and their children.
*
* @param nodes - An array of TopicTreeData representing the topic tree nodes.
* @param nodes - An array of TopicTreeNode representing the topic tree nodes.
* @returns An array of strings containing all IDs from the nodes and their children.
*/
export function getAllIDs(nodes: TopicTreeData[]): string[] {
export function getAllIDs(nodes: TopicTreeNode[]): string[] {
let ids: string[] = []
for (const node of nodes) {
ids.push(node.id)
Expand Down
34 changes: 20 additions & 14 deletions src/views/connections/ConnectionsDetail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ export default class ConnectionsDetail extends Vue {
clearInterval(this.sendTimeId)
this.sendTimeId = null
this.$message.success(this.$tc('connections.stopTimedMessage'))
this.$log.info(`Timed messages sending stopped for ${this.record.name}`)
this.$log.info(`Timed messages sending stopped for ${this.record.name}@${this.record.host}`)
}
}

Expand Down Expand Up @@ -977,7 +977,7 @@ export default class ConnectionsDetail extends Vue {
if (this.record.id) {
const { messageService } = useServices()
await messageService.cleanInConnection(this.record.id)
this.$log.info(`History connection messages were cleaned for ${this.record.name}`)
this.$log.info(`History connection messages were cleaned for ${this.record.name}@${this.record.host}`)
}
}

Expand Down Expand Up @@ -1085,7 +1085,7 @@ export default class ConnectionsDetail extends Vue {
})
this.setShowClientInfo(false)
this.$emit('reload', false, false, this.handleReSubTopics)
this.$log.info(`Successful connection for ${this.record.name}, MQTT.js onConnect trigger`)
this.$log.info(`Successful connection for ${this.record.name}@${this.record.host}, MQTT.js onConnect trigger`)
}

// Error callback
Expand All @@ -1096,7 +1096,9 @@ export default class ConnectionsDetail extends Vue {
}
this.forceCloseTheConnection()
this.notifyMsgWithCopilot(msgTitle)
this.$log.error(`Connection for ${this.record.name} failed, MQTT.js onError trigger, Error: ${error.stack}`)
this.$log.error(
`Connection for ${this.record.name}@${this.record.host} failed, MQTT.js onError trigger, Error: ${error.stack}`,
)
this.$emit('reload')
}

Expand All @@ -1119,7 +1121,7 @@ export default class ConnectionsDetail extends Vue {
this.forceCloseTheConnection()
} else {
this.$log.info(
`Retrying connection for ${this.record.name}, attempt: [${this.reTryConnectTimes}/${this.maxReconnectTimes}]`,
`Retrying connection for ${this.record.name}@${this.record.host}, attempt: [${this.reTryConnectTimes}/${this.maxReconnectTimes}]`,
)
this.connectLoading = true
this.$notify({
Expand All @@ -1135,7 +1137,7 @@ export default class ConnectionsDetail extends Vue {

// Close connection callback
private onClose() {
this.$log.info(`Connection for ${this.record.name} closed, MQTT.js onClose trigger`)
this.$log.info(`Connection for ${this.record.name}@${this.record.host} closed, MQTT.js onClose trigger`)
this.connectLoading = false
}

Expand Down Expand Up @@ -1221,7 +1223,9 @@ export default class ConnectionsDetail extends Vue {
if (this.client.reconnecting && this.client.connected === false) {
this.client.reconnecting = false
this.forceCloseTheConnection()
this.$log.warn(`MQTTX force stopped reconnecting for ${this.record.name} (Client ID: ${this.record.clientId})`)
this.$log.warn(
`MQTTX force stopped reconnecting for ${this.record.name}@${this.record.host} - Client ID: ${this.record.clientId}`,
)
}
}

Expand Down Expand Up @@ -1258,14 +1262,14 @@ export default class ConnectionsDetail extends Vue {
}
this.setScript({ currentScript })
this.$message.success(this.$tc('script.startScript'))
this.$log.info(`Script set successfully for ${this.record.name}`)
this.$log.info(`Script set successfully for ${this.record.name}@${this.record.host}`)
}

// Remove script
private removeScript() {
this.setScript({ currentScript: null })
this.$message.success(this.$tc('script.stopScirpt'))
this.$log.info(`Script removed successfully from ${this.record.name}`)
this.$log.info(`Script removed successfully from ${this.record.name}@${this.record.host}`)
}

/*
Expand Down Expand Up @@ -1386,9 +1390,11 @@ export default class ConnectionsDetail extends Vue {
const isFromActiveTopic = this.msgType !== 'publish' && this.activeTopic && isActiveTopicMessages
const isFromNotActiveTopic = this.msgType !== 'publish' && !this.activeTopic
if (isFromActiveTopic || isFromNotActiveTopic) {
let receivedLog = `Message arrived for ${this.record.name} with topic: "${topic}". Message ID: "${
message.id
}", payload: ${jsonStringify(message.payload)}. MQTT.js onMessageArrived trigger`
let receivedLog = `Message arrived for ${this.record.name}@${
this.record.host
} with topic: "${topic}". Message ID: "${message.id}", payload: ${jsonStringify(
message.payload,
)}. MQTT.js onMessageArrived trigger`
this.$log.info(receivedLog)
}
} else {
Expand Down Expand Up @@ -1550,7 +1556,7 @@ export default class ConnectionsDetail extends Vue {
private notifyTimedMessageSuccess() {
this.$message.success(`${this.$t('connections.startTimedMessage')}${this.sendFrequency}`)
this.$log.info(
`Timed message for ${this.record.name} started successfully with a frequency of ${this.sendFrequency} seconds.`,
`Timed message for ${this.record.name}@${this.record.host} started successfully with a frequency of ${this.sendFrequency} seconds.`,
)
}

Expand Down Expand Up @@ -1686,7 +1692,7 @@ export default class ConnectionsDetail extends Vue {
this.notifyMsgWithCopilot(errorMsg)
this.stopTimedSend()
this.$log.error(
`Failed to publish message for ${this.record.name}. Error: ${errorMsg}. Stack trace: ${error.stack}`,
`Failed to publish message for ${this.record.name}@${this.record.host}. Error: ${errorMsg}. Stack trace: ${error.stack}`,
)
}

Expand Down
Loading
Loading