
Commit

Update the doc and allow customization of DatabaseName in task level
momingkotoba committed Jan 2, 2023
1 parent dfc271d commit ac3ce76
Showing 7 changed files with 54 additions and 67 deletions.
3 changes: 2 additions & 1 deletion docs/configuration/config.md
@@ -1,6 +1,6 @@
# Config Items

> Here we use json with comments for documentation
> Here we use json with comments for documentation; a config file in hjson format is also supported
```json
{
@@ -105,6 +105,7 @@
"parser": "json",

// clickhouse table name
// overrides clickhouse.db when set in "db.tableName" format, e.g. "default.tbl1"
"tableName": "daily",

// columns of the table
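
To make the new task-level override concrete, here is a minimal illustrative fragment of a task definition; only fields already shown above are used, and the database/table names are made up.

```json
{
  // other task fields omitted
  "parser": "json",

  // writes to table tbl1 in database default, ignoring the global clickhouse.db
  "tableName": "default.tbl1"
}
```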
26 changes: 11 additions & 15 deletions docs/dev/design.md
@@ -11,24 +11,20 @@ So if you setup ClickHouse properly(ReplacingMergeTree ORDER BY (__kafak_topic,

It's hard for clickhouse_sinker to guarantee exactly-once semantics without ReplacingMergeTree. Kafka consumer-group load balancing causes duplicated messages if one consumer crashes suddenly.

### Sharding with kafka message offset stripe (default)
## Workflow

The flow is:
Internally, clickhouse_sinker groups tasks with an identical "consumerGroup" property together in order to reduce the number of Kafka clients, so that the Kafka server can handle more requests concurrently. As a consequence, the Kafka offset is committed only after every message in a fetch has been completely written to ClickHouse.

- Fetch message via Franz, Sarama, or kafka-go, which starts internally a goroutine for each partition.
- Parse messages in a global goroutine pool(pool size is customizable), fill the result into a ring according to the message's partition and offset.
- Generate a batch when messages in a ring reach a batchSize boundary or flush timer fire. For each message, the dest shard is determined by `(kafka_offset/roundup(buffer_size))%clickhouse_shards`.
- Write batch to ClickHouse in a global goroutine pool(pool size is fixed according to the number of tasks and Clickhouse shards).
The flow is like this:

### Sharding with custom key

The flow is:

- Fetch message via kafka-go or samara, which starts internally a goroutine for each partition.
- Parse messages in a global goroutine pool(pool size is customizable), fill the result into a ring according to the message's partition and offset.
- Shard messages in a ring when messages reach a batchSize boundary or flush timer fire. For each message, if the sharding key is numerical(integer, float, time, etc.), the dest shard is determined by `(shardingKey/shardingStripe)%clickhouse_shards`, otherwise it is determined by `xxHash64(shardingKey)%clickhouse_shards`.
- Generate batches for all shard slots if messages in one shard slot reach batchSize boundary or flush timer fire. Those batches form a `BatchGroup`. The `before` relationship could be impossible if messages of a partition are distributed to multiple batches. So those batches need to be committed after ALL of them have been written to Clickhouse.
- Write batch to ClickHouse in a global goroutine pool(pool size is fixed according to the number of tasks and Clickhouse shards).
- Group tasks with an identical "consumerGroup" property together and fetch messages for the whole group with a single goroutine.
- Route fetched messages to the individual tasks for further parsing. By default, the mapping between messages and tasks is controlled by the "topic" and "tableName" properties; for messages that carry the Kafka header "__table_name", the mapping between "__table_name" and "tableName" overrides the default behavior.
- Parse messages and calculate the destination shard (see the sketch after this list):
  - For tasks with the "shardingkey" property specified, if the sharding key is numerical (integer, float, time, etc.), the destination shard is determined by `(shardingKey/shardingStripe)%clickhouse_shards`; otherwise it is determined by `xxHash64(shardingKey)%clickhouse_shards`.
  - Otherwise, the destination shard for each message is determined by `(kafka_offset/roundup(buffer_size))%clickhouse_shards`.
- Generate batches for all shard slots in the same group when the total cached message count in the group reaches the `sum(batchSize)*80%` boundary or the flush timer fires.
- Write batches to ClickHouse in a global goroutine pool (pool size is a fixed number based on the number of tasks and ClickHouse shards).
- Commit offsets back to Kafka.
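
To make the two sharding policies concrete, here is a hedged Go sketch; it is not the clickhouse_sinker source, and the xxhash import is an assumption (any xxHash64 implementation behaves the same).

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// roundup returns the smallest power of two that is >= n.
func roundup(n int) int64 {
	p := int64(1)
	for p < int64(n) {
		p <<= 1
	}
	return p
}

// shardByOffset is the default policy: contiguous stripes of
// roundup(bufferSize) offsets land on the same shard.
func shardByOffset(offset int64, bufferSize, shards int) int {
	return int(offset/roundup(bufferSize)) % shards
}

// shardByKey is the custom policy for non-numerical sharding keys.
func shardByKey(key string, shards int) int {
	return int(xxhash.Sum64String(key) % uint64(shards))
}

func main() {
	fmt.Println(shardByOffset(123456, 10000, 4)) // stripe width 16384 -> shard 3
	fmt.Println(shardByKey("host-42", 4))
}
```

The `Offset >> util.GetShift(BufferSize)` expression in task/task.go further down in this commit is the same computation, since dividing by a power of two equals shifting right by its log2.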


## Task scheduling
28 changes: 4 additions & 24 deletions docs/dev/introduction.md
@@ -12,9 +12,9 @@ Refers to [design](./design.md) for how it works.
- Uses native ClickHouse client-server TCP protocol, with higher performance than HTTP.
- Easy to use and deploy; you don't need to write any code, just take care of the configuration file
- Support multiple parsers: fastjson(recommended), gjson, csv.
- Support multiple Kafka client: sarama(recommended), kafka-go.
- Support multiple Kafka security mechanisms: SSL, SASL/PLAIN, SASL/SCRAM, SASL/GSSAPI and combinations of them.
- Bulk insert (by config `bufferSize` and `flushInterval`).
- Powered by Franz-go, the fastest and most CPU- and memory-efficient Kafka client in Go.
- Parse messages concurrently.
- Write batches concurrently.
- Every batch is routed to a determined ClickHouse shard. The sinker exits if the write loop fails.
@@ -35,6 +35,7 @@ Refers to [design](./design.md) for how it works.
- [x] Enum
- [x] Array(T), where T is one of above basic types
- [x] Nullable(T), where T is one of above basic types
- [x] Map

Note:

@@ -164,7 +165,7 @@ Please follow [`Kafka SSL setup`](https://kafka.apache.org/documentation/#securi

### Kafka Authentication

clickhouse_sinker support following following authentication mechanisms:
clickhouse_sinker supports the following authentication mechanisms:

- No authentication

@@ -339,13 +340,6 @@ type Parser interface {
Parse(bs []byte) model.Metric
}

type Inputer interface {
Init(cfg *config.Config, taskName string, putFn func(msg model.InputMessage)) error
Run(ctx context.Context)
Stop() error
CommitMessages(ctx context.Context, message *model.InputMessage) error
}

// RemoteConfManager can be implemented by many backends: Nacos, Consul, etcd, ZooKeeper...
type RemoteConfManager interface {
Init(properties map[string]interface{}) error
@@ -366,25 +360,11 @@ type RemoteConfManager interface {

Kafka release history is available [here](https://kafka.apache.org/downloads). Kafka broker [exposes versions of various APIs it supports since 0.10.0.0](https://kafka.apache.org/protocol#api_versions).

### Kafka-go

- Kafka-go [negotiate it's protocol version](https://github.com/segmentio/kafka-go/blob/c66d8ca149e7f1a7905b47a60962745ceb08a6a9/conn.go#L209).
- Kafka-go [doesn't support Kerberos authentication](https://github.com/segmentio/kafka-go/issues/237).

### Sarama

- Sarama guarantees compatibility [with Kafka 2.6 through 2.8](https://github.com/Shopify/sarama/blob/master/README.md#compatibility-and-api-stability).
- Sarama [has tied it's protocol usage to the Version field in Config](https://github.com/Shopify/sarama/issues/1732). My experience is setting `Config.Version` to "0.11.0.0" or "2.5.0" cannot cowork with broker 2.2.0.
- Sarama consumer API provides generation cleanup callback. This ensures `exactly once` when consumer-group rebalance occur.

### Franz
### Franz-go Kafka client

- Franz negotiates its protocol version.
- Franz supports Kerberos authentication.
- Franz supports a generation cleanup callback.
- Franz outperforms Sarama and Kafka-go in benchmarks.
- The Franz project is young but very active.

### Conclusion

Franz is the best Golang client library, though none is as mature as the official Kafka Java client. Try another client if clickhouse_sinker fails to connect to Kafka.
1 change: 1 addition & 0 deletions go.test.sh
@@ -45,6 +45,7 @@ echo "send messages to kafka"
echo "cat /tmp/a.json | kafka-console-producer --topic topic1 --broker-list localhost:9092" > send.sh
sudo docker cp a.json kafka:/tmp/
sudo docker cp send.sh kafka:/tmp/
sudo docker exec kafka kafka-topics --bootstrap-server localhost:9093 --topic topic1 --delete
sudo docker exec kafka sh /tmp/send.sh

echo "start clickhouse_sinker to consume"
56 changes: 33 additions & 23 deletions output/clickhouse.go
@@ -53,12 +53,15 @@ var (

// ClickHouse is an output service that consumes Kafka messages
type ClickHouse struct {
Dims []*model.ColumnWithType
NumDims int
IdxSerID int
NameKey string
cfg *config.Config
taskCfg *config.TaskConfig
Dims []*model.ColumnWithType
NumDims int
IdxSerID int
NameKey string
cfg *config.Config
taskCfg *config.TaskConfig
TableName string
DBName string

prepareSQL string
promSerSQL string
seriesTbl string
@@ -211,7 +214,7 @@ func (c *ClickHouse) write(batch *model.Batch, sc *pool.ShardConn, dbVer *int) (
if numBad, err = writeRows(c.prepareSQL, *batch.Rows, 0, numDims, conn); err != nil {
return
}
statistics.WritingDurations.WithLabelValues(c.taskCfg.Name, c.taskCfg.TableName).Observe(time.Since(begin).Seconds())
statistics.WritingDurations.WithLabelValues(c.taskCfg.Name, c.TableName).Observe(time.Since(begin).Seconds())
if numBad != 0 {
statistics.ParseMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(numBad))
}
@@ -262,7 +265,7 @@ func (c *ClickHouse) initBmSeries(conn clickhouse.Conn) (err error) {
util.Logger.Info(fmt.Sprintf("skipped loading series from %v", tbl), zap.String("task", c.taskCfg.Name))
return
}
query := fmt.Sprintf("SELECT toInt64(__series_id) AS sid, toInt64(__mgmt_id) AS mid FROM %s.%s ORDER BY sid", c.cfg.Clickhouse.DB, tbl)
query := fmt.Sprintf("SELECT toInt64(__series_id) AS sid, toInt64(__mgmt_id) AS mid FROM %s.%s ORDER BY sid", c.DBName, tbl)
util.Logger.Info(fmt.Sprintf("executing sql=> %s", query))
var rs driver.Rows
var seriesID, mgmtID int64
@@ -301,23 +304,23 @@ func (c *ClickHouse) initSeriesSchema(conn clickhouse.Conn) (err error) {
}
}
if dimSerID == nil {
err = errors.Newf("Metric table %s shall have column `__series_id UInt64`.", c.taskCfg.TableName)
err = errors.Newf("Metric table %s.%s shall have column `__series_id UInt64`.", c.DBName, c.TableName)
return
}
c.IdxSerID = len(c.Dims)
c.Dims = append(c.Dims, dimSerID)

// Add string columns from series table
c.seriesTbl = c.taskCfg.TableName + "_series"
c.seriesTbl = c.TableName + "_series"
expSeriesDims := []*model.ColumnWithType{
{Name: "__series_id", Type: &model.TypeInfo{Type: model.Int64}},
{Name: "__mgmt_id", Type: &model.TypeInfo{Type: model.Int64}},
{Name: "labels", Type: &model.TypeInfo{Type: model.String}},
}
var seriesDims []*model.ColumnWithType
if seriesDims, err = getDims(c.cfg.Clickhouse.DB, c.seriesTbl, nil, c.taskCfg.Parser, conn); err != nil {
if seriesDims, err = getDims(c.DBName, c.seriesTbl, nil, c.taskCfg.Parser, conn); err != nil {
if errors.Is(err, ErrTblNotExist) {
err = errors.Wrapf(err, "Please create series table for %s.%s", c.cfg.Clickhouse.DB, c.taskCfg.TableName)
err = errors.Wrapf(err, "Please create series table for %s.%s", c.DBName, c.TableName)
return
}
return
@@ -354,7 +357,7 @@ func (c *ClickHouse) initSeriesSchema(conn clickhouse.Conn) (err error) {
serDimsQuoted[i] = fmt.Sprintf("`%s`", serDim.Name)
}
c.promSerSQL = fmt.Sprintf("INSERT INTO `%s`.`%s` (%s)",
c.cfg.Clickhouse.DB,
c.DBName,
c.seriesTbl,
strings.Join(serDimsQuoted, ","))

@@ -377,13 +380,20 @@
}

func (c *ClickHouse) initSchema() (err error) {
if idx := strings.Index(c.taskCfg.TableName, "."); idx > 0 {
c.TableName = c.taskCfg.TableName[idx+1:]
c.DBName = c.taskCfg.TableName[0:idx]
} else {
c.TableName = c.taskCfg.TableName[idx+1:]
c.DBName = c.cfg.Clickhouse.DB
}
sc := pool.GetShardConn(0)
var conn clickhouse.Conn
if conn, _, err = sc.NextGoodReplica(0); err != nil {
return
}
if c.taskCfg.AutoSchema {
if c.Dims, err = getDims(c.cfg.Clickhouse.DB, c.taskCfg.TableName, c.taskCfg.ExcludeColumns, c.taskCfg.Parser, conn); err != nil {
if c.Dims, err = getDims(c.DBName, c.TableName, c.taskCfg.ExcludeColumns, c.taskCfg.Parser, conn); err != nil {
return
}
} else {
@@ -410,18 +420,18 @@ func (c *ClickHouse) initSchema() (err error) {
quotedDms[i] = fmt.Sprintf("`%s`", c.Dims[i].Name)
}
c.prepareSQL = fmt.Sprintf("INSERT INTO `%s`.`%s` (%s)",
c.cfg.Clickhouse.DB,
c.taskCfg.TableName,
c.DBName,
c.TableName,
strings.Join(quotedDms, ","))
util.Logger.Info(fmt.Sprintf("Prepare sql=> %s", c.prepareSQL), zap.String("task", c.taskCfg.Name))

// Check distributed metric table
if chCfg := &c.cfg.Clickhouse; chCfg.Cluster != "" {
if c.distMetricTbls, err = c.getDistTbls(c.taskCfg.TableName); err != nil {
if c.distMetricTbls, err = c.getDistTbls(c.TableName); err != nil {
return
}
if c.distMetricTbls == nil {
err = errors.Newf("Please create distributed table for %s.", c.taskCfg.TableName)
err = errors.Newf("Please create distributed table for %s.", c.TableName)
return
}
}
@@ -475,12 +485,12 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) {
}
if c.taskCfg.PrometheusSchema {
if intVal == model.String {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s ADD COLUMN IF NOT EXISTS `%s` %s", chCfg.DB, c.seriesTbl, onCluster, strKey, strVal)
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s ADD COLUMN IF NOT EXISTS `%s` %s", c.DBName, c.seriesTbl, onCluster, strKey, strVal)
queries = append(queries, query)
affectDistSeries = true
}
} else {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s ADD COLUMN IF NOT EXISTS `%s` %s", chCfg.DB, taskCfg.TableName, onCluster, strKey, strVal)
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s ADD COLUMN IF NOT EXISTS `%s` %s", c.DBName, c.TableName, onCluster, strKey, strVal)
queries = append(queries, query)
affectDistMetric = true
}
@@ -504,12 +514,12 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) {
}
if chCfg.Cluster != "" {
if affectDistMetric {
if err = recreateDistTbls(chCfg.Cluster, chCfg.DB, c.taskCfg.TableName, c.distMetricTbls, conn); err != nil {
if err = recreateDistTbls(chCfg.Cluster, c.DBName, c.TableName, c.distMetricTbls, conn); err != nil {
return
}
}
if affectDistSeries {
if err = recreateDistTbls(chCfg.Cluster, chCfg.DB, c.seriesTbl, c.distSeriesTbls, conn); err != nil {
if err = recreateDistTbls(chCfg.Cluster, c.DBName, c.seriesTbl, c.distSeriesTbls, conn); err != nil {
return
}
}
Expand All @@ -526,7 +536,7 @@ func (c *ClickHouse) getDistTbls(table string) (distTbls []string, err error) {
return
}
query := fmt.Sprintf(`SELECT name FROM system.tables WHERE engine='Distributed' AND database='%s' AND match(create_table_query, 'Distributed\(\'%s\', \'%s\', \'%s\'.*\)')`,
chCfg.DB, chCfg.Cluster, chCfg.DB, table)
c.DBName, chCfg.Cluster, c.DBName, table)
util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name))

var rows driver.Rows
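
The new branch in `initSchema` above splits the task-level `tableName` into DBName and TableName. A stand-alone restatement of that split follows; the helper name is invented for illustration and is not part of the codebase.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTableName mirrors the branch added to initSchema above: a qualified
// "db.table" value overrides the database, a bare table name keeps defaultDB.
func splitTableName(qualified, defaultDB string) (db, table string) {
	if idx := strings.Index(qualified, "."); idx > 0 {
		return qualified[:idx], qualified[idx+1:]
	}
	return defaultDB, qualified
}

func main() {
	fmt.Println(splitTableName("default.tbl1", "metrics")) // default tbl1
	fmt.Println(splitTableName("daily", "metrics"))        // metrics daily
}
```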
2 changes: 1 addition & 1 deletion task/consumer.go
@@ -218,7 +218,7 @@ func (c *Consumer) processFetch() {

c.tasks.Range(func(key, value any) bool {
tsk := value.(*Service)
if (tablename != "" && tsk.taskCfg.TableName == tablename) || tsk.taskCfg.Topic == rec.Topic {
if (tablename != "" && tsk.clickhouse.TableName == tablename) || tsk.taskCfg.Topic == rec.Topic {
bufLength++
if e := tsk.Put(msg, flushFn); e != nil {
atomic.StoreInt64(&done, items)
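
The predicate above routes a record to a task either by the `__table_name` header or by topic. As an illustration of the producer side, a record can be pinned to a specific task's table like this; the broker address, topic, and payload are made up, while the franz-go calls are the library's real API.

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	rec := &kgo.Record{
		Topic: "topic1",
		Value: []byte(`{"date": "2023-01-02", "value": 1}`),
		// clickhouse_sinker routes this record to the task whose resolved
		// TableName equals the header value, overriding topic-based routing.
		Headers: []kgo.RecordHeader{{Key: "__table_name", Value: []byte("daily")}},
	}
	if err := cl.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		panic(err)
	}
}
```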
5 changes: 2 additions & 3 deletions task/task.go
@@ -60,7 +60,7 @@ type Service struct {
consumer *Consumer
}

// cloneTask create a new task by steal members from s instead of creating a new one
// cloneTask creates a new task by stealing members from s instead of creating a new one
func cloneTask(s *Service, newGroup *Consumer) (service *Service) {
service = &Service{
clickhouse: s.clickhouse,
@@ -183,7 +183,7 @@ func (service *Service) Put(msg *model.InputMessage, flushFn func()) error {
// 3) apply the schema change.
// 4) recreate the service
util.Logger.Warn("new key detected, consumer is going to restart", zap.String("consumer group", service.taskCfg.ConsumerGroup), zap.Error(err))
service.consumer.state.Store(util.StateStopped)
go service.consumer.restart()
flushFn()
if err = service.clickhouse.ChangeSchema(&service.newKeys); err != nil {
@@ -202,7 +201,7 @@ func (service *Service) Put(msg *model.InputMessage, flushFn func()) error {
util.Logger.Fatal("shard number calculation failed", zap.String("task", taskCfg.Name), zap.Error(err))
}
} else {
msgRow.Shard = int(msgRow.Msg.Offset>>17) % service.sharder.shards
msgRow.Shard = int(msgRow.Msg.Offset>>int64(util.GetShift(service.taskCfg.BufferSize))) % service.sharder.shards
}
service.sharder.PutElement(&msgRow)
}
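
The new expression above replaces the hard-coded `>>17` with a shift derived from the task's BufferSize. A sketch of what `util.GetShift` presumably computes (the real implementation may differ): the smallest shift such that `1<<shift >= BufferSize`, which makes `offset>>shift` equivalent to the `(kafka_offset/roundup(buffer_size))` division in the design doc.

```go
package main

import "fmt"

// getShift is an illustrative stand-in for util.GetShift, not the actual
// implementation. It returns the smallest shift such that 1<<shift >= n.
func getShift(n int) uint {
	var s uint
	for (1 << s) < n {
		s++
	}
	return s
}

func main() {
	fmt.Println(getShift(1 << 17)) // 17, matching the old hard-coded >>17
	fmt.Println(getShift(100000))  // 17 as well (rounds up to 131072)
	fmt.Println(getShift(200000))  // 18
}
```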
