diff --git a/config/config.go b/config/config.go index 3438b08c..8be5bd7c 100644 --- a/config/config.go +++ b/config/config.go @@ -142,15 +142,15 @@ type TaskConfig struct { FlushInterval int `json:"flushInterval,omitempty"` BufferSize int `json:"bufferSize,omitempty"` - MinBufferSize int `json:"minBufferSize,omitempty"` MsgSizeHint int `json:"msgSizeHint,omitempty"` TimeZone string `json:"timezone"` } const ( - defaultFlushInterval = 3 - defaultBufferSize = 1 << 20 //1048576 - defaultMinBufferSize = 1 << 14 // 16384 + maxFlushInterval = 10 + defaultFlushInterval = 5 + MaxBufferSize = 1 << 20 //1048576 + defaultBufferSize = 1 << 18 //262144 defaultMsgSizeHint = 1000 defaultTimeZone = "Local" defaultLogLevel = "info" @@ -214,17 +214,16 @@ func (cfg *Config) Normallize() (err error) { if cfg.Task.FlushInterval <= 0 { cfg.Task.FlushInterval = defaultFlushInterval + } else if cfg.Task.FlushInterval > maxFlushInterval { + cfg.Task.FlushInterval = maxFlushInterval } if cfg.Task.BufferSize <= 0 { cfg.Task.BufferSize = defaultBufferSize + } else if cfg.Task.BufferSize > MaxBufferSize { + cfg.Task.BufferSize = MaxBufferSize } else { cfg.Task.BufferSize = 1 << util.GetShift(cfg.Task.BufferSize) } - if cfg.Task.MinBufferSize <= 0 { - cfg.Task.MinBufferSize = defaultMinBufferSize - } else { - cfg.Task.MinBufferSize = 1 << util.GetShift(cfg.Task.MinBufferSize) - } if cfg.Task.MsgSizeHint <= 0 { cfg.Task.MsgSizeHint = defaultMsgSizeHint } diff --git a/docs/configuration/config.md b/docs/configuration/config.md index 5237a641..a4fb4463 100644 --- a/docs/configuration/config.md +++ b/docs/configuration/config.md @@ -121,23 +121,21 @@ // shardingPolicy is `stripe,`(requires ShardingKey be numerical) or `hash`(requires ShardingKey be string) "shardingPolicy": "", - // interval of flushing the batch + // interval of flushing the batch. Default to 5, max to 10. "flushInterval": 5, - // batch size to insert into clickhouse. sinker will round upward it to the the nearest 2^n. + // batch size to insert into clickhouse. sinker will round upward it to the the nearest 2^n. Default to 262114, max to 1048576. "bufferSize": 90000, - // min batch size to insert into clickhouse. sinker will round upward it to the the nearest 2^n. - "minBufferSize": 1, - // estimated avg message size. kafka-go needs this to determize receive buffer size. default to 1000. + // estimated avg message size. kafka-go needs this to determize receive buffer size. Default to 1000. "msgSizeHint": 1000, // In the absence of time zone information, interprets the time as in the given location. Default to "Local" (aka /etc/localtime of the machine on which sinker runs) "timezone": "" }, - // log level, possible value: "debug", "info", "warn", "error", "dpanic", "panic", "fatal" + // log level, possible value: "debug", "info", "warn", "error", "dpanic", "panic", "fatal". Default to "info". "logLevel": "debug", - // log output paths, possible value: "stdout", "stderr", relative file path, absoute file path. Log files will be rotated every 100MB, keep 10 old ones. + // log output paths, possible value: "stdout", "stderr", relative file path, absoute file path. Log files will be rotated every 100MB, keep 10 old ones. Default to ["stdout"]. "logPaths": ["stdout", "test_dynamic_schema.log"] } ``` diff --git a/go.test.sh b/go.test.sh index 36981bc7..9f352f53 100755 --- a/go.test.sh +++ b/go.test.sh @@ -51,7 +51,7 @@ timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_fixed_schema.json timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json timeout 60 ./clickhouse_sinker --local-cfg-file docker/test_dynamic_schema.json -echo "check result" +echo "check result 1" count=`curl "localhost:8123" -d 'select count() from test_fixed_schema'` echo "Got test_fixed_schema count => $count" [ $count -eq 100000 ] || exit 1 @@ -88,7 +88,7 @@ timeout 30 ./clickhouse_sinker --nacos-addr 127.0.0.1:8848 --nacos-username naco timeout 30 ./clickhouse_sinker --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_auto_schema timeout 30 ./clickhouse_sinker --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_dynamic_schema -echo "check result" +echo "check result 2" count=`curl "localhost:8123" -d 'select count() from test_fixed_schema'` echo "Got test_fixed_schema count => $count" [ $count -eq 100000 ] || exit 1 diff --git a/input/kafka_go.go b/input/kafka_go.go index efdcac8b..c1e0c8b9 100644 --- a/input/kafka_go.go +++ b/input/kafka_go.go @@ -64,9 +64,9 @@ func (k *KafkaGo) Init(cfg *config.Config, taskName string, putFn func(msg model GroupID: k.cfg.Task.ConsumerGroup, Topic: k.cfg.Task.Topic, StartOffset: offset, - MinBytes: k.cfg.Task.MinBufferSize * k.cfg.Task.MsgSizeHint, + MinBytes: (k.cfg.Task.BufferSize / 2) * k.cfg.Task.MsgSizeHint, MaxBytes: k.cfg.Task.BufferSize * k.cfg.Task.MsgSizeHint, - MaxWait: time.Duration(k.cfg.Task.FlushInterval) * time.Second, + MaxWait: time.Duration(3) * time.Second, CommitInterval: time.Second, // flushes commits to Kafka every second } if kfkCfg.TLS.CaCertFiles == "" && kfkCfg.TLS.TrustStoreLocation != "" { diff --git a/input/kafka_sarama.go b/input/kafka_sarama.go index 805f7722..a8cf2089 100644 --- a/input/kafka_sarama.go +++ b/input/kafka_sarama.go @@ -127,7 +127,7 @@ func (k *KafkaSarama) Init(cfg *config.Config, taskName string, putFn func(msg m if taskCfg.Earliest { config.Consumer.Offsets.Initial = sarama.OffsetOldest } - config.ChannelBufferSize = taskCfg.MinBufferSize + config.ChannelBufferSize = 1024 cg, err := sarama.NewConsumerGroup(strings.Split(kfkCfg.Brokers, ","), taskCfg.ConsumerGroup, config) if err != nil { return err diff --git a/parser/parser_test.go b/parser/parser_test.go index 6fd0a0c3..60e5a88a 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -630,9 +630,9 @@ func TestParseInt(t *testing.T) { uvAct = uint32(uv) case 64: ivExp = i64Exp[i] - ivAct = int64(iv) + ivAct = iv uvExp = u64Exp[i] - uvAct = uint64(uv) + uvAct = uv } desc = fmt.Sprintf(`ParseInt("%s", 10, %d)=%d(%v)`, s, bitSize, iv, errors.Unwrap(ivErr)) require.Equal(t, ivExp, ivAct, desc) diff --git a/pool/conn.go b/pool/conn.go index 8f699bda..0143201f 100644 --- a/pool/conn.go +++ b/pool/conn.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/housepower/clickhouse_sinker/config" "github.com/housepower/clickhouse_sinker/health" "github.com/housepower/clickhouse_sinker/util" "github.com/pkg/errors" @@ -33,10 +34,6 @@ import ( "go.uber.org/zap" ) -const ( - BlockSize = 1 << 21 //2097152, two times of the default value -) - var ( lock sync.Mutex clusterConn []*ShardConn @@ -125,7 +122,7 @@ func InitClusterConn(hosts [][]string, port int, db, username, password, dsnPara // Each shard has a *sql.DB which connects to one replica inside the shard. // "alt_hosts" tolerates replica single-point-failure. However more flexable switching is needed for some cases for example https://github.com/ClickHouse/ClickHouse/issues/24036. dsnTmpl = "tcp://%s" + fmt.Sprintf("?database=%s&username=%s&password=%s&block_size=%d", - url.QueryEscape(db), url.QueryEscape(username), url.QueryEscape(password), BlockSize) + url.QueryEscape(db), url.QueryEscape(username), url.QueryEscape(password), 2*config.MaxBufferSize) if dsnParams != "" { dsnTmpl += "&" + dsnParams }