Skip to content

Commit

Permalink
fix influxdata#9398 and avoid all corner cases when aggregation funct…
Browse files Browse the repository at this point in the history
…ions try to aggregate from an empty row set, improve handling of float values from system.asynchronous_metrics

Signed-off-by: Eugene Klimov <[email protected]>
  • Loading branch information
Slach committed Jun 18, 2021
1 parent e1d1fde commit 33c8235
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 40 deletions.
51 changes: 35 additions & 16 deletions plugins/inputs/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,34 @@ func (ch *ClickHouse) clusterIncludeExcludeFilter() string {
}

func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error {
var result []struct {
var intResult []struct {
Metric string `json:"metric"`
Value chUInt64 `json:"value"`
}
if err := ch.execQuery(conn.url, commonMetrics[metric], &result); err != nil {
return err

var floatResult []struct {
Metric string `json:"metric"`
Value float64 `json:"value"`
}

tags := ch.makeDefaultTags(conn)

fields := make(map[string]interface{})
for _, r := range result {
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
}

if commonMetricsIsFloat[metric] {
if err := ch.execQuery(conn.url, commonMetrics[metric], &floatResult); err != nil {
return err
}
for _, r := range floatResult {
fields[internal.SnakeCase(r.Metric)] = r.Value
}
} else {
if err := ch.execQuery(conn.url, commonMetrics[metric], &intResult); err != nil {
return err
}
for _, r := range intResult {
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
}
}
acc.AddFields("clickhouse_"+metric, fields, tags)

return nil
Expand Down Expand Up @@ -575,7 +588,7 @@ func (ch *ClickHouse) execQuery(address *url.URL, query string, i interface{}) e
if err != nil {
return err
}
defer resp.Body.Close()
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 300 {
body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200))
return &clickhouseError{
Expand Down Expand Up @@ -614,9 +627,9 @@ func (i *chUInt64) UnmarshalJSON(b []byte) error {
}

const (
systemEventsSQL = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events"
systemMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.metrics"
systemAsyncMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics"
systemEventsSQL = "SELECT event AS metric, toUInt64(value) AS value FROM system.events"
systemMetricsSQL = "SELECT metric, toUInt64(value) AS value FROM system.metrics"
systemAsyncMetricsSQL = "SELECT metric, toFloat64(value) AS value FROM system.asynchronous_metrics"
systemPartsSQL = `
SELECT
database,
Expand All @@ -635,18 +648,18 @@ const (
systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'"

systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'"
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue"
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue SETTINGS empty_result_for_aggregation_by_empty_set=0"

systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts"
systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts SETTINGS empty_result_for_aggregation_by_empty_set=0"

systemDictionariesSQL = "SELECT origin, status, bytes_allocated FROM system.dictionaries"

systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations"
systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations SETTINGS empty_result_for_aggregation_by_empty_set=0"
systemDisksSQL = "SELECT name, path, toUInt64(100*free_space / total_space) AS free_space_percent, toUInt64( 100 * keep_free_space / total_space) AS keep_free_space_percent FROM system.disks"
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type"
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type SETTINGS empty_result_for_aggregation_by_empty_set=0"

systemTextLogExistsSQL = "SELECT count() AS text_log_exists FROM system.tables WHERE database='system' AND name='text_log'"
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level"
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level SETTINGS empty_result_for_aggregation_by_empty_set=0"
)

var commonMetrics = map[string]string{
Expand All @@ -655,4 +668,10 @@ var commonMetrics = map[string]string{
"asynchronous_metrics": systemAsyncMetricsSQL,
}

// commonMetricsIsFloat is keyed by the same metric-group name that is used to
// look up a query in commonMetrics. A true entry means the "value" column of
// that query is decoded into a float64; a false entry means it is decoded as
// an unsigned 64-bit integer. Only asynchronous_metrics is read as a float.
var commonMetricsIsFloat = map[string]bool{
	"asynchronous_metrics": true,
	"events":               false,
	"metrics":              false,
}

// Compile-time assertion that *ClickHouse implements telegraf.ServiceInput.
var _ telegraf.ServiceInput = &ClickHouse{}
4 changes: 2 additions & 2 deletions plugins/inputs/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func TestGather(t *testing.T) {
)
acc.AssertContainsFields(t, "clickhouse_asynchronous_metrics",
map[string]interface{}{
"test_system_asynchronous_metric": uint64(1000),
"test_system_asynchronous_metric2": uint64(2000),
"test_system_asynchronous_metric": float64(1000),
"test_system_asynchronous_metric2": float64(2000),
},
)
acc.AssertContainsFields(t, "clickhouse_zookeeper",
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/clickhouse/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ services:
# choose `:latest` once https://github.com/ClickHouse/ClickHouse/issues/13057 is resolved
image: docker.io/yandex/clickhouse-server:${CLICKHOUSE_VERSION:-latest}
volumes:
- ./init_schema.sql:/docker-entrypoint-initdb.d/init_schema.sql
- ./test_dictionary.xml:/etc/clickhouse-server/01-test_dictionary.xml
- ./zookeeper.xml:/etc/clickhouse-server/config.d/00-zookeeper.xml
- ./tls_settings.xml:/etc/clickhouse-server/config.d/01-tls_settings.xml
# please comment out text_log.xml when CLICKHOUSE_VERSION = 19.16
- ./text_log.xml:/etc/clickhouse-server/config.d/02-text_log.xml
- ./part_log.xml:/etc/clickhouse-server/config.d/03-part_log.xml
- ./mysql_port.xml:/etc/clickhouse-server/config.d/04-mysql_port.xml
- ./dhparam.pem:/etc/clickhouse-server/dhparam.pem
- ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key
- ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt
ports:
- 3306:3306
- 8123:8123
- 8443:8443
- 9000:9000
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/clickhouse/dev/init_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Dev fixture: (re)creates default.test, the table that test_dictionary.xml
-- names as the source table of its dictionary. docker-compose.yml mounts this
-- file into /docker-entrypoint-initdb.d/ of the ClickHouse container.
DROP TABLE IF EXISTS default.test;
-- Column names and nullability match the attributes listed in the
-- CREATE DICTIONARY comment of test_dictionary.xml (Nom, Code, Cur).
CREATE TABLE default.test(
Nom String,
Code Nullable(String) DEFAULT Null,
Cur Nullable(String) DEFAULT Null
) ENGINE=MergeTree() ORDER BY tuple();
3 changes: 3 additions & 0 deletions plugins/inputs/clickhouse/dev/mysql_port.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<!-- Dev-only override: sets mysql_port to 3306, the same port that
     docker-compose.yml publishes and that the MySQL source in
     test_dictionary.xml points at. Presumably this enables ClickHouse's
     MySQL-protocol listener; confirm against the ClickHouse server settings
     reference. -->
<yandex>
<mysql_port>3306</mysql_port>
</yandex>
43 changes: 21 additions & 22 deletions plugins/inputs/clickhouse/dev/test_dictionary.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<!--
CREATE DICTIONARY IF NOT EXISTS default.test_dict1(
nom String,
code String DEFAULT Null,
cur String DEFAULT Null
Nom String,
Code Nullable(String) DEFAULT Null,
Cur Nullable(String) DEFAULT Null
) PRIMARY KEY Nom
SOURCE(
MYSQL(port 9000 host '127.0.0.1' user 'wrong' password 'wrong' db 'default' table 'test')
MYSQL(port 3306 host '127.0.0.1' user 'default' password '' db 'default' table 'test')
)
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 600);
Expand All @@ -16,32 +16,32 @@ LIFETIME(MIN 300 MAX 600);

<structure>
<!-- Complex key configuration -->
<id>
<name>Nom</name>
</id>
<attribute>
<!-- Attribute parameters -->
<key>
<attribute>
<name>Nom</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>Code</name>
<type>String</type>
</attribute>
<attribute>
<name>Cur</name>
<type>String</type>
</attribute>
</key>
<!-- Attribute parameters -->
<attribute>
<name>Code</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>Cur</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>

<source>
<!-- Source configuration -->
<mysql>
<port>3306</port>
<user>wrong</user>
<password>wrong</password>
<user>default</user>
<password/>
<replica>
<host>127.0.0.1</host>
<priority>1</priority>
Expand All @@ -56,8 +56,7 @@ LIFETIME(MIN 300 MAX 600);
<complex_key_hashed />
</layout>

<lifetime>
<!-- Lifetime of dictionary in memory -->
</lifetime>
<!-- Lifetime of dictionary in memory -->
<lifetime>300</lifetime>
</dictionary>
</yandex>

0 comments on commit 33c8235

Please sign in to comment.