Skip to content

Commit

Permalink
calculate topic lags every 10 seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
momingkotoba committed Mar 10, 2023
1 parent 57d39c4 commit 180029f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Improvements:
- combine nacos log into sinker log
- update dmseries map when applying new config, reload the records from series table every single day
- avoid recreating dist tables, alter the table schema instead
- update clickhouse_sinker_consume_lags metric every 10 secs


#### Version 3.0.1 (2023-03-03)
Expand Down
2 changes: 1 addition & 1 deletion config_manager/lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err
defer adm.Close()
defer cl.Close()

stateLags = make(map[string]StateLag)
stateLags = make(map[string]StateLag, len(cfg.Tasks))
for _, taskCfg := range cfg.Tasks {
var state string
var totalLags int64
Expand Down
18 changes: 18 additions & 0 deletions config_manager/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (ncm *NacosConfManager) Run() {
// Assign regularly to handle lag change
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
lagticker := time.NewTicker(10 * time.Second)
defer lagticker.Stop()
LOOP_FOR:
for {
select {
Expand All @@ -236,6 +238,12 @@ LOOP_FOR:
util.Logger.Debug("assign triggered by 5 min timer")
if err := ncm.assign(); err != nil {
util.Logger.Error("assign failed", zap.Error(err))
} else {
lagticker.Reset(10 * time.Second)
}
case <-lagticker.C:
if err := ncm.calculateGroupLag(); err != nil {
util.Logger.Error("calculate lag failed", zap.Error(err))
}
}
}
Expand Down Expand Up @@ -427,6 +435,16 @@ func (ncm *NacosConfManager) assign() (err error) {
return
}

func (ncm *NacosConfManager) calculateGroupLag() (err error) {
if len(ncm.curInsts) == 0 || ncm.curCfg == nil || ncm.curInsts[0] != ncm.instance {
// Only the first instance is capable to report the lag
return
}
_, err = GetTaskStateAndLags(ncm.curCfg)

return
}

type NacosLogger struct {
*zap.SugaredLogger
}
Expand Down

0 comments on commit 180029f

Please sign in to comment.