Skip to content

Commit

Permalink
fix: improve Clickhouse corner cases for empty recordset in aggregati…
Browse files Browse the repository at this point in the history
…on queries, fix dictionaries behavior (#9401)

Signed-off-by: Eugene Klimov <[email protected]>
  • Loading branch information
Slach authored Aug 10, 2021
1 parent 7def7e7 commit b193052
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 40 deletions.
51 changes: 35 additions & 16 deletions plugins/inputs/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,34 @@ func (ch *ClickHouse) clusterIncludeExcludeFilter() string {
}

func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error {
var result []struct {
var intResult []struct {
Metric string `json:"metric"`
Value chUInt64 `json:"value"`
}
if err := ch.execQuery(conn.url, commonMetrics[metric], &result); err != nil {
return err

var floatResult []struct {
Metric string `json:"metric"`
Value float64 `json:"value"`
}

tags := ch.makeDefaultTags(conn)

fields := make(map[string]interface{})
for _, r := range result {
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
}

if commonMetricsIsFloat[metric] {
if err := ch.execQuery(conn.url, commonMetrics[metric], &floatResult); err != nil {
return err
}
for _, r := range floatResult {
fields[internal.SnakeCase(r.Metric)] = r.Value
}
} else {
if err := ch.execQuery(conn.url, commonMetrics[metric], &intResult); err != nil {
return err
}
for _, r := range intResult {
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
}
}
acc.AddFields("clickhouse_"+metric, fields, tags)

return nil
Expand Down Expand Up @@ -575,7 +588,7 @@ func (ch *ClickHouse) execQuery(address *url.URL, query string, i interface{}) e
if err != nil {
return err
}
defer resp.Body.Close()
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 300 {
body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200))
return &clickhouseError{
Expand Down Expand Up @@ -614,9 +627,9 @@ func (i *chUInt64) UnmarshalJSON(b []byte) error {
}

const (
systemEventsSQL = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events"
systemMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.metrics"
systemAsyncMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics"
systemEventsSQL = "SELECT event AS metric, toUInt64(value) AS value FROM system.events"
systemMetricsSQL = "SELECT metric, toUInt64(value) AS value FROM system.metrics"
systemAsyncMetricsSQL = "SELECT metric, toFloat64(value) AS value FROM system.asynchronous_metrics"
systemPartsSQL = `
SELECT
database,
Expand All @@ -635,18 +648,18 @@ const (
systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'"

systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'"
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue"
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue SETTINGS empty_result_for_aggregation_by_empty_set=0"

systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts"
systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts SETTINGS empty_result_for_aggregation_by_empty_set=0"

systemDictionariesSQL = "SELECT origin, status, bytes_allocated FROM system.dictionaries"

systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations"
systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations SETTINGS empty_result_for_aggregation_by_empty_set=0"
systemDisksSQL = "SELECT name, path, toUInt64(100*free_space / total_space) AS free_space_percent, toUInt64( 100 * keep_free_space / total_space) AS keep_free_space_percent FROM system.disks"
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type"
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type SETTINGS empty_result_for_aggregation_by_empty_set=0"

systemTextLogExistsSQL = "SELECT count() AS text_log_exists FROM system.tables WHERE database='system' AND name='text_log'"
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level"
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level SETTINGS empty_result_for_aggregation_by_empty_set=0"
)

var commonMetrics = map[string]string{
Expand All @@ -655,4 +668,10 @@ var commonMetrics = map[string]string{
"asynchronous_metrics": systemAsyncMetricsSQL,
}

var commonMetricsIsFloat = map[string]bool{
"events": false,
"metrics": false,
"asynchronous_metrics": true,
}

var _ telegraf.ServiceInput = &ClickHouse{}
4 changes: 2 additions & 2 deletions plugins/inputs/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func TestGather(t *testing.T) {
)
acc.AssertContainsFields(t, "clickhouse_asynchronous_metrics",
map[string]interface{}{
"test_system_asynchronous_metric": uint64(1000),
"test_system_asynchronous_metric2": uint64(2000),
"test_system_asynchronous_metric": float64(1000),
"test_system_asynchronous_metric2": float64(2000),
},
)
acc.AssertContainsFields(t, "clickhouse_zookeeper",
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/clickhouse/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ services:
# choose `:latest` after resolve https://github.com/ClickHouse/ClickHouse/issues/13057
image: docker.io/yandex/clickhouse-server:${CLICKHOUSE_VERSION:-latest}
volumes:
- ./init_schema.sql:/docker-entrypoint-initdb.d/init_schema.sql
- ./test_dictionary.xml:/etc/clickhouse-server/01-test_dictionary.xml
- ./zookeeper.xml:/etc/clickhouse-server/config.d/00-zookeeper.xml
- ./tls_settings.xml:/etc/clickhouse-server/config.d/01-tls_settings.xml
# please comment text_log.xml when CLICKHOUSE_VERSION = 19.16
- ./text_log.xml:/etc/clickhouse-server/config.d/02-text_log.xml
- ./part_log.xml:/etc/clickhouse-server/config.d/03-part_log.xml
- ./mysql_port.xml:/etc/clickhouse-server/config.d/04-mysql_port.xml
- ./dhparam.pem:/etc/clickhouse-server/dhparam.pem
- ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key
- ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt
ports:
- 3306:3306
- 8123:8123
- 8443:8443
- 9000:9000
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/clickhouse/dev/init_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
DROP TABLE IF EXISTS default.test;
CREATE TABLE default.test(
Nom String,
Code Nullable(String) DEFAULT Null,
Cur Nullable(String) DEFAULT Null
) ENGINE=MergeTree() ORDER BY tuple();
3 changes: 3 additions & 0 deletions plugins/inputs/clickhouse/dev/mysql_port.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<yandex>
<mysql_port>3306</mysql_port>
</yandex>
43 changes: 21 additions & 22 deletions plugins/inputs/clickhouse/dev/test_dictionary.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<!--
CREATE DICTIONARY IF NOT EXISTS default.test_dict1(
nom String,
code String DEFAULT Null,
cur String DEFAULT Null
Nom String,
Code Nullable(String) DEFAULT Null,
Cur Nullable(String) DEFAULT Null
) PRIMARY KEY nom
SOURCE(
MYSQL(port 9000 host '127.0.0.1' user 'wrong' password 'wrong' db 'default' table 'test')
MYSQL(port 3306 host '127.0.0.1' user 'default' password '' db 'default' table 'test')
)
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 600);
Expand All @@ -16,32 +16,32 @@ LIFETIME(MIN 300 MAX 600);

<structure>
<!-- Complex key configuration -->
<id>
<name>Nom</name>
</id>
<attribute>
<!-- Attribute parameters -->
<key>
<attribute>
<name>Nom</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>Code</name>
<type>String</type>
</attribute>
<attribute>
<name>Cur</name>
<type>String</type>
</attribute>
</key>
<!-- Attribute parameters -->
<attribute>
<name>Code</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>Cur</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>

<source>
<!-- Source configuration -->
<mysql>
<port>3306</port>
<user>wrong</user>
<password>wrong</password>
<user>default</user>
<password/>
<replica>
<host>127.0.0.1</host>
<priority>1</priority>
Expand All @@ -56,8 +56,7 @@ LIFETIME(MIN 300 MAX 600);
<complex_key_hashed />
</layout>

<lifetime>
<!-- Lifetime of dictionary in memory -->
</lifetime>
<!-- Lifetime of dictionary in memory -->
<lifetime>300</lifetime>
</dictionary>
</yandex>

0 comments on commit b193052

Please sign in to comment.