diff --git a/.chloggen/clickhouse-default-async-insert.yaml b/.chloggen/clickhouse-default-async-insert.yaml new file mode 100644 index 000000000000..40d0d2632f66 --- /dev/null +++ b/.chloggen/clickhouse-default-async-insert.yaml @@ -0,0 +1,33 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/clickhouse + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add `async_insert` config option to enable inserting asynchronously by default." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33614] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Adds `async_insert` config option to enable inserting asynchronously by default. + To preserve the previous behavior, set `async_insert` to `false` in your config. + When enabled, the exporter will insert asynchronously, which can improve performance for high-throughput deployments. + The `async_insert` option can be set to `true` or `false` to enable or disable async inserts, respectively. The default value is `true`. + Keep in mind this setting is added since the exporter now sets it to default. + Async insert and its related settings can still be defined in `endpoint` and `connection_params`, which take priority over the new config option. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/clickhouseexporter/README.md b/exporter/clickhouseexporter/README.md index 1e9d4ee5a2c3..f9983c586dcf 100644 --- a/exporter/clickhouseexporter/README.md +++ b/exporter/clickhouseexporter/README.md @@ -36,13 +36,13 @@ as [ClickHouse document says:](https://clickhouse.com/docs/en/introduction/perfo dashboard. Support time-series graph, table and logs. -2. Analyze logs via powerful clickhouse SQL. +2. Analyze logs via powerful ClickHouse SQL. ### Logs - Get log severity count time series. -```clickhouse +```sql SELECT toDateTime(toStartOfInterval(TimestampTime, INTERVAL 60 second)) as time, SeverityText, count() as count FROM otel_logs WHERE time >= NOW() - INTERVAL 1 HOUR @@ -52,7 +52,7 @@ ORDER BY time; - Find any log. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE TimestampTime >= NOW() - INTERVAL 1 HOUR @@ -61,7 +61,7 @@ Limit 100; - Find log with specific service. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE ServiceName = 'clickhouse-exporter' @@ -71,7 +71,7 @@ Limit 100; - Find log with specific attribute. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE LogAttributes['container_name'] = '/example_flog_1' @@ -81,7 +81,7 @@ Limit 100; - Find log with body contain string token. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE hasToken(Body, 'http') @@ -91,7 +91,7 @@ Limit 100; - Find log with body contain string. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE Body like '%http%' @@ -101,7 +101,7 @@ Limit 100; - Find log with body regexp match string. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE match(Body, 'http') @@ -111,7 +111,7 @@ Limit 100; - Find log with body json extract. -```clickhouse +```sql SELECT Timestamp as log_time, Body FROM otel_logs WHERE JSONExtractFloat(Body, 'bytes') > 1000 @@ -123,7 +123,7 @@ Limit 100; - Find spans with specific attribute. -```clickhouse +```sql SELECT Timestamp as log_time, TraceId, SpanId, @@ -147,7 +147,7 @@ Limit 100; - Find traces with traceID (using time primary index and TraceID skip index). -```clickhouse +```sql WITH '391dae938234560b16bb63f51501cb6f' as trace_id, (SELECT min(Start) FROM otel_traces_trace_id_ts WHERE TraceId = trace_id) as start, @@ -175,7 +175,7 @@ Limit 100; - Find spans is error. -```clickhouse +```sql SELECT Timestamp as log_time, TraceId, SpanId, @@ -199,7 +199,7 @@ Limit 100; - Find slow spans. -```clickhouse +```sql SELECT Timestamp as log_time, TraceId, SpanId, @@ -240,13 +240,13 @@ Prometheus(or someone else uses OpenMetrics protocol), you also need to know the between Prometheus(OpenMetrics) and OTLP Metrics. - Find a sum metrics with name -```clickhouse +```sql select TimeUnix,MetricName,Attributes,Value from otel_metrics_sum where MetricName='calls_total' limit 100 ``` - Find a sum metrics with name, attribute. -```clickhouse +```sql select TimeUnix,MetricName,Attributes,Value from otel_metrics_sum where MetricName='calls_total' and Attributes['service_name']='featureflagservice' limit 100 @@ -279,10 +279,11 @@ Connection options: - `username` (default = ): The authentication username. - `password` (default = ): The authentication password. -- `connection_params` (default = {}). Params is the extra connection parameters with map format. - `ttl` (default = 0): The data time-to-live example 30m, 48h. Also, 0 means no ttl. -- `database` (default = otel): The database name. +- `database` (default = default): The database name. Overrides the database defined in `endpoint` when this setting is not equal to `default`. +- `connection_params` (default = {}). Params is the extra connection parameters with map format. Query parameters provided in `endpoint` will be individually overwritten if present in this map. - `create_schema` (default = true): When set to true, will run DDL to create the database and tables. (See [schema management](#schema-management)) +- `async_insert` (default = true): Enables [async inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts). Ignored if async inserts are configured in the `endpoint` or `connection_params`. Async inserts may still be overridden server-side. ClickHouse tables: @@ -351,6 +352,7 @@ exporters: clickhouse: endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4 database: otel + async_insert: true ttl: 72h create_schema: true logs_table_name: otel_logs diff --git a/exporter/clickhouseexporter/config.go b/exporter/clickhouseexporter/config.go index 2ffb50550bf6..6c89831d5d90 100644 --- a/exporter/clickhouseexporter/config.go +++ b/exporter/clickhouseexporter/config.go @@ -46,6 +46,10 @@ type Config struct { ClusterName string `mapstructure:"cluster_name"` // CreateSchema if set to true will run the DDL for creating the database and tables. default is true. CreateSchema bool `mapstructure:"create_schema"` + // AsyncInsert if true will enable async inserts. Default is `true`. + // Ignored if async inserts are configured in the `endpoint` or `connection_params`. + // Async inserts may still be overridden server-side. + AsyncInsert bool `mapstructure:"async_insert"` } // TableEngine defines the ENGINE string value when creating the table. @@ -99,6 +103,11 @@ func (cfg *Config) buildDSN() (string, error) { queryParams.Set("secure", "true") } + // Use async_insert from config if not specified in DSN. + if !queryParams.Has("async_insert") { + queryParams.Set("async_insert", fmt.Sprintf("%t", cfg.AsyncInsert)) + } + // Use database from config if not specified in path, or if config is not default. if dsnURL.Path == "" || cfg.Database != defaultDatabase { dsnURL.Path = cfg.Database diff --git a/exporter/clickhouseexporter/config_test.go b/exporter/clickhouseexporter/config_test.go index 0d5c9cd8543f..a6ce32f52fcf 100644 --- a/exporter/clickhouseexporter/config_test.go +++ b/exporter/clickhouseexporter/config_test.go @@ -74,6 +74,7 @@ func TestLoadConfig(t *testing.T) { QueueSize: 100, StorageID: &storageID, }, + AsyncInsert: true, }, }, } @@ -108,6 +109,7 @@ func TestConfig_buildDSN(t *testing.T) { Password string Database string ConnectionParams map[string]string + AsyncInsert *bool } mergeConfigWithFields := func(cfg *Config, fields fields) { if fields.Endpoint != "" { @@ -125,6 +127,9 @@ func TestConfig_buildDSN(t *testing.T) { if fields.ConnectionParams != nil { cfg.ConnectionParams = fields.ConnectionParams } + if fields.AsyncInsert != nil { + cfg.AsyncInsert = *fields.AsyncInsert + } } type ChOptions struct { @@ -133,6 +138,8 @@ func TestConfig_buildDSN(t *testing.T) { Compress clickhouse.CompressionMethod } + configTrue := true + configFalse := false tests := []struct { name string fields fields @@ -148,7 +155,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: false, }, - want: "clickhouse://127.0.0.1:9000/default", + want: "clickhouse://127.0.0.1:9000/default?async_insert=true", }, { name: "Support tcp scheme", @@ -158,7 +165,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: false, }, - want: "tcp://127.0.0.1:9000/default", + want: "tcp://127.0.0.1:9000/default?async_insert=true", }, { name: "prefers database name from config over from DSN", @@ -171,7 +178,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: false, }, - want: "clickhouse://foo:bar@127.0.0.1:9000/otel", + want: "clickhouse://foo:bar@127.0.0.1:9000/otel?async_insert=true", }, { name: "use database name from DSN if not set in config", @@ -183,7 +190,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: false, }, - want: "clickhouse://foo:bar@127.0.0.1:9000/otel", + want: "clickhouse://foo:bar@127.0.0.1:9000/otel?async_insert=true", }, { name: "invalid config", @@ -203,7 +210,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: true, }, - want: "https://127.0.0.1:9000/default?secure=true", + want: "https://127.0.0.1:9000/default?async_insert=true&secure=true", }, { name: "Preserve query parameters", @@ -213,7 +220,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: true, }, - want: "clickhouse://127.0.0.1:9000/default?foo=bar&secure=true", + want: "clickhouse://127.0.0.1:9000/default?async_insert=true&foo=bar&secure=true", }, { name: "Parse clickhouse settings", @@ -225,7 +232,7 @@ func TestConfig_buildDSN(t *testing.T) { DialTimeout: 30 * time.Second, Compress: clickhouse.CompressionLZ4, }, - want: "https://127.0.0.1:9000/default?compress=lz4&dial_timeout=30s&secure=true", + want: "https://127.0.0.1:9000/default?async_insert=true&compress=lz4&dial_timeout=30s&secure=true", }, { name: "Should respect connection parameters", @@ -236,7 +243,7 @@ func TestConfig_buildDSN(t *testing.T) { wantChOptions: ChOptions{ Secure: true, }, - want: "clickhouse://127.0.0.1:9000/default?foo=bar&secure=true", + want: "clickhouse://127.0.0.1:9000/default?async_insert=true&foo=bar&secure=true", }, { name: "support replace database in DSN with config to override database", @@ -244,7 +251,86 @@ func TestConfig_buildDSN(t *testing.T) { Endpoint: "tcp://127.0.0.1:9000/otel", Database: "override", }, - want: "tcp://127.0.0.1:9000/override", + want: "tcp://127.0.0.1:9000/override?async_insert=true", + }, + { + name: "when config option is missing, preserve async_insert false in DSN", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000?async_insert=false", + }, + want: "tcp://127.0.0.1:9000/default?async_insert=false", + }, + { + name: "when config option is missing, preserve async_insert true in DSN", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000?async_insert=true", + }, + want: "tcp://127.0.0.1:9000/default?async_insert=true", + }, + { + name: "ignore config option when async_insert is present in connection params as false", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000?async_insert=false", + ConnectionParams: map[string]string{"async_insert": "false"}, + AsyncInsert: &configTrue, + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=false", + }, + { + name: "ignore config option when async_insert is present in connection params as true", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000?async_insert=false", + ConnectionParams: map[string]string{"async_insert": "true"}, + AsyncInsert: &configFalse, + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=true", + }, + { + name: "ignore config option when async_insert is present in DSN as false", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000?async_insert=false", + AsyncInsert: &configTrue, + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=false", + }, + { + name: "use async_insert true config option when it is not present in DSN", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000", + AsyncInsert: &configTrue, + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=true", + }, + { + name: "use async_insert false config option when it is not present in DSN", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000", + AsyncInsert: &configFalse, + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=false", + }, + { + name: "set async_insert to true when not present in config or DSN", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000", + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=true", + }, + { + name: "connection_params takes priority over endpoint and async_insert option.", + fields: fields{ + Endpoint: "tcp://127.0.0.1:9000?async_insert=false", + ConnectionParams: map[string]string{"async_insert": "true"}, + AsyncInsert: &configFalse, + }, + + want: "tcp://127.0.0.1:9000/default?async_insert=true", }, } for _, tt := range tests { @@ -303,7 +389,7 @@ func TestShouldCreateSchema(t *testing.T) { } for _, tt := range tests { - t.Run(fmt.Sprintf("shouldCreateSchema case %s", tt.name), func(t *testing.T) { + t.Run(fmt.Sprintf("ShouldCreateSchema case %s", tt.name), func(t *testing.T) { assert.NoError(t, component.ValidateConfig(tt)) assert.Equal(t, tt.expected, tt.input.shouldCreateSchema()) }) @@ -374,7 +460,7 @@ func TestClusterString(t *testing.T) { } for i, tt := range tests { - t.Run(fmt.Sprintf("clusterString case %d", i), func(t *testing.T) { + t.Run(fmt.Sprintf("ClusterString case %d", i), func(t *testing.T) { cfg := createDefaultConfig() cfg.(*Config).Endpoint = defaultEndpoint cfg.(*Config).ClusterName = tt.input diff --git a/exporter/clickhouseexporter/factory.go b/exporter/clickhouseexporter/factory.go index c32c9944267f..3ffda431177e 100644 --- a/exporter/clickhouseexporter/factory.go +++ b/exporter/clickhouseexporter/factory.go @@ -44,6 +44,7 @@ func createDefaultConfig() component.Config { MetricsTableName: "otel_metrics", TTL: 0, CreateSchema: true, + AsyncInsert: true, } }