Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(desktop): sync traffic packets to message table #1791

Merged
merged 4 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/components/ConnectionSelect.vue
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<template>
<el-select v-model="modelValue" class="connection-select" v-bind="$attrs" @change="handleChange" :style="{ width }">
<el-select v-model="modelValue" class="connection-select" v-bind="$attrs" :style="{ width }">
<el-option v-for="conn in connections" :key="conn.id" :label="`${conn.name}@${conn.host}`" :value="conn.id">
<span style="float: left">{{ conn.name }}@{{ conn.host }}</span>
<span style="float: right; color: #8492a6; font-size: 13px; margin-left: 24px">
Expand Down Expand Up @@ -34,10 +34,6 @@ export default class ConnectionSelect extends Vue {
this.$emit('change', newVal)
}

private handleChange(value: string) {
this.$emit('change', value)
}

private getConnectionStatus(connectionId: string) {
if (this.activeConnection[connectionId]) {
return this.activeConnection[connectionId].client.connected
Expand Down
6 changes: 6 additions & 0 deletions src/components/charts/AreaLine.vue
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ export default class AreaLine extends Vue {
type: 'cross',
label: {
backgroundColor: '#6a7985',
formatter: (params: any) => {
if (params.axisDimension === 'y') {
return this.formatter ? this.formatter(params.value) : params.value
}
return params.value
},
},
},
formatter: (params: any[]) => {
Expand Down
6 changes: 6 additions & 0 deletions src/components/widgets/TrafficStatistics.vue
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ export default class TrafficStatistics extends Vue {
this.chartData.seriesData[0].data = []
this.chartData.seriesData[1].data = []
}

public setDefaultChartData(metrics: MetricsModel[] = []) {
this.chartData.xData = metrics.map((m) => m.label)
this.chartData.seriesData[0].data = metrics.map((m) => m.received)
this.chartData.seriesData[1].data = metrics.map((m) => m.sent)
}
}
</script>

Expand Down
42 changes: 42 additions & 0 deletions src/database/services/MessageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,46 @@ export default class MessageService {
const res = await this.messageRepository.find({ where: { topic: fullTopic }, take: limit })
return res.map((m) => MessageService.entityToModel(m))
}

/**
* Retrieves messages by topic pattern for a specific connection.
* @param connectionId - The ID of the connection.
* @param topicPattern - The pattern to match topics against.
* @param options - Optional parameters for the query.
* @param options.startTime - The start time for the date range filter.
* @param options.endTime - The end time for the date range filter.
* @param options.limit - The maximum number of messages to retrieve (default: 1000).
* @param options.transform - A function to transform the retrieved messages.
* @returns A promise that resolves to an array of transformed messages or MessageModel[].
*/
public async getMessagesByTopicPattern<T>(
connectionId: string,
topicPattern: string,
{
startTime,
endTime,
limit = 1000,
transform,
}: {
startTime?: string
endTime?: string
limit?: number
transform?: (messages: MessageModel[]) => T[]
} = {},
): Promise<T[]> {
const query = this.messageRepository
.createQueryBuilder('msg')
.where('msg.connectionId = :connectionId', { connectionId })
.andWhere('msg.topic LIKE :topic', { topic: topicPattern })

if (startTime && endTime) {
query.andWhere('msg.createAt BETWEEN :startTime AND :endTime', { startTime, endTime })
}

const messageEntities = await query.orderBy('msg.createAt', 'ASC').take(limit).getMany()

const messages = messageEntities.map((entity) => MessageService.entityToModel(entity))

return transform ? transform(messages) : (messages as unknown as T[])
}
}
29 changes: 11 additions & 18 deletions src/lang/viewer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,31 +96,24 @@ export default {
hu: 'Forgalom figyelés',
},
brokerTrafficMonitorTooltip: {
zh: '选择一个连接来监控该 MQTT Broker 的实时流量数据',
en: 'Select a connection to monitor real-time traffic data for this MQTT Broker',
tr: 'Bu MQTT Broker için gerçek zamanlı trafik verilerini izlemek için bir bağlantı seçin',
ja: 'この MQTT Broker のリアルタイムトラフィックデータを監視する接続を選択してください',
hu: 'Válasszon egy kapcsolatot az MQTT Broker valós idejű forgalmi adatainak figyeléséhez',
},
accumulatedTotalTraffic: {
zh: '累计传输量',
en: 'Accumulated Total Traffic',
tr: 'Toplam Trafik',
ja: '累計トラフィック',
hu: 'Összesített Forgalom',
zh: '选择一个连接来监控该 MQTT Broker 的实时流量数据,展示最近 24 小时的流量统计',
en: 'Select a connection to monitor real-time traffic data for this MQTT Broker, showing traffic statistics for the last 24 hours',
tr: 'Bu MQTT Broker için son 24 saatlik trafik istatistiklerini gösteren gerçek zamanlı trafik verilerini izlemek için bir bağlantı seçin',
ja: 'この MQTT Broker のリアルタイムトラフィックデータを監視する接続を選択してください。過去24時間のトラフィック統計を表示します',
hu: 'Válasszon egy kapcsolatot az MQTT Broker valós idejű forgalmi adatainak figyeléséhez, az elmúlt 24 óra forgalmi statisztikáinak megjelenítésével',
},
accumulatedReceivedTraffic: {
zh: '累计接收流量',
en: 'Accumulated Received Traffic',
tr: 'Toplam Alınan Trafik',
en: 'Accumulated Received',
tr: 'Toplam Alınan',
ja: '累計受信トラフィック',
hu: 'Összesített Fogadott Forgalom',
hu: 'Összesített Fogadott',
},
accumulatedSentTraffic: {
zh: '累计发送流量',
en: 'Accumulated Sent Traffic',
tr: 'Toplam Gönderilen Trafik',
en: 'Accumulated Sent',
tr: 'Toplam Gönderilen',
ja: '累計送信トラフィック',
hu: 'Összesített Küldött Forgalom',
hu: 'Összesített Küldött',
},
}
4 changes: 2 additions & 2 deletions src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ declare global {

interface MetricsModel {
label: string
recevied: number
sent: number
received: number | null
sent: number | null
}

// System
Expand Down
15 changes: 7 additions & 8 deletions src/utils/SystemTopicUtils.ts → src/utils/systemTopic.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import time from '@/utils/time'

const METRICS_BYTES_PREFIX = '/metrics/bytes/'
const RECEIVED_TOPIC = `${METRICS_BYTES_PREFIX}received`
const SENT_TOPIC = `${METRICS_BYTES_PREFIX}sent`
Expand All @@ -26,23 +24,24 @@ export const extractData = (message: MessageModel, topic: string): string | null
/**
* Parse traffic metrics data
* @param message The MQTT message to parse
* @returns The parsed data as a MetricsModel, or null if the message is a system topic
* @param defaultMetrics The default metrics model to use as base
* @returns The parsed data as a MetricsModel, or null if the message is not a system topic
*/
export const getTrafficMetrics = (message: MessageModel): MetricsModel | null => {
export const getTrafficMetrics = (message: MessageModel, defaultMetrics: MetricsModel): MetricsModel | null => {
if (!isSystemTopic(message.topic)) {
return null
}

const metrics: MetricsModel = {
label: time.getNowDate(),
recevied: 0,
sent: 0,
label: message.createAt,
received: defaultMetrics.received,
sent: defaultMetrics.sent,
}

// Try to parse received bytes
const receivedBytes = extractData(message, RECEIVED_TOPIC)
if (receivedBytes) {
metrics.recevied = parseInt(receivedBytes, 10)
metrics.received = parseInt(receivedBytes, 10)
return metrics
}

Expand Down
5 changes: 5 additions & 0 deletions src/utils/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ interface TimeModel {
convertSecondsToMs: (seconds: number) => number
sqliteDateFormat: string
toFormat: (date: Date | string) => string
getDateBefore: (minutes: number) => string
}

export const sqliteDateFormat = 'YYYY-MM-DD HH:mm:ss:SSS'

export const getNowDate = (format: string = sqliteDateFormat): string => moment().format(format)
Expand All @@ -14,11 +16,14 @@ export const toFormat = (date: Date | string): string => moment(date).format(sql

export const convertSecondsToMs = (seconds: number): number => seconds * 1000

export const getDateBefore = (minutes: number): string => moment().subtract(minutes, 'minutes').format(sqliteDateFormat)

const time: TimeModel = {
getNowDate,
convertSecondsToMs,
sqliteDateFormat,
toFormat,
getDateBefore,
}

export default time
1 change: 0 additions & 1 deletion src/utils/topicTree.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Buffer } from 'buffer'
import { IPublishPacket } from 'mqtt-packet/types'
import time from '@/utils/time'
import { getMessageId } from '@/utils/idGenerator'

/**
* Updates the topic tree structure with new message data.
Expand Down
2 changes: 1 addition & 1 deletion src/views/connections/ConnectionsDetail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ export default class ConnectionsDetail extends Vue {
}
}

// Recevied message
// received message
private onMessageArrived(client: MqttClient, id: string) {
const unsubscribe$ = new Subject<void>()

Expand Down
Loading
Loading