diff --git a/input/kafka_franz.go b/input/kafka_franz.go index 378e98e1..87d4c4a7 100644 --- a/input/kafka_franz.go +++ b/input/kafka_franz.go @@ -173,6 +173,7 @@ func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error) { func (k *KafkaFranz) Run() { k.wgRun.Add(1) defer k.wgRun.Done() +LOOP: for { fetches := k.cl.PollRecords(k.ctx, k.grpConfig.BufferSize) err := fetches.Err() @@ -191,6 +192,9 @@ func (k *KafkaFranz) Run() { select { case k.fetch <- &fetches: t.Stop() + case <-k.ctx.Done(): + t.Stop() + break LOOP case <-t.C: util.Logger.Fatal(fmt.Sprintf("Sinker abort because group %s was not processing in last %d minutes", k.grpConfig.Name, processTimeOut)) } diff --git a/output/clickhouse.go b/output/clickhouse.go index aa05aaed..013cb743 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -239,10 +239,8 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, sc *pool.ShardConn) { retry.RetryIf(func(err error) bool { return shouldReconnect(err, sc) }), retry.OnRetry(func(n uint, err error) { retrycount++ - if !errors.Is(err, clickhouse.ErrAcquireConnTimeout) { - util.Logger.Error("flush batch failed", zap.String("task", c.taskCfg.Name), zap.Int("try", int(retrycount)), zap.Error(err)) - statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize)) - } + util.Logger.Error("flush batch failed", zap.String("task", c.taskCfg.Name), zap.Int("try", int(retrycount)), zap.Error(err)) + statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize)) }), ); err != nil { util.Logger.Fatal("ClickHouse.loopWrite failed", zap.String("task", c.taskCfg.Name), zap.Error(err)) @@ -478,13 +476,7 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) { alterTable := func(tbl, col string) error { query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s %s;", c.dbName, tbl, onCluster, col) util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name)) - return retry.Do( - func() error { return conn.Exec(context.Background(), query) }, - retry.Attempts(0), - retry.LastErrorOnly(true), - retry.Delay(10*time.Second), - retry.RetryIf(func(err error) bool { return errors.Is(err, clickhouse.ErrAcquireConnTimeout) }), - ) + return conn.Exec(context.Background(), query) } if len(alterSeries) != 0 { @@ -527,16 +519,7 @@ func (c *ClickHouse) getDistTbls(table string) (distTbls []string, err error) { c.dbName, chCfg.Cluster, c.dbName, table) util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name)) var rows driver.Rows - if err = retry.Do( - func() error { - rows, err = conn.Query(context.Background(), query) - return err - }, - retry.Attempts(0), - retry.LastErrorOnly(true), - retry.Delay(10*time.Second), - retry.RetryIf(func(err error) bool { return errors.Is(err, clickhouse.ErrAcquireConnTimeout) }), - ); err != nil { + if rows, err = conn.Query(context.Background(), query); err != nil { err = errors.Wrapf(err, "") return } diff --git a/output/clickhouse_util.go b/output/clickhouse_util.go index f4481ed9..8280257a 100644 --- a/output/clickhouse_util.go +++ b/output/clickhouse_util.go @@ -3,12 +3,10 @@ package output import ( "context" "fmt" - "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/RoaringBitmap/roaring" - "github.com/avast/retry-go/v4" "github.com/housepower/clickhouse_sinker/model" "github.com/housepower/clickhouse_sinker/pool" "github.com/housepower/clickhouse_sinker/util" @@ -82,16 +80,7 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn cl func getDims(database, table string, excludedColumns []string, parser string, conn clickhouse.Conn) (dims []*model.ColumnWithType, err error) { var rs driver.Rows - if err = retry.Do( - func() error { - rs, err = conn.Query(context.Background(), fmt.Sprintf(selectSQLTemplate, database, table)) - return err - }, - retry.Attempts(0), - retry.LastErrorOnly(true), - retry.Delay(10*time.Second), - retry.RetryIf(func(err error) bool { return errors.Is(err, clickhouse.ErrAcquireConnTimeout) }), - ); err != nil { + if rs, err = conn.Query(context.Background(), fmt.Sprintf(selectSQLTemplate, database, table)); err != nil { err = errors.Wrapf(err, "") return }