diff --git a/.github/workflows/docker-image-v0.5.4.1.yml b/.github/workflows/docker-image-v0.5.4.1.yml new file mode 100644 index 000000000..c2f213382 --- /dev/null +++ b/.github/workflows/docker-image-v0.5.4.1.yml @@ -0,0 +1,27 @@ +name: docker-image-v0.5.4.1 + +on: + workflow_dispatch: + +jobs: + + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Login to DockerHub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Build and push + id: docker_build + uses: docker/build-push-action@v2 + with: + context: . + file: ./Dockerfile + push: true + tags: thingspanel/thingspanel-go:v0.5.4.1 \ No newline at end of file diff --git a/TP.sql b/TP.sql index 64c61053e..d51744e65 100644 --- a/TP.sql +++ b/TP.sql @@ -1993,4 +1993,6 @@ CREATE TABLE public.shared_visualization ( ); ALTER TABLE public.tp_dashboard ADD share_id varchar(36) NULL; -COMMENT ON COLUMN public.tp_dashboard.share_id IS '分享id'; \ No newline at end of file +COMMENT ON COLUMN public.tp_dashboard.share_id IS '分享id'; + +-- 0.5.4.1 \ No newline at end of file diff --git a/conf/app.yml b/conf/app.yml index a5065f70f..a49eada30 100644 --- a/conf/app.yml +++ b/conf/app.yml @@ -1,21 +1,13 @@ +# 如需在系统变量中设置配置项,可使用GOTP_开头的变量名,如:GOTP_DB_PSQL_DBTYPE为db.psql.dbType + app: runmode: prod - httpport: 127.0.0.1:9999 + httpport: :9999 channel_buffer_size: 10000 write_workers: 2 batch_wait_time: 1 batch_size: 1000 -log: - # 0-控制台输出 1-文件输出 2-文件和控制台输出 - adapter_type: 0 - # 文件最多保存多少天 - maxdays: 7 - # 日志级别 (0-紧急 1-报警 2-严重错误 3-错误 4-警告 5-注意 6-信息 7-调试) - level: 7 - # 每个文件保存的最大行数 - maxlines: 10000 - db: psql: dbType: timescaledb @@ -24,14 +16,13 @@ db: psqldb: ThingsPanel psqluser: postgres psqlpass: ThingsPanel2023 - psqlMaxConns: 10 - psqlMaxOpen: 50 - - # SQL日志级别 (1-静音 2-错误 3-警告 4-信息). 注意: sql日志只在level大于等于5级别才会输出。 + psqlMaxConns: 5 # 空闲连接池中的最大连接数,建议为psqlMaxOpen的百分之5-20之间 + psqlMaxOpen: 50 # 最大打开连接数,不能大于timescaledb默认为100,考虑到其他服务也会使用数据库,建议这里设置为50 + # SQL日志级别 (1-静音 2-错误 3-警告 4-信息). 注意: sql日志只在log.level大于等于5级别才会输出。 sqlloglevel: 2 - # 慢SQL阈值(毫秒)。慢SQL会在sqlloglevel大于等于3时输出。 - slow_threshold: 200 + slow_threshold: 1000 + redis: # redis 连接字符串 conn: 127.0.0.1:6379 @@ -40,10 +31,19 @@ db: # redis 密码 password: "redis2022" +log: + # 0-控制台输出 1-文件输出 2-文件和控制台输出 + adapter_type: 0 + # 文件最多保存多少天 + maxdays: 7 + # 日志级别 (0-紧急 1-报警 2-严重错误 3-错误 4-警告 5-注意 6-信息 7-调试) + level: 2 + # 每个文件保存的最大行数 + maxlines: 10000 - -# GOTP_MQTT_SERVER mqtt服务:gmqtt、vernemq +# mqtt服务:gmqtt、vernemq mqtt_server: gmqtt + mqtt: broker: 127.0.0.1:1883 user: root diff --git a/initialize/conf/init.go b/initialize/conf/init.go index f71059e1e..f5b1f7ec4 100644 --- a/initialize/conf/init.go +++ b/initialize/conf/init.go @@ -10,7 +10,7 @@ import ( func init() { log.Println("系统配置文件初始化...") - viper.SetEnvPrefix("TP") + viper.SetEnvPrefix("GOTP") viper.AutomaticEnv() viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) viper.SetConfigName("./conf/app") diff --git "a/services/--services\345\261\202\345\274\200\345\217\221\345\270\256\345\212\251.md" "b/services/--services\345\261\202\345\274\200\345\217\221\345\270\256\345\212\251.md" index b718aae72..ad0fce988 100644 --- "a/services/--services\345\261\202\345\274\200\345\217\221\345\270\256\345\212\251.md" +++ "b/services/--services\345\261\202\345\274\200\345\217\221\345\270\256\345\212\251.md" @@ -2,6 +2,12 @@ 开发中尽量考虑代码的复用,以便于后期维护。 +## 注意 +如果生产环境在外网,就需要设置不能让用户自定义AccessToken,否则有设备数据泄露的风险 + +## vernemq相关 +- vernemq需要所有MQTT协议直连设备的MQTTclientID唯一,生产使用vernemq需要对程序稍作调整 + ## 时序数据库相关 - timescaleDB数据库也是一个时序数据库,它在查询方面有很大的优势,但在高并发写入方面有很大的限制,还有水平扩展的问题。 @@ -10,7 +16,7 @@ ## 在线离线相关 -- 目前在线离线状态是通过订阅device/status获取,将其存储在ts_kv_latest的SYS_ONLINE(str_v 1-在线 0-离线) +- 目前在线离线状态是通过订阅device/status获取,将其存储在ts_kv_latest的SYS_ONLINE(str_v 1-在线 0-离线),并将状态缓存到redis(key:"status"+diviceID) - 当数据类型不是timescaledb的时候,SYS_ONLINE仍然存储在timescaledb的ts_kv_latest的SYS_ONLINE字段 - 有时候device/status会因为各种原因(大多时候是因为broker没将状态的改变获取到),可能会漏掉状态上报 diff --git a/services/tskv_service.go b/services/tskv_service.go index c816e7781..5b5bf7b3d 100644 --- a/services/tskv_service.go +++ b/services/tskv_service.go @@ -152,42 +152,45 @@ func (*TSKVService) MsgProcOther(body []byte, topic string) { if values, ok := payload.Values.(map[string]interface{}); ok { var device models.Device // 首先从redis中获设备id - device.ID = redis.GetStr(payload.AccessToken) + // device.ID = redis.GetStr(payload.AccessToken) + // 为了保证数据准确性,在线离线相关直接从数据库中获取设备id + result := psql.Mydb.Where("token = ?", payload.AccessToken).First(&device) + if result.Error != nil { + logs.Error(result.Error.Error()) + return + } if device.ID == "" { - // 从数据库中获取设备id - result := psql.Mydb.Where("token = ?", payload.AccessToken).First(&device) - if result.Error != nil { - logs.Error(result.Error.Error()) - return - } - if device.ID == "" { - return - } else { - // 存储24小时 - redis.SetStr(payload.AccessToken, device.ID, 24*time.Hour) - } + return + } else { + // 存储24小时 + redis.SetStr(payload.AccessToken, device.ID, 24*time.Hour) } - - //DeviceOnlineState[device.ID] = values["status"] - // 如果mqtt_server为vernemq,则不需要更新ts_kv_latest表 - if viper.GetString("mqtt_server") != "-" { - d := models.TSKVLatest{ - EntityType: "DEVICE", - EntityID: device.ID, - Key: "SYS_ONLINE", - TS: time.Now().UnixMicro(), - StrV: fmt.Sprint(values["status"]), - TenantID: device.TenantId, - } - result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", device.ID).Update("str_v", d.StrV) - if result.Error != nil { - logs.Error(result.Error.Error()) - } else { - if result.RowsAffected == int64(0) { - rtsl := psql.Mydb.Create(&d) - if rtsl.Error != nil { - logs.Error(rtsl.Error) - } + status := fmt.Sprint(values["status"]) + // 存入redis + if status == "0" { + // 延时一秒,等最后一个消息处理完,防止在线状态修复程序在下线后修改状态 + time.Sleep(1 * time.Second) + } + err := redis.SetStr("status"+device.ID, status, 72*time.Hour) + if err != nil { + logs.Error(err.Error()) + } + d := models.TSKVLatest{ + EntityType: "DEVICE", + EntityID: device.ID, + Key: "SYS_ONLINE", + TS: time.Now().UnixMicro(), + StrV: fmt.Sprint(values["status"]), + TenantID: device.TenantId, + } + result = psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", device.ID).Update("str_v", d.StrV) + if result.Error != nil { + logs.Error(result.Error.Error()) + } else { + if result.RowsAffected == int64(0) { + rtsl := psql.Mydb.Create(&d) + if rtsl.Error != nil { + logs.Error(rtsl.Error) } } } @@ -202,25 +205,55 @@ func (*TSKVService) MsgProcOther(body []byte, topic string) { } // 判断设备是否在线,不在线更新ts_kv_latest表为在线 -func checkDeviceOnline(deviceId string) { - // 如果dbType为timescaledb,则不更新ts_kv_latest表 - dbType := viper.GetString("db.psql.dbType") - //if dbType == "timescaledb" { - if dbType != "" { //不管哪个数据库,在线离线状态都存储在pg的ts_kv_latest表中 - var count int64 - // 判断5秒外设备是否在线 - var currentData = time.Now().UnixMicro() - 5000000 - result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE' and str_v = '0' and ts < ?", deviceId, currentData).Count(&count) +func checkDeviceOnline(deviceId string, tenantId string) { + //不管哪个数据库,在线离线状态都存储在pg的ts_kv_latest表中 + // 从redis中获取设备状态 + status := redis.GetStr("status" + deviceId) + if status == "1" { + return + } else { + if status == "" { + // 从数据库中获取设备状态 + var count int64 + result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE' and str_v = '1'", deviceId).Count(&count) + if result.Error != nil { + logs.Error(result.Error.Error()) + } else { + if count == int64(1) { + return + } + } + } + // 更新redis + err := redis.SetStr("status"+deviceId, "1", 72*time.Hour) + if err != nil { + logs.Error(err.Error()) + } + // 更新ts_kv_latest表 + result := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", deviceId).Update("str_v", "1") if result.Error != nil { logs.Error(result.Error.Error()) } else { - if count > int64(0) { - result = psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_id = ? and key = 'SYS_ONLINE'", deviceId).Update("str_v", "1") - if result.Error != nil { - logs.Error(result.Error.Error()) + // 如果不存在,就插入一条 + if result.RowsAffected == int64(0) { + d := models.TSKVLatest{ + EntityType: "DEVICE", + EntityID: deviceId, + Key: "SYS_ONLINE", + TS: time.Now().UnixMicro(), + StrV: "1", + TenantID: tenantId, + } + rtsl := psql.Mydb.Create(&d) + if rtsl.Error != nil { + logs.Error(rtsl.Error) } } } + + // 设备上下线自动化检查 + var ConditionsService ConditionsService + go ConditionsService.OnlineAndOfflineCheck(deviceId, "1") } } @@ -246,7 +279,7 @@ func (*TSKVService) GatewayMsgProc(body []byte, topic string, messages chan map[ return false } // 在线离线检查修复 - checkDeviceOnline(device.ID) + go checkDeviceOnline(device.ID, device.TenantId) logs.Info("设备信息:", device) // 通过脚本执行器 req, err := scriptDeal(device.ScriptId, payload.Values, topic) @@ -326,7 +359,7 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte, } if device.DeviceType == "1" { // 在线离线检查修复 - checkDeviceOnline(device.ID) + checkDeviceOnline(device.ID, device.TenantId) } // 通过脚本执行器 req, err_a := scriptDeal(device.ScriptId, payload.Values, topic) @@ -362,9 +395,9 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte, topic := viper.GetString("mqtt.topicToSubscribe") + "/" + device.ID sendMqtt.SendMQTT(body, topic, 0) } - // 非系统数据库不需要入库 + // timescaledb数据库需要入库 dbType := viper.GetString("db.psql.dbType") - if dbType != "cassandra" { + if dbType == "timescaledb" { // 入库 //存入系统时间 ts := time.Now().UnixMicro() @@ -422,29 +455,6 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte, messages <- map[string]interface{}{ "tskv": d, } - // 更新当前值表 - // l := models.TSKVLatest{} - // utils.StructAssign(&l, &d) - // var latestCount int64 - // psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_type = ? and entity_id = ? and key = ? and tenant_id = ?", l.EntityType, l.EntityID, l.Key, l.TenantID).Count(&latestCount) - // if latestCount <= 0 { - // rtsl := psql.Mydb.Create(&l) - // if rtsl.Error != nil { - // log.Println(rtsl.Error) - // } - // } else { - // rtsl := psql.Mydb.Model(&models.TSKVLatest{}).Where("entity_type = ? and entity_id = ? and key = ? and tenant_id = ?", l.EntityType, l.EntityID, - // l.Key, l.TenantID).Updates(map[string]interface{}{"entity_type": l.EntityType, "entity_id": l.EntityID, "key": l.Key, "ts": l.TS, "bool_v": l.BoolV, "long_v": l.LongV, "str_v": l.StrV, "dbl_v": l.DblV}) - // if rtsl.Error != nil { - // log.Println(rtsl.Error) - // } - // } - - // rts := psql.Mydb.Create(&d) - // if rts.Error != nil { - // log.Println(rts.Error) - // return false - // } } }