Commit

Group the tasks by consumerGroup property to reduce the number of Kafka clients
momingkotoba committed Dec 23, 2022
1 parent b452c6d commit 0a4add9
Showing 20 changed files with 861 additions and 1,623 deletions.
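The change described by the commit message groups task configs that share a consumerGroup so that one Kafka client can serve the whole group instead of one client per task. A minimal, self-contained sketch of that grouping step (the struct, field and function names below are illustrative assumptions, not the actual config or task package API):

```go
package main

import "fmt"

// TaskCfg is a simplified stand-in for the sinker's task configuration;
// the field names here are assumptions for illustration only.
type TaskCfg struct {
	Name          string
	Topic         string
	ConsumerGroup string
}

// groupByConsumerGroup buckets task configs by consumer group so that each
// bucket can be served by one shared Kafka client instead of one client per task.
func groupByConsumerGroup(tasks []TaskCfg) map[string][]TaskCfg {
	groups := make(map[string][]TaskCfg)
	for _, t := range tasks {
		groups[t.ConsumerGroup] = append(groups[t.ConsumerGroup], t)
	}
	return groups
}

func main() {
	tasks := []TaskCfg{
		{Name: "t1", Topic: "logs", ConsumerGroup: "cg1"},
		{Name: "t2", Topic: "metrics", ConsumerGroup: "cg1"},
		{Name: "t3", Topic: "traces", ConsumerGroup: "cg2"},
	}
	for cg, ts := range groupByConsumerGroup(tasks) {
		fmt.Printf("consumer group %q -> %d task(s), one Kafka client\n", cg, len(ts))
	}
}
```

With tasks bucketed this way, the sinker only needs as many Kafka clients as there are distinct consumer groups.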
332 changes: 6 additions & 326 deletions cmd/clickhouse_sinker/main.go
@@ -16,7 +16,6 @@ limitations under the License.
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
@@ -26,17 +25,10 @@ import (
"net/http/pprof"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"time"

"github.com/housepower/clickhouse_sinker/config"
cm "github.com/housepower/clickhouse_sinker/config_manager"
"github.com/housepower/clickhouse_sinker/health"
"github.com/housepower/clickhouse_sinker/pool"
"github.com/housepower/clickhouse_sinker/statistics"
"github.com/housepower/clickhouse_sinker/task"
"github.com/housepower/clickhouse_sinker/util"
"go.uber.org/zap"
@@ -45,40 +37,23 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type CmdOptions struct {
ShowVer bool
LogLevel string // "debug", "info", "warn", "error", "dpanic", "panic", "fatal"
LogPaths string // comma-separated paths. "stdout" means the console stdout
HTTPPort int // 0 means a randomly OS-chosen port
PushGatewayAddrs string
PushInterval int
LocalCfgFile string
NacosAddr string
NacosNamespaceID string
NacosGroup string
NacosUsername string
NacosPassword string
NacosDataID string
NacosServiceName string // participate in assignment management if not empty
}

var (
// goreleaser fills in the following info per https://goreleaser.com/customization/build/.
version = "None"
commit = "None"
date = "None"
builtBy = "None"

cmdOps CmdOptions
cmdOps util.CmdOptions
selfIP string
httpAddr string
httpMetrics = promhttp.Handler()
runner *Sinker
runner *task.Sinker
)

func initCmdOptions() {
// 1. Set options to default values.
cmdOps = CmdOptions{
cmdOps = util.CmdOptions{
ShowVer: false,
LogLevel: "info",
LogPaths: "stdout,clickhouse_sinker.log",
@@ -174,11 +149,11 @@ func main() {

mux.HandleFunc("/state", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if runner != nil && runner.curCfg != nil {
if runner != nil && runner.GetCurrentConfig() != nil {
var stateLags map[string]cm.StateLag
var bs []byte
var err error
if stateLags, err = cm.GetTaskStateAndLags(runner.curCfg); err == nil {
if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil {
if bs, err = json.Marshal(stateLags); err == nil {
_, _ = w.Write(bs)
}
Expand Down Expand Up @@ -250,7 +225,7 @@ func main() {
}
}
}
runner = NewSinker(rcm)
runner = task.NewSinker(rcm, httpAddr, &cmdOps)
return runner.Init()
}, func() error {
runner.Run()
@@ -260,298 +235,3 @@ func main() {
return nil
})
}

// Sinker object maintains the number of tasks for each partition
type Sinker struct {
curCfg *config.Config
numCfg int
pusher *statistics.Pusher
tasks map[string]*task.Service
rcm cm.RemoteConfManager
ctx context.Context
cancel context.CancelFunc
stopped chan struct{}
}

// NewSinker returns an instance of Sinker with the task list
func NewSinker(rcm cm.RemoteConfManager) *Sinker {
ctx, cancel := context.WithCancel(context.Background())
s := &Sinker{
tasks: make(map[string]*task.Service),
rcm: rcm,
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
}
return s
}

func (s *Sinker) Init() (err error) {
return
}

// Run is the mainloop to get and apply config
func (s *Sinker) Run() {
var err error
var newCfg *config.Config
defer func() {
s.stopped <- struct{}{}
}()
if cmdOps.PushGatewayAddrs != "" {
addrs := strings.Split(cmdOps.PushGatewayAddrs, ",")
s.pusher = statistics.NewPusher(addrs, cmdOps.PushInterval, httpAddr)
if err = s.pusher.Init(); err != nil {
return
}
go s.pusher.Run()
}
if s.rcm == nil {
if _, err = os.Stat(cmdOps.LocalCfgFile); err == nil {
if newCfg, err = config.ParseLocalCfgFile(cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("config.ParseLocalCfgFile failed", zap.Error(err))
return
}
} else {
util.Logger.Fatal("expect --local-cfg-file or --nacos-dataid")
return
}
if err = newCfg.Normallize(); err != nil {
util.Logger.Fatal("newCfg.Normallize failed", zap.Error(err))
return
}
if err = s.applyConfig(newCfg); err != nil {
util.Logger.Fatal("s.applyConfig failed", zap.Error(err))
return
}
<-s.ctx.Done()
} else {
if cmdOps.NacosServiceName != "" {
go s.rcm.Run()
}
// Golang's <-time.After() timer is not garbage collected before expiry, so use a Ticker instead.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
util.Logger.Info("Sinker.Run quit due to context has been canceled")
return
case <-ticker.C:
if newCfg, err = s.rcm.GetConfig(); err != nil {
util.Logger.Error("s.rcm.GetConfig failed", zap.Error(err))
continue
}
if err = newCfg.Normallize(); err != nil {
util.Logger.Error("newCfg.Normallize failed", zap.Error(err))
continue
}
if err = s.applyConfig(newCfg); err != nil {
util.Logger.Error("s.applyConfig failed", zap.Error(err))
continue
}
}
}
}
}

// Close shuts down the tasks
func (s *Sinker) Close() {
// 1. Stop rcm
if s.rcm != nil {
s.rcm.Stop()
s.rcm = nil
}
// 2. Quit Run mainloop
s.cancel()
<-s.stopped
// 3. Stop tasks gracefully.
s.stopAllTasks()
// 4. Stop pusher
if s.pusher != nil {
s.pusher.Stop()
s.pusher = nil
}
}

func (s *Sinker) stopAllTasks() {
var wg sync.WaitGroup
for _, tsk := range s.tasks {
wg.Add(1)
go func(tsk *task.Service) {
tsk.Stop()
wg.Done()
}(tsk)
}
wg.Wait()
for taskName := range s.tasks {
delete(s.tasks, taskName)
}
util.Logger.Info("stopped all tasks")
if util.GlobalParsingPool != nil {
util.GlobalParsingPool.StopWait()
}
util.Logger.Info("stopped parsing pool")
if util.GlobalTimerWheel != nil {
util.GlobalTimerWheel.Stop()
}
util.Logger.Info("stopped timer wheel")
if util.GlobalWritingPool != nil {
util.GlobalWritingPool.StopWait()
}
util.Logger.Info("stopped writing pool")
}

func (s *Sinker) applyConfig(newCfg *config.Config) (err error) {
util.SetLogLevel(newCfg.LogLevel)
if s.curCfg == nil {
// The first invocation of applyConfig
err = s.applyFirstConfig(newCfg)
} else if !reflect.DeepEqual(newCfg.Clickhouse, s.curCfg.Clickhouse) ||
!reflect.DeepEqual(newCfg.Kafka, s.curCfg.Kafka) ||
!reflect.DeepEqual(newCfg.Tasks, s.curCfg.Tasks) ||
!reflect.DeepEqual(newCfg.Assignment.Map, s.curCfg.Assignment.Map) {
err = s.applyAnotherConfig(newCfg)
}
return
}

func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) {
util.Logger.Info("going to apply the first config", zap.Reflect("config", newCfg))
// 1. Initialize clickhouse connections
chCfg := &newCfg.Clickhouse
if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password,
chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns, chCfg.DialTimeout); err != nil {
return
}

// 2. Start goroutine pools.
util.InitGlobalTimerWheel()
util.InitGlobalParsingPool()
util.InitGlobalWritingPool(len(chCfg.Hosts) * chCfg.MaxOpenConns)

// 3. Generate, initialize and run tasks
for _, taskCfg := range newCfg.Tasks {
if cmdOps.NacosServiceName != "" && !newCfg.IsAssigned(httpAddr, taskCfg.Name) {
continue
}
task := task.NewTaskService(newCfg, taskCfg)
if err = task.Init(); err != nil {
return
}
s.tasks[taskCfg.Name] = task
}
for _, task := range s.tasks {
go task.Run()
}
s.curCfg = newCfg
util.Logger.Info("applied the first config")
return
}

func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
util.Logger.Info("going to apply another config", zap.Int("number", s.numCfg), zap.Reflect("config", newCfg))
if !reflect.DeepEqual(newCfg.Kafka, s.curCfg.Kafka) || !reflect.DeepEqual(newCfg.Clickhouse, s.curCfg.Clickhouse) {
// 1. Stop tasks gracefully. Wait until all flying data is processed (written to CH and committed to Kafka).
s.stopAllTasks()
// 2. Initialize clickhouse connections.
chCfg := &newCfg.Clickhouse
if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password,
chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns, chCfg.DialTimeout); err != nil {
return
}

// 3. Restart goroutine pools.
util.Logger.Info("restarting parsing, writing and timer pool")
util.GlobalTimerWheel = nil
util.InitGlobalTimerWheel()
util.GlobalParsingPool.Restart()
maxWorkers := len(newCfg.Clickhouse.Hosts) * newCfg.Clickhouse.MaxOpenConns
util.GlobalWritingPool.Resize(maxWorkers)
util.GlobalWritingPool.Restart()
util.Logger.Info("resized writing pool", zap.Int("maxWorkers", maxWorkers))

// 4. Generate, initialize and run tasks.
var tasksToStart []string
for _, taskCfg := range newCfg.Tasks {
if cmdOps.NacosServiceName != "" && !newCfg.IsAssigned(httpAddr, taskCfg.Name) {
continue
}
task := task.NewTaskService(newCfg, taskCfg)
if err = task.Init(); err != nil {
return
}
tasksToStart = append(tasksToStart, taskCfg.Name)
s.tasks[taskCfg.Name] = task
}
for _, task := range s.tasks {
go task.Run()
}
sort.Strings(tasksToStart)
util.Logger.Info("started tasks", zap.Reflect("tasks", tasksToStart))
} else if !reflect.DeepEqual(newCfg.Tasks, s.curCfg.Tasks) || !reflect.DeepEqual(newCfg.Assignment.Map, s.curCfg.Assignment.Map) {
// 1. Find tasks that need to stop.
var tasksToStop []string
curCfgTasks := make(map[string]*config.TaskConfig)
newCfgTasks := make(map[string]*config.TaskConfig)
for _, taskCfg := range s.curCfg.Tasks {
curCfgTasks[taskCfg.Name] = taskCfg
}
for _, taskCfg := range newCfg.Tasks {
if cmdOps.NacosServiceName != "" && !newCfg.IsAssigned(httpAddr, taskCfg.Name) {
continue
}
newCfgTasks[taskCfg.Name] = taskCfg
}
for taskName := range s.tasks {
curTaskCfg := curCfgTasks[taskName]
newTaskCfg, ok := newCfgTasks[taskName]
if !ok || !reflect.DeepEqual(newTaskCfg, curTaskCfg) {
tasksToStop = append(tasksToStop, taskName)
}
}
sort.Strings(tasksToStop)
// 2. Stop, in parallel, the tasks found at the previous step.
// They must drain flying batches as quickly as possible to allow another clickhouse_sinker
// instance to take over partitions safely.
var wg sync.WaitGroup
for _, taskName := range tasksToStop {
wg.Add(1)
go func(tsk *task.Service) {
tsk.Stop()
wg.Done()
}(s.tasks[taskName])
}
wg.Wait()
for _, taskName := range tasksToStop {
delete(s.tasks, taskName)
}
util.Logger.Info("stopped tasks", zap.Reflect("tasks", tasksToStop))
// 3. Initialize tasks which are new or whose config differs.
var tasksToStart []string
var newTasks []*task.Service
for taskName, taskCfg := range newCfgTasks {
if _, ok := s.tasks[taskName]; ok {
continue
}
task := task.NewTaskService(newCfg, taskCfg)
if err = task.Init(); err != nil {
return
}
s.tasks[taskName] = task
tasksToStart = append(tasksToStart, taskName)
newTasks = append(newTasks, task)
}

// 4. Start new tasks. We don't do it at step 3 in order to avoid a goroutine leak due to errors raised by later steps.
for _, task := range newTasks {
go task.Run()
}
sort.Strings(tasksToStart)
util.Logger.Info("started tasks", zap.Reflect("tasks", tasksToStart))
}
// Record the new config
s.curCfg = newCfg
util.Logger.Info("applied another config", zap.Int("number", s.numCfg))
s.numCfg++
return
}
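The per-task wiring deleted above implies a runtime counterpart to the grouping: a single consumer per consumer group subscribes to the union of its tasks' topics and routes each fetched record to the task that owns that topic. A hedged, self-contained sketch of that routing (the record, handler and dispatcher types are illustrative stand-ins; the real implementation lives in the task package and uses the project's Kafka client):

```go
package main

import "fmt"

// record is a minimal stand-in for a consumed Kafka message.
type record struct {
	Topic string
	Value []byte
}

// taskHandler stands in for a task consuming messages from its topic.
type taskHandler func(rec record)

// dispatcher models the runtime side of the grouping: one shared consumer per
// consumer group fetches from the union of its tasks' topics and routes each
// record to the task that owns that topic.
type dispatcher struct {
	byTopic map[string]taskHandler
}

func (d *dispatcher) dispatch(rec record) {
	if h, ok := d.byTopic[rec.Topic]; ok {
		h(rec)
		return
	}
	fmt.Printf("no task registered for topic %q\n", rec.Topic)
}

func main() {
	d := &dispatcher{byTopic: map[string]taskHandler{
		"logs":    func(r record) { fmt.Println("logs task handled", len(r.Value), "bytes") },
		"metrics": func(r record) { fmt.Println("metrics task handled", len(r.Value), "bytes") },
	}}
	// Records as they might arrive from a single consumer-group client, topics mixed.
	for _, rec := range []record{
		{Topic: "logs", Value: []byte("a")},
		{Topic: "metrics", Value: []byte("bb")},
		{Topic: "traces", Value: []byte("ccc")},
	} {
		d.dispatch(rec)
	}
}
```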