Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist Topic Tree Data #1778

Merged
merged 8 commits into from
Oct 22, 2024
Merged
18 changes: 10 additions & 8 deletions src/components/widgets/TreeNodeInfo.vue
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</div>
</div>
<!-- Topic node without payload -->
<div v-else-if="checkPayloadEmpty(node.latestMessage)">
<div v-else-if="!node.message || (node.message && checkPayloadEmpty(node.message.payload))">
<div>{{ $t('connections.fullTopic') }}</div>
<el-tooltip
:effect="currentTheme !== 'light' ? 'light' : 'dark'"
Expand Down Expand Up @@ -57,10 +57,12 @@
<div ref="topicPath" class="node-info-item ellipsis">{{ fullTopicPath }}</div>
</el-tooltip>
<div>{{ $t('connections.receivedTime') }}</div>
<div class="node-info-item">{{ node.time }}</div>
<div v-if="node.message" class="node-info-item">{{ node.message.createAt }}</div>
<div class="flex justify-between">
<span>Payload <el-tag v-if="node.retain" type="info" size="mini">Retained</el-tag></span>
<span>QoS: {{ node.qos }}</span>
<span
>Payload <el-tag v-if="node.message && node.message.retain" type="info" size="mini">Retained</el-tag></span
>
<span v-if="node.message">QoS: {{ node.message.qos }}</span>
</div>
<pre
class="payload-container mt-2 mb-2"
Expand Down Expand Up @@ -88,16 +90,16 @@ export default class TreeNodeInfo extends Vue {
}

get latestMessage(): string {
const message = this.node.latestMessage || ''
const payload = this.node.message?.payload || ''
if (this.payloadFormat === 'json') {
return jsonStringify(jsonParse(message.toString()), null, 2)
return jsonStringify(jsonParse(payload.toString()), null, 2)
}
return message.toString()
return payload.toString()
}

get payloadFormat(): string {
try {
const message = this.node.latestMessage || ''
const message = this.node.message?.payload || ''
JSON.parse(message.toString())
return 'json'
} catch (e) {
Expand Down
4 changes: 2 additions & 2 deletions src/components/widgets/TreeView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
<span v-if="data.connectionInfo && data.connectionInfo.name">{{ data.connectionInfo.name }}@</span
>{{ node.label }}
</span>
<el-tag v-if="!checkPayloadEmpty(data.latestMessage)" size="mini" class="value-tag ml-2">
{{ data.latestMessage }}
<el-tag v-if="data.message && !checkPayloadEmpty(data.message.payload)" size="mini" class="value-tag ml-2">
{{ data.message.payload }}
</el-tag>
</span>
<span class="tree-node-meta">
Expand Down
32 changes: 32 additions & 0 deletions src/database/models/TopicNodeEntity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Entity, PrimaryGeneratedColumn, Column, Tree, TreeChildren, TreeParent, PrimaryColumn } from 'typeorm'

@Entity('TopicNodeEntity')
@Tree('closure-table')
export default class TopicNodeEntity {
@PrimaryColumn()
id!: string

@Column()
label!: string

@Column({ default: 0 })
messageCount!: number

@Column({ default: 0 })
subTopicCount!: number

@Column({ nullable: true })
lastMessageId?: string

@Column({ nullable: true })
connectionId?: string

@TreeChildren()
children?: TopicNodeEntity[]

@TreeParent()
parent?: TopicNodeEntity

@Column({ nullable: true })
parentId?: string
}
14 changes: 9 additions & 5 deletions src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,18 +451,22 @@ declare global {
interface TopicTreeNode {
id: string
label: string
qos?: QoS
retain?: boolean
time?: string
latestMessage?: string | Buffer | null
messageCount: number
subTopicCount: number
message?: MessageModel
connectionInfo?: ConnectionModel
parentId?: string
children?: TopicTreeNode[]
}

interface UpdateTopicNodeResult {
updatedTree: TopicTreeNode[]
updatedNode: TopicTreeNode | null
}

interface QueuedMessage {
connectionId: string
packet: IPublishPacket
updateNodes: TopicTreeNode[]
timestamp: number
}
}
48 changes: 48 additions & 0 deletions src/utils/messageQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Subject, Observable } from 'rxjs'
import { bufferTime, filter, mergeMap, map, share } from 'rxjs/operators'

/**
* A generic MessageQueue class for buffering and processing messages.
*
* @typeparam T The type of message to be queued.
*/
export class MessageQueue<T extends any> {
private messageSubject = new Subject<T>()
private messageObservable: Observable<T[]>

/**
* Creates a new MessageQueue instance.
* @param bufferTime The time in milliseconds to buffer messages before processing. Default is 500ms.
*/
constructor(private bufferTime: number = 500) {
this.messageObservable = this.initMessageProcessing()
}

/**
* Queues a new message.
* @param message The message to queue.
*/
queueMessage(message: T) {
this.messageSubject.next(message)
}

/**
* Returns the observable for processed message batches.
* @returns An Observable that emits batches of messages.
*/
getMessageObservable(): Observable<T[]> {
return this.messageObservable
}

/**
* Initializes the message processing pipeline.
* @returns An Observable that emits batches of messages.
*/
private initMessageProcessing(): Observable<T[]> {
return this.messageSubject.pipe(
bufferTime(this.bufferTime),
filter((messages) => messages.length > 0),
share(),
)
}
}
71 changes: 0 additions & 71 deletions src/utils/mqttMessageQueue.ts

This file was deleted.

Loading
Loading