From 72f96608f6b13c3b7060b0e39dfe77a5aa37a8ff Mon Sep 17 00:00:00 2001 From: November-12 <845255519@qq.com> Date: Sat, 4 Nov 2023 00:04:24 +0800 Subject: [PATCH] add --- conf/app.yml | 4 ++++ modules/dataService/mqtt/mqtt.go | 9 ++------- services/tskv_service.go | 5 ++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/conf/app.yml b/conf/app.yml index 641b7c5d3..c8e5e2321 100644 --- a/conf/app.yml +++ b/conf/app.yml @@ -1,6 +1,10 @@ app: runmode: prod httpport: 127.0.0.1:9999 + channel_buffer_size: 10000 + write_workers: 2 + batch_wait_time: 1 + batch_size: 1000 log: # 0-控制台输出 1-文件输出 2-文件和控制台输出 diff --git a/modules/dataService/mqtt/mqtt.go b/modules/dataService/mqtt/mqtt.go index bc418c099..c1caa98da 100644 --- a/modules/dataService/mqtt/mqtt.go +++ b/modules/dataService/mqtt/mqtt.go @@ -10,7 +10,6 @@ import ( "time" "github.com/beego/beego/v2/core/logs" - "github.com/beego/beego/v2/server/web" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/panjf2000/ants" "github.com/spf13/viper" @@ -34,15 +33,11 @@ func ListenNew(broker, username, password string) error { //defer pOther.Release() qos := byte(viper.GetUint("mqtt.qos")) - - channelBufferSize, err := web.AppConfig.Int("channel_buffer_size") - if err != nil { - return err - } + channelBufferSize := viper.GetInt("app.channel_buffer_size") messages := make(chan map[string]interface{}, channelBufferSize) + writeWorkers := viper.GetInt("app.write_workers") - writeWorkers, _ := web.AppConfig.Int("write_workers") for i := 0; i < writeWorkers; i++ { go s.BatchWrite(messages) } diff --git a/services/tskv_service.go b/services/tskv_service.go index da9cb0039..3cf5872ca 100644 --- a/services/tskv_service.go +++ b/services/tskv_service.go @@ -17,7 +17,6 @@ import ( "time" "github.com/beego/beego/v2/core/logs" - "github.com/beego/beego/v2/server/web" "github.com/bitly/go-simplejson" "github.com/mintance/go-uniqid" "github.com/spf13/viper" @@ -460,11 +459,11 @@ func (*TSKVService) BatchWrite(messages <-chan map[string]interface{}) error { logs.Info("批量写入协程启动") var tskvList []models.TSKV var tskvLatestList []models.TSKVLatest - batchWaitTime, _ := web.AppConfig.Int("batch_wait_time") + batchWaitTime := viper.GetInt("app.batch_wait_time") logs.Info("批量写入等待时间:", batchWaitTime) // 转time.Duration batchWaitTimeDuration := time.Duration(batchWaitTime) * time.Second - batchSize, _ := web.AppConfig.Int("batch_size") + batchSize := viper.GetInt("app.batch_size") logs.Warn("批量写入大小:", batchSize) for {