minor improvement of systest config
yuzhichang committed Dec 8, 2020
1 parent 7c67299 commit f2c1fea
Showing 8 changed files with 65 additions and 73 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
@@ -16,13 +16,13 @@ jobs:
uses: golangci/golangci-lint-action@v2
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
- version: v1.32.2
+ version: v1.33

# Optional: working directory, useful for monorepos
# working-directory: somedir

# Optional: golangci-lint command line arguments.
- args: --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck
+ args: --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck,paralleltest

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
2 changes: 1 addition & 1 deletion Makefile
@@ -25,7 +25,7 @@ benchtest: pre
systest: build
bash go.test.sh
lint:
- golangci-lint run --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck
+ golangci-lint run --issues-exit-code=0 --disable=nakedret,exhaustivestruct,wrapcheck,paralleltest
run: pre
go run cmd/clickhouse_sinker/main.go --local-cfg-dir conf/

1 change: 1 addition & 0 deletions docker/conf/config.json
@@ -38,6 +38,7 @@
"bufferSize": 90000,
"minBufferSize": 2000,
"flushInterval": 5,
+ "layoutDateTime": "2006-01-02 15:04:05.999999999Z07:00",
"logLevel": "debug"
}
}
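Note: the new layoutDateTime value is a Go reference-time layout. A minimal sketch of how a timestamp written in that layout parses, assuming a made-up sample value (this is not the sinker's actual parsing code):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Same layout string as the new "layoutDateTime" setting above.
    const layoutDateTime = "2006-01-02 15:04:05.999999999Z07:00"

    // Hypothetical sample timestamp; anything written in this layout parses the same way.
    t, err := time.Parse(layoutDateTime, "2020-12-08 09:30:00.123456789+08:00")
    if err != nil {
        panic(err)
    }
    fmt.Println(t.UTC()) // 2020-12-08 01:30:00.123456789 +0000 UTC
}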
42 changes: 21 additions & 21 deletions docker/conf/tasks/test1.json
@@ -1,25 +1,25 @@
{
-
- "name" : "test1",
-
+ "name": "test1",
"kafka": "kfk1",
"topic": "topic1",
- "consumerGroup" : "test_sinker",
- "earliest" : true,
- "parser" : "json",
- "clickhouse" : "ch1",
-
- "tableName" : "test1",
-
- "dims" : [
-   {"name" : "timestamp" , "type" : "UInt64"},
-   {"name" : "name" , "type" : "String"}
+ "consumerGroup": "test_sinker",
+ "earliest": true,
+ "parser": "json",
+ "clickhouse": "ch1",
+ "tableName": "test1",
+ "dims": [
+   {
+     "name": "time",
+     "type": "DateTime"
+   },
+   {
+     "name": "name",
+     "type": "String"
+   },
+   {
+     "name": "value",
+     "type": "Float32"
+   }
],
-
- "metrics" : [
-   {"name" : "value" , "type" : "Float32"}
- ],
-
- "bufferSize" : 50000
- }
-
+ "bufferSize": 50000
+ }
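Note: with the new dims, a test1 message carries time, name and value fields instead of a raw timestamp plus a metrics block. A hypothetical Go struct mirroring that shape, purely for illustration (the sinker parses JSON generically and does not use such a struct):

package main

import "time"

// Test1Message is an illustrative mirror of the test1 dims:
// "time" (DateTime), "name" (String) and "value" (Float32).
type Test1Message struct {
    Time  time.Time `json:"time"`
    Name  string    `json:"name"`
    Value float32   `json:"value"`
}

func main() {
    _ = Test1Message{Time: time.Now(), Name: "name1", Value: 1}
}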
27 changes: 12 additions & 15 deletions docker/conf/tasks/test_auto_schema.json
@@ -1,18 +1,15 @@
{
-
- "name" : "test_auto_schema",
-
+ "name": "test_auto_schema",
"kafka": "kfk1",
"topic": "topic1",
- "consumerGroup" : "test_auto_schema",
- "earliest" : true,
- "parser" : "json",
- "clickhouse" : "ch1",
-
- "autoSchema" : true,
- "tableName" : "test_auto_schema",
- "excludeColumns" : ["day", "time"],
-
- "bufferSize" : 50000
- }
-
+ "consumerGroup": "test_auto_schema",
+ "earliest": true,
+ "parser": "json",
+ "clickhouse": "ch1",
+ "autoSchema": true,
+ "tableName": "test_auto_schema",
+ "excludeColumns": [
+   "day"
+ ],
+ "bufferSize": 50000
+ }
22 changes: 9 additions & 13 deletions go.test.sh
@@ -5,36 +5,33 @@ curl "localhost:8123" -d 'DROP TABLE IF EXISTS test1'
curl "localhost:8123" -d 'CREATE TABLE IF NOT EXISTS test1
(
`day` Date DEFAULT toDate(time),
- `time` DateTime DEFAULT toDateTime(timestamp / 1000),
- `timestamp` UInt64,
+ `time` DateTime,
`name` String,
`value` Float64
)
ENGINE = MergeTree
PARTITION BY day
- ORDER BY time'
+ ORDER BY (time, name)'

curl "localhost:8123" -d 'DROP TABLE IF EXISTS test_auto_schema'
curl "localhost:8123" -d 'CREATE TABLE IF NOT EXISTS test_auto_schema
(
`day` Date DEFAULT toDate(time),
- `time` DateTime DEFAULT toDateTime(timestamp / 1000),
- `timestamp` UInt64,
+ `time` DateTime,
`name` String,
`value` Float64
)
ENGINE = MergeTree
PARTITION BY day
- ORDER BY time'

+ ORDER BY (time, name)'

## send the messages to kafka
- current_timestamp=`date +%s`000
+ now=`date --rfc-3339=ns`
for i in `seq 1 100000`;do
- echo "{\"timestamp\" : \"${current_timestamp}\", \"name\" : \"sundy-li\", \"value\" : \"$i\" }"
+ echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : \"$i\" }"
done > a.json
echo "generated a.json"
echo "cat /tmp/a.json | kafka-console-producer --topic topic1 --broker-list localhost:9092" > send.sh

sudo docker cp a.json kafka:/tmp/
sudo docker cp send.sh kafka:/tmp/
sudo docker exec kafka sh /tmp/send.sh
@@ -53,9 +50,8 @@ echo "Got test_auto_schema count => $count"


## reset kafka consumer-group offsets
- echo "kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_sinker --all-topics --to-earliest; kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_auto_schema --all-topics --to-earliest" > reset-offsets.sh
- sudo docker cp reset-offsets.sh kafka:/tmp/
- sudo docker exec kafka sh /tmp/reset-offsets.sh
+ sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_sinker --all-topics --to-earliest
+ sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_auto_schema --all-topics --to-earliest

## truncate tables
curl "localhost:8123" -d 'TRUNCATE TABLE test1'
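Note: the test messages now carry an RFC 3339 wall-clock time instead of epoch milliseconds. A rough Go equivalent of the generation loop above, assuming the same file name and message count as the script (Go's time.RFC3339Nano uses a 'T' separator where `date --rfc-3339=ns` prints a space, so the output differs slightly):

package main

import (
    "bufio"
    "fmt"
    "os"
    "time"
)

func main() {
    // Mirrors the shell loop above: 100000 JSON lines, each with the current time.
    f, err := os.Create("a.json")
    if err != nil {
        panic(err)
    }
    defer f.Close()
    w := bufio.NewWriter(f)
    defer w.Flush()
    now := time.Now().Format(time.RFC3339Nano)
    for i := 1; i <= 100000; i++ {
        fmt.Fprintf(w, "{\"time\" : %q, \"name\" : \"name%d\", \"value\" : \"%d\" }\n", now, i, i)
    }
}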
22 changes: 10 additions & 12 deletions output/clickhouse.go
@@ -139,7 +139,7 @@ func shouldReconnect(err error) bool {
// LoopWrite will dead loop to write the records
func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Batch) error) {
var err error
- times := c.chCfg.RetryTimes
+ var times int
for {
if err = c.write(batch); err == nil {
for {
@@ -150,13 +150,12 @@ func (c *ClickHouse) loopWrite(batch *model.Ba
log.Infof("%s: ClickHouse.loopWrite quit due to the context has been cancelled", c.taskCfg.Name)
return
}
- log.Errorf("%s: committing offset(try #%d) failed with error %+v", c.taskCfg.Name, c.chCfg.RetryTimes-times, err)
- if c.chCfg.RetryTimes > 0 {
- times--
- if times <= 0 {
- os.Exit(-1)
- }
+ log.Errorf("%s: committing offset(try #%d) failed with error %+v", c.taskCfg.Name, times, err)
+ times++
+ if c.chCfg.RetryTimes <= 0 || times < c.chCfg.RetryTimes {
time.Sleep(10 * time.Second)
+ } else {
+ os.Exit(-1)
}
}
}
@@ -166,12 +165,11 @@ func (c *ClickHouse) loopWrite(batch *model.Ba
}
log.Errorf("%s: flush batch(try #%d) failed with error %+v", c.taskCfg.Name, c.chCfg.RetryTimes-times, err)
statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize))
- if c.chCfg.RetryTimes > 0 {
- times--
- if times <= 0 {
- os.Exit(-1)
- }
+ times++
+ if c.chCfg.RetryTimes <= 0 || times < c.chCfg.RetryTimes {
time.Sleep(10 * time.Second)
+ } else {
+ os.Exit(-1)
}
}
}
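Note: the retry counter in loopWrite now counts attempts up from zero and compares against RetryTimes, with a non-positive RetryTimes meaning retry forever. A minimal standalone sketch of that pattern, assuming made-up names (retryLoop and op are illustrative, not the sinker's API):

package main

import (
    "errors"
    "log"
    "os"
    "time"
)

// retryLoop follows the same shape as loopWrite after this change: keep retrying
// with a fixed sleep, and only give up (the sinker exits the process) once
// retryTimes > 0 attempts have all failed.
func retryLoop(retryTimes int, op func() error) {
    var times int
    for {
        err := op()
        if err == nil {
            return
        }
        log.Printf("try #%d failed: %v", times, err)
        times++
        if retryTimes <= 0 || times < retryTimes {
            time.Sleep(10 * time.Second)
        } else {
            os.Exit(-1)
        }
    }
}

func main() {
    // Deliberately always fails, so the loop gives up after three attempts.
    retryLoop(3, func() error { return errors.New("simulated failure") })
}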
18 changes: 9 additions & 9 deletions pool/conn.go
@@ -60,7 +60,7 @@ func (c *Connection) ReConnect() error {
log.Info("reconnect to ", c.dsn, err.Error())
return err
}
- setDbParams(sqlDB)
+ setDBParams(sqlDB)
log.Info("reconnect success to ", c.dsn)
c.DB = sqlDB
return nil
@@ -104,7 +104,7 @@ func InitConn(name string, hosts [][]string, port int, db, username, password, d
err = errors.Wrapf(err, "")
return
}
- setDbParams(sqlDB)
+ setDBParams(sqlDB)
cc.connections = append(cc.connections, &Connection{sqlDB, dsn})
}

@@ -124,7 +124,7 @@ func InitConn(name string, hosts [][]string, port int, db, username, password, d
return nil
}

- func setDbParams(sqlDB *sql.DB) {
+ func setDBParams(sqlDB *sql.DB) {
sqlDB.SetMaxIdleConns(1)
sqlDB.SetConnMaxIdleTime(10 * time.Second)
}
@@ -134,13 +134,13 @@ func FreeConn(name string) {
defer lock.Unlock()
if cc, ok := poolMaps[name]; ok {
cc.ref--
- if cc.ref <= 0 {
+ if cc.ref == 0 {
delete(poolMaps, name)
- }
- for _, conn := range cc.connections {
- if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil {
- err = errors.Wrapf(err, "")
- log.Errorf("got error: %+v", err)
+ for _, conn := range cc.connections {
+ if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil {
+ err = errors.Wrapf(err, conn.dsn)
+ log.Errorf("got error: %+v", err)
+ }
}
}
}
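Note: readiness checks are now removed only when the reference count actually drops to zero, and the wrap message carries the connection's DSN. A simplified sketch of that reference-counting pattern, assuming stand-in types (connPool, freeConn and the DSN strings below are illustrative, not the real pool package):

package main

import "log"

// connPool is a hypothetical stand-in for the pool's per-name entry; the real
// code holds *sql.DB connections and health readiness checks keyed by DSN.
type connPool struct {
    dsns []string
    ref  int
}

var poolMaps = map[string]*connPool{}

// freeConn mirrors the pattern after this change: decrement the reference count,
// and only when it reaches zero delete the entry and clean up every connection.
func freeConn(name string) {
    cc, ok := poolMaps[name]
    if !ok {
        return
    }
    cc.ref--
    if cc.ref == 0 {
        delete(poolMaps, name)
        for _, dsn := range cc.dsns {
            log.Printf("removing readiness check for %s", dsn)
        }
    }
}

func main() {
    poolMaps["ch1"] = &connPool{dsns: []string{"tcp://host1:9000"}, ref: 2}
    freeConn("ch1")
    freeConn("ch1") // second release drops the count to zero and cleans up
}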
