Skip to content
This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
November-12 committed Nov 6, 2023
2 parents 3e4f1d0 + 5781aee commit f514455
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 96 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/docker-image-v0.5.4.1.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: docker-image-v0.5.4.1

on:
workflow_dispatch:

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1

- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
context: .
file: ./Dockerfile
push: true
tags: thingspanel/thingspanel-go:v0.5.4.1
4 changes: 3 additions & 1 deletion TP.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1993,4 +1993,6 @@ CREATE TABLE public.shared_visualization (
);

ALTER TABLE public.tp_dashboard ADD share_id varchar(36) NULL;
COMMENT ON COLUMN public.tp_dashboard.share_id IS '分享id';
COMMENT ON COLUMN public.tp_dashboard.share_id IS '分享id';

-- 0.5.4.1
38 changes: 19 additions & 19 deletions conf/app.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
# 如需在系统变量中设置配置项,可使用GOTP_开头的变量名,如:GOTP_DB_PSQL_DBTYPE为db.psql.dbType

app:
runmode: prod
httpport: 127.0.0.1:9999
httpport: :9999
channel_buffer_size: 10000
write_workers: 2
batch_wait_time: 1
batch_size: 1000

log:
# 0-控制台输出 1-文件输出 2-文件和控制台输出
adapter_type: 0
# 文件最多保存多少天
maxdays: 7
# 日志级别 (0-紧急 1-报警 2-严重错误 3-错误 4-警告 5-注意 6-信息 7-调试)
level: 7
# 每个文件保存的最大行数
maxlines: 10000

db:
psql:
dbType: timescaledb
Expand All @@ -24,14 +16,13 @@ db:
psqldb: ThingsPanel
psqluser: postgres
psqlpass: ThingsPanel2023
psqlMaxConns: 10
psqlMaxOpen: 50

# SQL日志级别 (1-静音 2-错误 3-警告 4-信息). 注意: sql日志只在level大于等于5级别才会输出。
psqlMaxConns: 5 # 空闲连接池中的最大连接数,建议为psqlMaxOpen的百分之5-20之间
psqlMaxOpen: 50 # 最大打开连接数,不能大于timescaledb默认为100,考虑到其他服务也会使用数据库,建议这里设置为50
# SQL日志级别 (1-静音 2-错误 3-警告 4-信息). 注意: sql日志只在log.level大于等于5级别才会输出。
sqlloglevel: 2

# 慢SQL阈值(毫秒)。慢SQL会在sqlloglevel大于等于3时输出。
slow_threshold: 200
slow_threshold: 1000

redis:
# redis 连接字符串
conn: 127.0.0.1:6379
Expand All @@ -40,10 +31,19 @@ db:
# redis 密码
password: "redis2022"

log:
# 0-控制台输出 1-文件输出 2-文件和控制台输出
adapter_type: 0
# 文件最多保存多少天
maxdays: 7
# 日志级别 (0-紧急 1-报警 2-严重错误 3-错误 4-警告 5-注意 6-信息 7-调试)
level: 2
# 每个文件保存的最大行数
maxlines: 10000


# GOTP_MQTT_SERVER mqtt服务:gmqtt、vernemq
# mqtt服务:gmqtt、vernemq
mqtt_server: gmqtt

mqtt:
broker: 127.0.0.1:1883
user: root
Expand Down
2 changes: 1 addition & 1 deletion initialize/conf/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func init() {
log.Println("系统配置文件初始化...")
viper.SetEnvPrefix("TP")
viper.SetEnvPrefix("GOTP")
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.SetConfigName("./conf/app")
Expand Down
8 changes: 7 additions & 1 deletion services/--services层开发帮助.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

开发中尽量考虑代码的复用,以便于后期维护。

## 注意
如果生产环境在外网,就需要设置不能让用户自定义AccessToken,否则有设备数据泄露的风险

## vernemq相关
- vernemq需要所有MQTT协议直连设备的MQTTclientID唯一,生产使用vernemq需要对程序稍作调整

## 时序数据库相关

- timescaleDB数据库也是一个时序数据库,它在查询方面有很大的优势,但在高并发写入方面有很大的限制,还有水平扩展的问题。
Expand All @@ -10,7 +16,7 @@

## 在线离线相关

- 目前在线离线状态是通过订阅device/status获取,将其存储在ts_kv_latest的SYS_ONLINE(str_v 1-在线 0-离线)
- 目前在线离线状态是通过订阅device/status获取,将其存储在ts_kv_latest的SYS_ONLINE(str_v 1-在线 0-离线),并将状态缓存到redis(key:"status"+diviceID)
- 当数据类型不是timescaledb的时候,SYS_ONLINE仍然存储在timescaledb的ts_kv_latest的SYS_ONLINE字段

- 有时候device/status会因为各种原因(大多时候是因为broker没将状态的改变获取到),可能会漏掉状态上报
Expand Down
158 changes: 84 additions & 74 deletions services/tskv_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,42 +152,45 @@ func (*TSKVService) MsgProcOther(body []byte, topic string) {
if values, ok := payload.Values.(map[string]interface{}); ok {
var device models.Device
// 首先从redis中获设备id
device.ID = redis.GetStr(payload.AccessToken)
// device.ID = redis.GetStr(payload.AccessToken)
// 为了保证数据准确性,在线离线相关直接从数据库中获取设备id
result := psql.Mydb.Where("token = ?", payload.AccessToken).First(&device)
if result.Error != nil {
logs.Error(result.Error.Error())
return
}
if device.ID == "" {
// 从数据库中获取设备id
result := psql.Mydb.Where("token = ?", payload.AccessToken).First(&device)
if result.Error != nil {
logs.Error(result.Error.Error())
return
}
if device.ID == "" {
return
} else {
// 存储24小时
redis.SetStr(payload.AccessToken, device.ID, 24*time.Hour)
}
return
} else {
// 存储24小时
redis.SetStr(payload.AccessToken, device.ID, 24*time.Hour)
}

//DeviceOnlineState[device.ID] = values["status"]
// 如果mqtt_server为vernemq,则不需要更新ts_kv_latest表
if viper.GetString("mqtt_server") != "-" {
d := models.TSKVLatest{
EntityType: "DEVICE",
EntityID: device.ID,
Key: "SYS_ONLINE",
TS: time.Now().UnixMicro(),
StrV: fmt.Sprint(values["status"]),
TenantID: device.TenantId,
}
result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", device.ID).Update("str_v", d.StrV)
if result.Error != nil {
logs.Error(result.Error.Error())
} else {
if result.RowsAffected == int64(0) {
rtsl := psql.Mydb.Create(&d)
if rtsl.Error != nil {
logs.Error(rtsl.Error)
}
status := fmt.Sprint(values["status"])
// 存入redis
if status == "0" {
// 延时一秒,等最后一个消息处理完,防止在线状态修复程序在下线后修改状态
time.Sleep(1 * time.Second)
}
err := redis.SetStr("status"+device.ID, status, 72*time.Hour)
if err != nil {
logs.Error(err.Error())
}
d := models.TSKVLatest{
EntityType: "DEVICE",
EntityID: device.ID,
Key: "SYS_ONLINE",
TS: time.Now().UnixMicro(),
StrV: fmt.Sprint(values["status"]),
TenantID: device.TenantId,
}
result = psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", device.ID).Update("str_v", d.StrV)
if result.Error != nil {
logs.Error(result.Error.Error())
} else {
if result.RowsAffected == int64(0) {
rtsl := psql.Mydb.Create(&d)
if rtsl.Error != nil {
logs.Error(rtsl.Error)
}
}
}
Expand All @@ -202,25 +205,55 @@ func (*TSKVService) MsgProcOther(body []byte, topic string) {
}

// 判断设备是否在线,不在线更新ts_kv_latest表为在线
func checkDeviceOnline(deviceId string) {
// 如果dbType为timescaledb,则不更新ts_kv_latest表
dbType := viper.GetString("db.psql.dbType")
//if dbType == "timescaledb" {
if dbType != "" { //不管哪个数据库,在线离线状态都存储在pg的ts_kv_latest表中
var count int64
// 判断5秒外设备是否在线
var currentData = time.Now().UnixMicro() - 5000000
result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE' and str_v = '0' and ts < ?", deviceId, currentData).Count(&count)
func checkDeviceOnline(deviceId string, tenantId string) {
//不管哪个数据库,在线离线状态都存储在pg的ts_kv_latest表中
// 从redis中获取设备状态
status := redis.GetStr("status" + deviceId)
if status == "1" {
return
} else {
if status == "" {
// 从数据库中获取设备状态
var count int64
result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE' and str_v = '1'", deviceId).Count(&count)
if result.Error != nil {
logs.Error(result.Error.Error())
} else {
if count == int64(1) {
return
}
}
}
// 更新redis
err := redis.SetStr("status"+deviceId, "1", 72*time.Hour)
if err != nil {
logs.Error(err.Error())
}
// 更新ts_kv_latest表
result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", deviceId).Update("str_v", "1")
if result.Error != nil {
logs.Error(result.Error.Error())
} else {
if count > int64(0) {
result = psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", deviceId).Update("str_v", "1")
if result.Error != nil {
logs.Error(result.Error.Error())
// 如果不存在,就插入一条
if result.RowsAffected == int64(0) {
d := models.TSKVLatest{
EntityType: "DEVICE",
EntityID: deviceId,
Key: "SYS_ONLINE",
TS: time.Now().UnixMicro(),
StrV: "1",
TenantID: tenantId,
}
rtsl := psql.Mydb.Create(&d)
if rtsl.Error != nil {
logs.Error(rtsl.Error)
}
}
}

// 设备上下线自动化检查
var ConditionsService ConditionsService
go ConditionsService.OnlineAndOfflineCheck(deviceId, "1")
}
}

Expand All @@ -246,7 +279,7 @@ func (*TSKVService) GatewayMsgProc(body []byte, topic string, messages chan map[
return false
}
// 在线离线检查修复
checkDeviceOnline(device.ID)
go checkDeviceOnline(device.ID, device.TenantId)
logs.Info("设备信息:", device)
// 通过脚本执行器
req, err := scriptDeal(device.ScriptId, payload.Values, topic)
Expand Down Expand Up @@ -326,7 +359,7 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte,
}
if device.DeviceType == "1" {
// 在线离线检查修复
checkDeviceOnline(device.ID)
checkDeviceOnline(device.ID, device.TenantId)
}
// 通过脚本执行器
req, err_a := scriptDeal(device.ScriptId, payload.Values, topic)
Expand Down Expand Up @@ -362,9 +395,9 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte,
topic := viper.GetString("mqtt.topicToSubscribe") + "/" + device.ID
sendMqtt.SendMQTT(body, topic, 0)
}
// 非系统数据库不需要入库
// timescaledb数据库需要入库
dbType := viper.GetString("db.psql.dbType")
if dbType != "cassandra" {
if dbType == "timescaledb" {
// 入库
//存入系统时间
ts := time.Now().UnixMicro()
Expand Down Expand Up @@ -422,29 +455,6 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte,
messages <- map[string]interface{}{
"tskv": d,
}
// 更新当前值表
// l := models.TSKVLatest{}
// utils.StructAssign(&l, &d)
// var latestCount int64
// psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_type = ? and entity_id = ? and key = ? and tenant_id = ?", l.EntityType, l.EntityID, l.Key, l.TenantID).Count(&latestCount)
// if latestCount <= 0 {
// rtsl := psql.Mydb.Create(&l)
// if rtsl.Error != nil {
// log.Println(rtsl.Error)
// }
// } else {
// rtsl := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_type = ? and entity_id = ? and key = ? and tenant_id = ?", l.EntityType, l.EntityID,
// l.Key, l.TenantID).Updates(map[string]interface{}{"entity_type": l.EntityType, "entity_id": l.EntityID, "key": l.Key, "ts": l.TS, "bool_v": l.BoolV, "long_v": l.LongV, "str_v": l.StrV, "dbl_v": l.DblV})
// if rtsl.Error != nil {
// log.Println(rtsl.Error)
// }
// }

// rts := psql.Mydb.Create(&d)
// if rts.Error != nil {
// log.Println(rts.Error)
// return false
// }
}
}

Expand Down

0 comments on commit f514455

Please sign in to comment.