Skip to content

Commit

Permalink
chore - no more input when context canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
momingkotoba committed Apr 10, 2023
1 parent 05af994 commit 7b0e833
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 33 deletions.
4 changes: 4 additions & 0 deletions input/kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error) {
func (k *KafkaFranz) Run() {
k.wgRun.Add(1)
defer k.wgRun.Done()
LOOP:
for {
fetches := k.cl.PollRecords(k.ctx, k.grpConfig.BufferSize)
err := fetches.Err()
Expand All @@ -191,6 +192,9 @@ func (k *KafkaFranz) Run() {
select {
case k.fetch <- &fetches:
t.Stop()
case <-k.ctx.Done():
t.Stop()
break LOOP
case <-t.C:
util.Logger.Fatal(fmt.Sprintf("Sinker abort because group %s was not processing in last %d minutes", k.grpConfig.Name, processTimeOut))
}
Expand Down
25 changes: 4 additions & 21 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,8 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, sc *pool.ShardConn) {
retry.RetryIf(func(err error) bool { return shouldReconnect(err, sc) }),
retry.OnRetry(func(n uint, err error) {
retrycount++
if !errors.Is(err, clickhouse.ErrAcquireConnTimeout) {
util.Logger.Error("flush batch failed", zap.String("task", c.taskCfg.Name), zap.Int("try", int(retrycount)), zap.Error(err))
statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize))
}
util.Logger.Error("flush batch failed", zap.String("task", c.taskCfg.Name), zap.Int("try", int(retrycount)), zap.Error(err))
statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize))
}),
); err != nil {
util.Logger.Fatal("ClickHouse.loopWrite failed", zap.String("task", c.taskCfg.Name), zap.Error(err))
Expand Down Expand Up @@ -478,13 +476,7 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) {
alterTable := func(tbl, col string) error {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s %s;", c.dbName, tbl, onCluster, col)
util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name))
return retry.Do(
func() error { return conn.Exec(context.Background(), query) },
retry.Attempts(0),
retry.LastErrorOnly(true),
retry.Delay(10*time.Second),
retry.RetryIf(func(err error) bool { return errors.Is(err, clickhouse.ErrAcquireConnTimeout) }),
)
return conn.Exec(context.Background(), query)
}

if len(alterSeries) != 0 {
Expand Down Expand Up @@ -527,16 +519,7 @@ func (c *ClickHouse) getDistTbls(table string) (distTbls []string, err error) {
c.dbName, chCfg.Cluster, c.dbName, table)
util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name))
var rows driver.Rows
if err = retry.Do(
func() error {
rows, err = conn.Query(context.Background(), query)
return err
},
retry.Attempts(0),
retry.LastErrorOnly(true),
retry.Delay(10*time.Second),
retry.RetryIf(func(err error) bool { return errors.Is(err, clickhouse.ErrAcquireConnTimeout) }),
); err != nil {
if rows, err = conn.Query(context.Background(), query); err != nil {
err = errors.Wrapf(err, "")
return
}
Expand Down
13 changes: 1 addition & 12 deletions output/clickhouse_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package output
import (
"context"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/RoaringBitmap/roaring"
"github.com/avast/retry-go/v4"
"github.com/housepower/clickhouse_sinker/model"
"github.com/housepower/clickhouse_sinker/pool"
"github.com/housepower/clickhouse_sinker/util"
Expand Down Expand Up @@ -82,16 +80,7 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn cl

func getDims(database, table string, excludedColumns []string, parser string, conn clickhouse.Conn) (dims []*model.ColumnWithType, err error) {
var rs driver.Rows
if err = retry.Do(
func() error {
rs, err = conn.Query(context.Background(), fmt.Sprintf(selectSQLTemplate, database, table))
return err
},
retry.Attempts(0),
retry.LastErrorOnly(true),
retry.Delay(10*time.Second),
retry.RetryIf(func(err error) bool { return errors.Is(err, clickhouse.ErrAcquireConnTimeout) }),
); err != nil {
if rs, err = conn.Query(context.Background(), fmt.Sprintf(selectSQLTemplate, database, table)); err != nil {
err = errors.Wrapf(err, "")
return
}
Expand Down

0 comments on commit 7b0e833

Please sign in to comment.