Skip to content

Commit

Permalink
updated generators
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Jul 25, 2022
1 parent fb0c86b commit 86311cd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/kafka_gen_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ CREATE TABLE apache_access_log ON CLUSTER abc (
xforwardfor LowCardinality(String)
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{database}/{table}/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, `@hostname`, `@path`, `@lineno`);
ORDER BY (`@hostname`, `@path`, `@lineno`, timestamp);
CREATE TABLE dist_apache_access_log ON CLUSTER abc AS apache_access_log ENGINE = Distributed(abc, default, apache_access_log);
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka_gen_metric/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ CREATE TABLE sensor_dt_result_online ON CLUSTER abc (
is_missing Int32
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{database}/{table}/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(`@time`)
ORDER BY (`@time`, `@ItemGUID`, `@MetricName`);
ORDER BY (`@ItemGUID`, `@MetricName`, `@time`);
CREATE TABLE dist_sensor_dt_result_online ON CLUSTER abc AS sensor_dt_result_online ENGINE = Distributed(abc, default, sensor_dt_result_online);
Expand Down
64 changes: 40 additions & 24 deletions cmd/kafka_gen_prom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,37 @@ performance of inserting to sparse wide table is bad
CREATE TABLE prom_extend ON CLUSTER abc (
timestamp DateTime,
value Float64,
__name__ String
__name__ String,
labels String
) ENGINE=ReplicatedMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (__name__, timestamp);
CREATE TABLE dist_prom_extend ON CLUSTER abc AS prom_extend ENGINE = Distributed(abc, default, prom_extend);
CREATE TABLE prom_metric ON CLUSTER abc (
__series_id Int64,
timestamp DateTime,
value Float64
) ENGINE=ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (__series_id, timestamp);
CREATE TABLE dist_prom_metric ON CLUSTER abc AS prom_metric ENGINE = Distributed(abc, default, prom_metric);
CREATE TABLE prom_metric_series ON CLUSTER abc (
__series_id Int64,
__mgmt_id Int64,
labels String,
__name__ String
) ENGINE=ReplicatedReplacingMergeTree()
ORDER BY (__name__, __series_id);
CREATE TABLE dist_prom_metric_series ON CLUSTER abc AS prom_metric_series ENGINE = Distributed(abc, default, prom_metric_series);
*/

import (
"bytes"
"context"
"flag"
"fmt"
Expand All @@ -29,6 +49,7 @@ import (
"syscall"
"time"

"github.com/bytedance/sonic"
"github.com/google/gops/agent"
"github.com/housepower/clickhouse_sinker/util"
"github.com/thanos-io/thanos/pkg/errors"
Expand All @@ -38,8 +59,10 @@ import (

const (
Alpha = "abcdefghijklmnopqrstuvwxyz"
NumMetrics = 1000000
NumKeys = 10
NumMetrics = 1000
NumKeys = 5
NumRunes = 10
LenVal = 1 // 1000 * (10^1)^5 = 10^8 series
NumAllKeys = 1000
)

Expand All @@ -62,13 +85,13 @@ type Datapoint struct {

// I need every label be present at the top level.
func (dp Datapoint) MarshalJSON() ([]byte, error) {
var bbuf bytes.Buffer
bbuf.WriteString(fmt.Sprintf(`{"timestamp":"%s", "value":%f,"__name__":"%s"`, dp.Timestamp.Format(time.RFC3339), dp.Value, dp.Name))
for key, val := range dp.Labels {
bbuf.WriteString(fmt.Sprintf(`,"%s":"%s"`, key, val))
labels, err := sonic.MarshalString(dp.Labels)
if err != nil {
return nil, err
}
bbuf.WriteByte('}')
return bbuf.Bytes(), nil
labels2 := labels[1 : len(labels)-1]
msg := fmt.Sprintf(`{"timestamp":"%s", "value":%f,"__name__":"%s","labels":%s,%s}`, dp.Timestamp.Format(time.RFC3339), dp.Value, dp.Name, labels, labels2)
return []byte(msg), nil
}

type PromMetric struct {
Expand All @@ -77,18 +100,9 @@ type PromMetric struct {
}

func randValue() (val string) {
mod := rand.Intn(2)
var leng, maxN int
if mod == 0 { //10^5=100000
leng = 5
maxN = 10
} else { //3^10 = 59049
leng = 10
maxN = 3
}
b := make([]byte, leng)
for i := 0; i < leng; i++ {
b[i] = Alpha[rand.Intn(maxN+1)]
b := make([]byte, LenVal)
for i := 0; i < LenVal; i++ {
b[i] = Alpha[rand.Intn(NumRunes+1)]
}
val = string(b)
return
Expand All @@ -98,11 +112,11 @@ func initMetrics() {
metrics = make([]PromMetric, NumMetrics)
for i := 0; i < NumMetrics; i++ {
m := PromMetric{
Name: fmt.Sprintf("metric-%08d", i),
Name: fmt.Sprintf("metric_%08d", i),
LabelKeys: make([]string, NumKeys),
}
for j := 0; j < NumKeys; j++ {
key := fmt.Sprintf("key-%06d", rand.Intn(NumAllKeys+1))
key := fmt.Sprintf("key_%06d", rand.Intn(NumAllKeys+1))
m.LabelKeys[j] = key
}
metrics[i] = m
Expand Down Expand Up @@ -190,6 +204,8 @@ topic: for example, prom_extend`, os.Args[0], os.Args[0])
zap.String("KafkaTopic", KafkaTopic),
zap.Int("NumMetrics", NumMetrics),
zap.Int("NumKeys", NumKeys),
zap.Int("NumRunes", NumRunes),
zap.Int("LenVal", LenVal),
zap.Int("NumAllKeys", NumAllKeys),
)

Expand Down

0 comments on commit 86311cd

Please sign in to comment.