Skip to content
This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Commit

Permalink
Merge pull request ThingsPanel#252 from November-12/main
Browse files Browse the repository at this point in the history
fix config
  • Loading branch information
November-12 authored Nov 3, 2023
2 parents bf96348 + 61220e7 commit caff277
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 10 deletions.
4 changes: 4 additions & 0 deletions conf/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
app:
runmode: prod
httpport: 127.0.0.1:9999
channel_buffer_size: 10000
write_workers: 2
batch_wait_time: 1
batch_size: 1000

db:
psql:
Expand Down
9 changes: 2 additions & 7 deletions modules/dataService/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/beego/beego/v2/core/logs"
"github.com/beego/beego/v2/server/web"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/panjf2000/ants"
"github.com/spf13/viper"
Expand All @@ -34,15 +33,11 @@ func ListenNew(broker, username, password string) error {
//defer pOther.Release()

qos := byte(viper.GetUint("mqtt.qos"))

channelBufferSize, err := web.AppConfig.Int("channel_buffer_size")
if err != nil {
return err
}
channelBufferSize := viper.GetInt("app.channel_buffer_size")

messages := make(chan map[string]interface{}, channelBufferSize)
writeWorkers := viper.GetInt("app.write_workers")

writeWorkers, _ := web.AppConfig.Int("write_workers")
for i := 0; i < writeWorkers; i++ {
go s.BatchWrite(messages)
}
Expand Down
5 changes: 2 additions & 3 deletions services/tskv_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/beego/beego/v2/core/logs"
"github.com/beego/beego/v2/server/web"
"github.com/bitly/go-simplejson"
"github.com/mintance/go-uniqid"
"github.com/spf13/viper"
Expand Down Expand Up @@ -470,11 +469,11 @@ func (*TSKVService) BatchWrite(messages <-chan map[string]interface{}) error {
logs.Info("批量写入协程启动")
var tskvList []models.TSKV
var tskvLatestList []models.TSKVLatest
batchWaitTime, _ := web.AppConfig.Int("batch_wait_time")
batchWaitTime := viper.GetInt("app.batch_wait_time")
logs.Info("批量写入等待时间:", batchWaitTime)
// 转time.Duration
batchWaitTimeDuration := time.Duration(batchWaitTime) * time.Second
batchSize, _ := web.AppConfig.Int("batch_size")
batchSize := viper.GetInt("app.batch_size")
logs.Warn("批量写入大小:", batchSize)
for {

Expand Down

0 comments on commit caff277

Please sign in to comment.