From f2c1fea4d68a3eaf457d4f0cccae3be3cfa29f22 Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Tue, 8 Dec 2020 09:37:58 +0800 Subject: [PATCH] minor improvement of systest config --- .github/workflows/lint.yml | 4 +-- Makefile | 2 +- docker/conf/config.json | 1 + docker/conf/tasks/test1.json | 42 ++++++++++++------------- docker/conf/tasks/test_auto_schema.json | 27 +++++++--------- go.test.sh | 22 ++++++------- output/clickhouse.go | 22 ++++++------- pool/conn.go | 18 +++++------ 8 files changed, 65 insertions(+), 73 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 6f057d3a..e1ca2ff0 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -16,13 +16,13 @@ jobs: uses: golangci/golangci-lint-action@v2 with: # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. - version: v1.32.2 + version: v1.33 # Optional: working directory, useful for monorepos # working-directory: somedir # Optional: golangci-lint command line arguments. - args: --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck + args: --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck,paralleltest # Optional: show only new issues if it's a pull request. The default value is `false`. # only-new-issues: true \ No newline at end of file diff --git a/Makefile b/Makefile index 33ed3a6c..fb903bc5 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ benchtest: pre systest: build bash go.test.sh lint: - golangci-lint run --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck + golangci-lint run --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck,paralleltest run: pre go run cmd/clickhouse_sinker/main.go --local-cfg-dir conf/ diff --git a/docker/conf/config.json b/docker/conf/config.json index f6f3d8ef..bd7340c2 100644 --- a/docker/conf/config.json +++ b/docker/conf/config.json @@ -38,6 +38,7 @@ "bufferSize": 90000, "minBufferSize": 2000, "flushInterval": 5, + "layoutDateTime": "2006-01-02 15:04:05.999999999Z07:00", "logLevel": "debug" } } \ No newline at end of file diff --git a/docker/conf/tasks/test1.json b/docker/conf/tasks/test1.json index 8a78f9b9..0c72cd60 100644 --- a/docker/conf/tasks/test1.json +++ b/docker/conf/tasks/test1.json @@ -1,25 +1,25 @@ { - - "name" : "test1", - + "name": "test1", "kafka": "kfk1", "topic": "topic1", - "consumerGroup" : "test_sinker", - "earliest" : true, - "parser" : "json", - "clickhouse" : "ch1", - - "tableName" : "test1", - - "dims" : [ - {"name" : "timestamp" , "type" : "UInt64"}, - {"name" : "name" , "type" : "String"} + "consumerGroup": "test_sinker", + "earliest": true, + "parser": "json", + "clickhouse": "ch1", + "tableName": "test1", + "dims": [ + { + "name": "time", + "type": "DateTime" + }, + { + "name": "name", + "type": "String" + }, + { + "name": "value", + "type": "Float32" + } ], - - "metrics" : [ - {"name" : "value" , "type" : "Float32"} - ], - - "bufferSize" : 50000 -} - + "bufferSize": 50000 +} \ No newline at end of file diff --git a/docker/conf/tasks/test_auto_schema.json b/docker/conf/tasks/test_auto_schema.json index e6b6d381..4c0161b5 100644 --- a/docker/conf/tasks/test_auto_schema.json +++ b/docker/conf/tasks/test_auto_schema.json @@ -1,18 +1,15 @@ { - - "name" : "test_auto_schema", - + "name": "test_auto_schema", "kafka": "kfk1", "topic": "topic1", - "consumerGroup" : "test_auto_schema", - "earliest" : true, - "parser" : "json", - "clickhouse" : "ch1", - - "autoSchema" : true, - "tableName" : "test_auto_schema", - "excludeColumns" : ["day", "time"], - - "bufferSize" : 50000 -} - + "consumerGroup": "test_auto_schema", + "earliest": true, + "parser": "json", + "clickhouse": "ch1", + "autoSchema": true, + "tableName": "test_auto_schema", + "excludeColumns": [ + "day" + ], + "bufferSize": 50000 +} \ No newline at end of file diff --git a/go.test.sh b/go.test.sh index 78f33538..44ef3c2a 100755 --- a/go.test.sh +++ b/go.test.sh @@ -5,36 +5,33 @@ curl "localhost:8123" -d 'DROP TABLE IF EXISTS test1' curl "localhost:8123" -d 'CREATE TABLE IF NOT EXISTS test1 ( `day` Date DEFAULT toDate(time), - `time` DateTime DEFAULT toDateTime(timestamp / 1000), - `timestamp` UInt64, + `time` DateTime, `name` String, `value` Float64 ) ENGINE = MergeTree PARTITION BY day -ORDER BY time' +ORDER BY (time, name)' curl "localhost:8123" -d 'DROP TABLE IF EXISTS test_auto_schema' curl "localhost:8123" -d 'CREATE TABLE IF NOT EXISTS test_auto_schema ( `day` Date DEFAULT toDate(time), - `time` DateTime DEFAULT toDateTime(timestamp / 1000), - `timestamp` UInt64, + `time` DateTime, `name` String, `value` Float64 ) ENGINE = MergeTree PARTITION BY day -ORDER BY time' - +ORDER BY (time, name)' ## send the messages to kafka -current_timestamp=`date +%s`000 +now=`date --rfc-3339=ns` for i in `seq 1 100000`;do - echo "{\"timestamp\" : \"${current_timestamp}\", \"name\" : \"sundy-li\", \"value\" : \"$i\" }" + echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : \"$i\" }" done > a.json +echo "generated a.json" echo "cat /tmp/a.json | kafka-console-producer --topic topic1 --broker-list localhost:9092" > send.sh - sudo docker cp a.json kafka:/tmp/ sudo docker cp send.sh kafka:/tmp/ sudo docker exec kafka sh /tmp/send.sh @@ -53,9 +50,8 @@ echo "Got test_auto_schema count => $count" ## reset kafka consumer-group offsets -echo "kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_sinker --all-topics --to-earliest; kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_auto_schema --all-topics --to-earliest" > reset-offsets.sh -sudo docker cp reset-offsets.sh kafka:/tmp/ -sudo docker exec kafka sh /tmp/reset-offsets.sh +sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_sinker --all-topics --to-earliest +sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_auto_schema --all-topics --to-earliest ## truncate tables curl "localhost:8123" -d 'TRUNCATE TABLE test1' diff --git a/output/clickhouse.go b/output/clickhouse.go index 76037945..89b04735 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -139,7 +139,7 @@ func shouldReconnect(err error) bool { // LoopWrite will dead loop to write the records func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Batch) error) { var err error - times := c.chCfg.RetryTimes + var times int for { if err = c.write(batch); err == nil { for { @@ -150,13 +150,12 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Ba log.Infof("%s: ClickHouse.loopWrite quit due to the context has been cancelled", c.taskCfg.Name) return } - log.Errorf("%s: committing offset(try #%d) failed with error %+v", c.taskCfg.Name, c.chCfg.RetryTimes-times, err) - if c.chCfg.RetryTimes > 0 { - times-- - if times <= 0 { - os.Exit(-1) - } + log.Errorf("%s: committing offset(try #%d) failed with error %+v", c.taskCfg.Name, times, err) + times++ + if c.chCfg.RetryTimes <= 0 || times < c.chCfg.RetryTimes { time.Sleep(10 * time.Second) + } else { + os.Exit(-1) } } } @@ -166,12 +165,11 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Ba } log.Errorf("%s: flush batch(try #%d) failed with error %+v", c.taskCfg.Name, c.chCfg.RetryTimes-times, err) statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize)) - if c.chCfg.RetryTimes > 0 { - times-- - if times <= 0 { - os.Exit(-1) - } + times++ + if c.chCfg.RetryTimes <= 0 || times < c.chCfg.RetryTimes { time.Sleep(10 * time.Second) + } else { + os.Exit(-1) } } } diff --git a/pool/conn.go b/pool/conn.go index f07901cf..93ee5c4b 100644 --- a/pool/conn.go +++ b/pool/conn.go @@ -60,7 +60,7 @@ func (c *Connection) ReConnect() error { log.Info("reconnect to ", c.dsn, err.Error()) return err } - setDbParams(sqlDB) + setDBParams(sqlDB) log.Info("reconnect success to ", c.dsn) c.DB = sqlDB return nil @@ -104,7 +104,7 @@ func InitConn(name string, hosts [][]string, port int, db, username, password, d err = errors.Wrapf(err, "") return } - setDbParams(sqlDB) + setDBParams(sqlDB) cc.connections = append(cc.connections, &Connection{sqlDB, dsn}) } @@ -124,7 +124,7 @@ func InitConn(name string, hosts [][]string, port int, db, username, password, d return nil } -func setDbParams(sqlDB *sql.DB) { +func setDBParams(sqlDB *sql.DB) { sqlDB.SetMaxIdleConns(1) sqlDB.SetConnMaxIdleTime(10 * time.Second) } @@ -134,13 +134,13 @@ func FreeConn(name string) { defer lock.Unlock() if cc, ok := poolMaps[name]; ok { cc.ref-- - if cc.ref <= 0 { + if cc.ref == 0 { delete(poolMaps, name) - } - for _, conn := range cc.connections { - if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil { - err = errors.Wrapf(err, "") - log.Errorf("got error: %+v", err) + for _, conn := range cc.connections { + if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil { + err = errors.Wrapf(err, conn.dsn) + log.Errorf("got error: %+v", err) + } } } }