From 6ceef084dddbf4be3b2a659f0b9a5be418929003 Mon Sep 17 00:00:00 2001
From: Andrzej Stencel
Date: Mon, 3 Jun 2024 14:27:09 +0200
Subject: [PATCH 1/4] [exporter/elasticsearch] route based on data stream attributes

---
 ...r-attribute-based-data-stream-routing.yaml |  27 +++
 exporter/elasticsearchexporter/README.md      |  58 +++++--
 exporter/elasticsearchexporter/attribute.go   |   8 +-
 exporter/elasticsearchexporter/config.go      |  29 +++-
 exporter/elasticsearchexporter/config_test.go | 104 ++++++++++--
 exporter/elasticsearchexporter/exporter.go    | 112 ++++++++-----
 .../elasticsearchexporter/exporter_test.go    | 155 +++++++++++++++++-
 exporter/elasticsearchexporter/factory.go     |  22 ++-
 exporter/elasticsearchexporter/model.go       | 136 +++-------------
 exporter/elasticsearchexporter/model_test.go  |  14 +-
 .../testdata/config.yaml                      |  22 +++
 exporter/elasticsearchexporter/utils_test.go  |  11 ++
 12 files changed, 499 insertions(+), 199 deletions(-)
 create mode 100644 .chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml

diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
new file mode 100644
index 000000000000..842b7b62161b
--- /dev/null
+++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
+component: exporter/elasticsearch
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add data stream routing
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: []
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 8b52bf4da343..bd69d937417b 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -14,7 +14,7 @@
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 
-This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
+This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
 
 ## Configuration options
 
@@ -83,32 +83,52 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper].
 
 ### Elasticsearch document routing
 
 Telemetry data will be written to signal specific data streams by default:
-logs to `logs-generic-default`, and traces to `traces-generic-default`.
+logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
 This can be customised through the following settings:
 
-- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
+- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
   The default value is `logs-generic-default`.
+
 - `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
-- `logs_dynamic_index` (optional):
-  takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
+
+- `logs_dynamic_index` (optional): uses resource or log record attributes to dynamically construct the index name. See `mode` for details.
   - `enabled`(default=false): Enable/Disable dynamic index for log records
+  - `mode` (default=`prefix_suffix`): defines how the dynamic index name is constructed.
+    - `data_stream` - uses resource, scope or log record attributes `data_stream.dataset` and `data_stream.namespace`
+      to dynamically construct the index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`.
+      Log record attributes take precedence over scope attributes, which take precedence over resource attributes.
+    - `prefix_suffix` - uses resource or log record attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
+      to dynamically construct the index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > log record attribute)
+
-- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
+- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
   ⚠️ Note that metrics support is currently in development.
-- `metrics_dynamic_index` (optional):
-  takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `metrics_index`.
+
+- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct the index name. See `mode` for details.
   ⚠️ Note that metrics support is currently in development.
-  - `enabled`(default=false): Enable/Disable dynamic index for metrics
+  - `enabled`(default=true): Enable/disable dynamic index for metrics
+  - `mode` (default=`data_stream`): defines how the dynamic index name is constructed.
+    - `data_stream` - uses resource, scope or data point attributes `data_stream.dataset` and `data_stream.namespace`
+      to dynamically construct the index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`.
+      Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
+    - `prefix_suffix` - uses resource, scope or data point attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
+      to dynamically construct the index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`.
+      Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
+
-- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
-- `traces_dynamic_index` (optional):
-  takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
+- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
+
+- `traces_dynamic_index` (optional): uses resource, scope or span attributes to dynamically construct the index name. See `mode` for details.
   - `enabled`(default=false): Enable/Disable dynamic index for trace spans
+  - `mode` (default=`prefix_suffix`): defines how the dynamic index name is constructed.
+    - `data_stream` - uses resource, scope or span attributes `data_stream.dataset` and `data_stream.namespace`
+      to dynamically construct the index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`.
+      Span attributes take precedence over scope attributes, which take precedence over resource attributes.
+    - `prefix_suffix` - uses resource or span attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
+      to dynamically construct the index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > span attribute)
+
-- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
-  - `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
-    e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
-    The last string appended belongs to the date when the data is being generated.
+- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
+  - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as the prefix and the date as the suffix,
+    e.g. if `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`.
+    The appended date corresponds to the time when the data is exported.
   - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
   - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.
@@ -189,6 +209,8 @@ The only metric types supported are:
 
 Other metric types (Histogram, Exponential Histogram, Summary) are ignored.
 
+Dynamic indexing in `data_stream` mode is enabled by default for metrics. See the `metrics_dynamic_index` configuration property for details and the example configuration below.
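+
+For example, the following collector configuration sketch enables attribute-based data stream routing for all three signals (the endpoint is a placeholder; `data_stream` is already the default mode for metrics and is shown only for completeness):
+
+```yaml
+exporters:
+  elasticsearch:
+    endpoint: https://elasticsearch.example.com:9200
+    logs_dynamic_index:
+      enabled: true
+      mode: data_stream
+    metrics_dynamic_index:
+      enabled: true
+      mode: data_stream
+    traces_dynamic_index:
+      enabled: true
+      mode: data_stream
+```
+
+With this configuration, a log record carrying the attribute `data_stream.dataset: nginx.access` (an illustrative value) and no `data_stream.namespace` is indexed into `logs-nginx.access-default`, since the dataset defaults to `generic` and the namespace to `default`.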
+
 [confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
 [configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
 [configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go
index 987b13f807bb..85feec25f8bb 100644
--- a/exporter/elasticsearchexporter/attribute.go
+++ b/exporter/elasticsearchexporter/attribute.go
@@ -7,8 +7,12 @@ import "go.opentelemetry.io/collector/pdata/pcommon"
 
 // dynamic index attribute key constants
 const (
-	indexPrefix = "elasticsearch.index.prefix"
-	indexSuffix = "elasticsearch.index.suffix"
+	indexPrefix                = "elasticsearch.index.prefix"
+	indexSuffix                = "elasticsearch.index.suffix"
+	dataStreamDataset          = "data_stream.dataset"
+	dataStreamNamespace        = "data_stream.namespace"
+	defaultDataStreamDataset   = "generic"
+	defaultDataStreamNamespace = "default"
 )
 
 // resource is higher prioritized than record attribute
diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go
index daf83e4da3ba..4feff0325d56 100644
--- a/exporter/elasticsearchexporter/config.go
+++ b/exporter/elasticsearchexporter/config.go
@@ -4,6 +4,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
 
 import (
+	"encoding"
 	"encoding/base64"
 	"errors"
 	"fmt"
@@ -80,7 +81,33 @@ type LogstashFormatSettings struct {
 }
 
 type DynamicIndexSetting struct {
-	Enabled bool `mapstructure:"enabled"`
+	Enabled bool             `mapstructure:"enabled"`
+	Mode    DynamicIndexMode `mapstructure:"mode"`
+}
+
+type DynamicIndexMode string
+
+const DynamicIndexModeDataStream DynamicIndexMode = "data_stream"
+const DynamicIndexModePrefixSuffix DynamicIndexMode = "prefix_suffix"
+
+var _ encoding.TextUnmarshaler = (*DynamicIndexMode)(nil)
+
+func (m *DynamicIndexMode) UnmarshalText(text []byte) error {
+	if m == nil {
+		return errors.New("cannot unmarshal to a nil *DynamicIndexMode")
+	}
+
+	str := string(text)
+	switch str {
+	case string(DynamicIndexModeDataStream):
+		*m = DynamicIndexModeDataStream
+	case string(DynamicIndexModePrefixSuffix):
+		*m = DynamicIndexModePrefixSuffix
+	default:
+		return fmt.Errorf("unknown dynamic index mode %s", str)
+	}
+
+	return nil
 }
 
 // AuthenticationSettings defines user authentication related settings.
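
As a quick illustration of the new mode type (a standalone sketch, not part of this patch; the `main` wrapper is purely illustrative), configuration values decode into the typed constants via `UnmarshalText`, and anything unrecognized fails fast at config-load time:

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
)

func main() {
	var mode elasticsearchexporter.DynamicIndexMode

	// The two accepted values decode into the typed constants.
	if err := mode.UnmarshalText([]byte("data_stream")); err != nil {
		panic(err)
	}
	fmt.Println(mode == elasticsearchexporter.DynamicIndexModeDataStream) // true

	// Any other value is rejected, so a typo in the YAML surfaces
	// as an error instead of silently falling back to a default.
	fmt.Println(mode.UnmarshalText([]byte("bogus"))) // unknown dynamic index mode bogus
}
```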
diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 8ca137118a0a..c0739e29468e 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -57,12 +57,24 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"https://elastic.example.com:9200"}, - Index: "", - LogsIndex: "logs-generic-default", + Endpoints: []string{"https://elastic.example.com:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, MetricsIndex: "metrics-generic-default", - TracesIndex: "trace_index", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: "trace_index", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -110,12 +122,24 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "my_log_index", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "my_log_index", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, MetricsIndex: "metrics-generic-default", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: "traces-generic-default", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -163,12 +187,24 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "logs-generic-default", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, MetricsIndex: "my_metric_index", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: "traces-generic-default", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -239,6 +275,44 @@ func TestConfig(t *testing.T) { cfg.Endpoint = "https://elastic.example.com:9200" }), }, + { + id: component.NewIDWithName(metadata.Type, "data-stream-mode"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + cfg.LogsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + } + 
cfg.MetricsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + } + cfg.TracesDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + } + }), + }, + { + id: component.NewIDWithName(metadata.Type, "prefix-suffix-mode"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + cfg.LogsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModePrefixSuffix, + } + cfg.MetricsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModePrefixSuffix, + } + cfg.TracesDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModePrefixSuffix, + } + }), + }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 01103adfa12f..ce696c274f7d 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -22,11 +22,12 @@ type elasticsearchExporter struct { component.TelemetrySettings userAgent string - config *Config - index string - logstashFormat LogstashFormatSettings - dynamicIndex bool - model mappingModel + config *Config + index string + logstashFormat LogstashFormatSettings + dynamicIndex bool + dynamicIndexMode DynamicIndexMode + model mappingModel bulkIndexer *esBulkIndexerCurrent } @@ -36,6 +37,7 @@ func newExporter( set exporter.Settings, index string, dynamicIndex bool, + dynamicIndexMode DynamicIndexMode, ) (*elasticsearchExporter, error) { if err := cfg.Validate(); err != nil { return nil, err @@ -59,11 +61,12 @@ func newExporter( TelemetrySettings: set.TelemetrySettings, userAgent: userAgent, - config: cfg, - index: index, - dynamicIndex: dynamicIndex, - model: model, - logstashFormat: cfg.LogstashFormat, + config: cfg, + index: index, + dynamicIndex: dynamicIndex, + dynamicIndexMode: dynamicIndexMode, + model: model, + logstashFormat: cfg.LogstashFormat, }, nil } @@ -117,10 +120,17 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromAttributes(indexPrefix, resource, scope, record) - suffix := getFromAttributes(indexSuffix, resource, scope, record) + if e.dynamicIndexMode == DynamicIndexModeDataStream { + dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + fIndex = fmt.Sprintf("logs-%s-%s", dataSet, namespace) + } else { + prefix := getFromAttributes(indexPrefix, resource, scope, record) + suffix := getFromAttributes(indexSuffix, resource, scope, record) + + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } } if e.logstashFormat.Enabled { @@ -150,50 +160,69 @@ func (e *elasticsearchExporter) pushMetricsData( resource := resourceMetric.Resource() scopeMetrics := resourceMetric.ScopeMetrics() for j := 0; j < scopeMetrics.Len(); j++ { - scope := scopeMetrics.At(j).Scope() - metricSlice := scopeMetrics.At(j).Metrics() - - if err := e.pushMetricSlice(ctx, resource, metricSlice, scope); err != nil { - if ctxErr := ctx.Err(); 
ctxErr != nil {
-				return ctxErr
+			scopeMetrics := scopeMetrics.At(j)
+			for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
+				metric := scopeMetrics.Metrics().At(k)
+
+				// We only support Sum and Gauge metrics at the moment.
+				var dataPoints pmetric.NumberDataPointSlice
+				switch metric.Type() {
+				case pmetric.MetricTypeSum:
+					dataPoints = metric.Sum().DataPoints()
+				case pmetric.MetricTypeGauge:
+					dataPoints = metric.Gauge().DataPoints()
+				default:
+					// Skip unsupported metric types (histogram, exponential
+					// histogram, summary) instead of iterating an
+					// uninitialized data point slice.
+					continue
 				}
-			errs = append(errs, err)
+
+				for l := 0; l < dataPoints.Len(); l++ {
+					dataPoint := dataPoints.At(l)
+					if err := e.pushMetricDataPoint(ctx, resource, scopeMetrics.Scope(), metric, dataPoint); err != nil {
+						if cerr := ctx.Err(); cerr != nil {
+							return cerr
+						}
+						errs = append(errs, err)
+					}
+				}
+			}
 		}
-	}
 	}
 
 	return errors.Join(errs...)
 }
 
-func (e *elasticsearchExporter) pushMetricSlice(
+func (e *elasticsearchExporter) pushMetricDataPoint(
 	ctx context.Context,
 	resource pcommon.Resource,
-	slice pmetric.MetricSlice,
 	scope pcommon.InstrumentationScope,
+	metric pmetric.Metric,
+	dataPoint pmetric.NumberDataPoint,
 ) error {
 	fIndex := e.index
 	if e.dynamicIndex {
-		prefix := getFromAttributesNew(indexPrefix, "", resource.Attributes())
-		suffix := getFromAttributesNew(indexSuffix, "", resource.Attributes())
-
-		fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
-	}
-
-	documents, err := e.model.encodeMetrics(resource, slice, scope)
-	if err != nil {
-		return fmt.Errorf("failed to encode a metric event: %w", err)
+		if e.dynamicIndexMode == DynamicIndexModeDataStream {
+			dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
+			namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
+			fIndex = fmt.Sprintf("metrics-%s-%s", dataSet, namespace)
+		} else {
+			prefix := getFromAttributesNew(indexPrefix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
+			suffix := getFromAttributesNew(indexSuffix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
+			fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
+		}
 	}
 
-	for _, document := range documents {
-		err := pushDocuments(ctx, fIndex, document, e.bulkIndexer)
+	if e.logstashFormat.Enabled {
+		formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
 		if err != nil {
 			return err
 		}
+		fIndex = formattedIndex
 	}
 
-	return nil
+	document, err := e.model.encodeMetricDataPoint(resource, scope, metric, dataPoint)
+	if err != nil {
+		return fmt.Errorf("failed to encode a metric data point: %w", err)
+	}
+
+	return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
 }
 
 func (e *elasticsearchExporter) pushTraceData(
@@ -228,10 +257,17 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
 	fIndex := e.index
 	if e.dynamicIndex {
-		prefix := getFromAttributes(indexPrefix, resource, scope, span)
-		suffix := getFromAttributes(indexSuffix, resource, scope, span)
+		if e.dynamicIndexMode == DynamicIndexModeDataStream {
+			dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes())
+			namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes())
 
-		fIndex = fmt.Sprintf("%s%s%s", prefix,
fIndex, suffix) + fIndex = fmt.Sprintf("traces-%s-%s", dataSet, namespace) + } else { + prefix := getFromAttributes(indexPrefix, resource, scope, span) + suffix := getFromAttributes(indexSuffix, resource, scope, span) + + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } } if e.logstashFormat.Enabled { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index d2fda9b9aeb9..62006c5c3667 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -168,7 +168,7 @@ func TestExporterLogs(t *testing.T) { <-done }) - t.Run("publish with dynamic index", func(t *testing.T) { + t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -213,6 +213,43 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + assert.Equal(t, "logs-record.dataset-resource.namespace", create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicIndex.Enabled = true + cfg.LogsDynamicIndex.Mode = DynamicIndexModeDataStream + }) + logs := newLogsWithAttributeAndResourceMap( + map[string]string{ + dataStreamDataset: "record.dataset", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + dataStreamNamespace: "resource.namespace", + }, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { @@ -469,6 +506,81 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(2) }) + t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := "resource.prefix-metrics.index-data.point.suffix" + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + cfg.MetricsDynamicIndex.Mode = DynamicIndexModePrefixSuffix + }) + metrics := newMetricsWithAttributeAndResourceMap( + map[string]string{ + indexSuffix: "-data.point.suffix", + }, + map[string]string{ + indexPrefix: "resource.prefix-", + indexSuffix: "-resource.suffix", + }, + ) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + }) + + t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) 
([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := "metrics-resource.dataset-data.point.namespace" + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + }) + metrics := newMetricsWithAttributeAndResourceMap( + map[string]string{ + dataStreamNamespace: "data.point.namespace", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + dataStreamNamespace: "resource.namespace", + }, + ) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + }) + } func TestExporterTraces(t *testing.T) { @@ -486,7 +598,7 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index", func(t *testing.T) { + t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -531,6 +643,45 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + + rec := newBulkRecorder() + + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + + expected := "traces-span.dataset-default" + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesDynamicIndex.Enabled = true + cfg.TracesDynamicIndex.Mode = DynamicIndexModeDataStream + }) + + mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap( + map[string]string{ + dataStreamDataset: "span.dataset", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + }, + )) + + rec.WaitItems(1) + }) + t.Run("publish with logstash format index", func(t *testing.T) { var defaultCfg Config diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 8829215c8ac6..e0bcaf6884b3 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -49,8 +49,20 @@ func createDefaultConfig() component.Config { ClientConfig: httpClientConfig, Index: "", LogsIndex: defaultLogsIndex, - MetricsIndex: defaultMetricsIndex, - TracesIndex: defaultTracesIndex, + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + MetricsIndex: defaultMetricsIndex, + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: defaultTracesIndex, + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, Retry: RetrySettings{ Enabled: true, MaxRequests: 3, @@ -93,7 +105,7 @@ func createLogsExporter( index = cf.Index } - exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) + exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled, cf.LogsDynamicIndex.Mode) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: 
%w", err)
 	}
 
@@ -116,7 +128,7 @@
 ) (exporter.Metrics, error) {
 	cf := cfg.(*Config)
 
-	exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)
+	exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled, cf.MetricsDynamicIndex.Mode)
 	if err != nil {
 		return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err)
 	}
@@ -137,7 +149,7 @@ func createTracesExporter(ctx context.Context,
 
 	cf := cfg.(*Config)
 
-	exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)
+	exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled, cf.TracesDynamicIndex.Mode)
 	if err != nil {
 		return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err)
 	}
diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go
index 626b8f566e1f..7652d2453c2c 100644
--- a/exporter/elasticsearchexporter/model.go
+++ b/exporter/elasticsearchexporter/model.go
@@ -5,12 +5,8 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
 
 import (
 	"bytes"
-	"encoding/binary"
 	"encoding/json"
 	"fmt"
-	"hash"
-	"hash/fnv"
-	"math"
 	"time"
 
 	"go.opentelemetry.io/collector/pdata/pcommon"
@@ -64,7 +60,7 @@ var resourceAttrsToPreserve = map[string]bool{
 
 type mappingModel interface {
 	encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
-	encodeMetrics(pcommon.Resource, pmetric.MetricSlice, pcommon.InstrumentationScope) ([][]byte, error)
+	encodeMetricDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error)
 	encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
 }
 
@@ -169,123 +165,31 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
 	return document
 }
 
-func (m *encodeModel) encodeMetrics(resource pcommon.Resource, metrics pmetric.MetricSlice, _ pcommon.InstrumentationScope) ([][]byte, error) {
-	var baseDoc objmodel.Document
-
-	baseDoc.AddAttributes("", resource.Attributes())
-
-	// Put all metrics that have the same attributes and timestamp in one document.
-	docs := map[uint32]*objmodel.Document{}
-	for i := 0; i < metrics.Len(); i++ {
-		metric := metrics.At(i)
-
-		var dps pmetric.NumberDataPointSlice
-
-		// Only Gauge and Sum metric types are supported at the moment.
- switch metric.Type() { - case pmetric.MetricTypeGauge: - dps = metric.Gauge().DataPoints() - case pmetric.MetricTypeSum: - dps = metric.Sum().DataPoints() - } - - for j := 0; j < dps.Len(); j++ { - dp := dps.At(j) - - hash := metricHash(dp.Timestamp(), dp.Attributes()) - doc, docExists := docs[hash] - if !docExists { - doc = baseDoc.Clone() - doc.AddTimestamp("@timestamp", dp.Timestamp()) - doc.AddAttributes("", dp.Attributes()) - - docs[hash] = doc - } - - switch dp.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - doc.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) - case pmetric.NumberDataPointValueTypeInt: - doc.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) - } - } - } - - res := make([][]byte, 0, len(docs)) - - for _, doc := range docs { - if m.dedup { - doc.Dedup() - } else if m.dedot { - doc.Sort() - } - - var buf bytes.Buffer - err := doc.Serialize(&buf, m.dedot) - if err != nil { - return nil, err - } +func (m *encodeModel) encodeMetricDataPoint(resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) { + var document objmodel.Document - res = append(res, buf.Bytes()) + document.AddAttributes("", resource.Attributes()) + document.AddTimestamp("@timestamp", dataPoint.Timestamp()) + document.AddAttributes("", dataPoint.Attributes()) + switch dataPoint.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dataPoint.DoubleValue())) + case pmetric.NumberDataPointValueTypeInt: + document.AddAttribute(metric.Name(), pcommon.NewValueInt(dataPoint.IntValue())) } - return res, nil -} - -func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { - hasher := fnv.New32a() - - timestampBuf := make([]byte, 8) - binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) - hasher.Write(timestampBuf) - - mapHash(hasher, attributes) - - return hasher.Sum32() -} - -func mapHash(hasher hash.Hash, m pcommon.Map) { - m.Range(func(k string, v pcommon.Value) bool { - hasher.Write([]byte(k)) - valueHash(hasher, v) - - return true - }) -} - -func valueHash(h hash.Hash, v pcommon.Value) { - switch v.Type() { - case pcommon.ValueTypeEmpty: - h.Write([]byte{0}) - case pcommon.ValueTypeStr: - h.Write([]byte(v.Str())) - case pcommon.ValueTypeBool: - if v.Bool() { - h.Write([]byte{1}) - } else { - h.Write([]byte{0}) - } - case pcommon.ValueTypeDouble: - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double())) - h.Write(buf) - case pcommon.ValueTypeInt: - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(v.Int())) - h.Write(buf) - case pcommon.ValueTypeBytes: - h.Write(v.Bytes().AsRaw()) - case pcommon.ValueTypeMap: - mapHash(h, v.Map()) - case pcommon.ValueTypeSlice: - sliceHash(h, v.Slice()) + if m.dedup { + document.Dedup() + } else if m.dedot { + document.Sort() } -} -func sliceHash(h hash.Hash, s pcommon.Slice) { - for i := 0; i < s.Len(); i++ { - valueHash(h, s.At(i)) + var buf bytes.Buffer + err := document.Serialize(&buf, m.dedot) + if err != nil { + return nil, err } + return buf.Bytes(), nil } func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 26199ba8cca9..0cc56b186600 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ 
b/exporter/elasticsearchexporter/model_test.go
@@ -92,10 +92,20 @@ func TestEncodeMetric(t *testing.T) {
 		dedup: true,
 		mode:  MappingNone,
 	}
 
-	docsBytes, err := model.encodeMetrics(metrics.ResourceMetrics().At(0).Resource(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope())
+	var docsBytes [][]byte
+	for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ {
+		documentBytes, err := model.encodeMetricDataPoint(
+			metrics.ResourceMetrics().At(0).Resource(),
+			metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(),
+			metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0),
+			metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i))
+		require.NoError(t, err)
+		docsBytes = append(docsBytes, documentBytes)
+	}
+
+	assert.Len(t, docsBytes, len(strings.Split(expectedMetricsEncoded, "\n")))
 
 	// Convert the byte arrays to strings and sort the docs to make the test deterministic.
-	require.NoError(t, err)
 	docs := make([]string, 0, len(docsBytes))
 	for _, docBytes := range docsBytes {
 		docs = append(docs, string(docBytes))
diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml
index acd6e92f9001..3f1409c6cfab 100644
--- a/exporter/elasticsearchexporter/testdata/config.yaml
+++ b/exporter/elasticsearchexporter/testdata/config.yaml
@@ -82,3 +82,25 @@ elasticsearch/deprecated_index:
   index: my_log_index
 elasticsearch/confighttp_endpoint:
   endpoint: https://elastic.example.com:9200
+elasticsearch/prefix-suffix-mode:
+  endpoint: https://elastic.example.com:9200
+  logs_dynamic_index:
+    enabled: true
+    mode: prefix_suffix
+  metrics_dynamic_index:
+    enabled: true
+    mode: prefix_suffix
+  traces_dynamic_index:
+    enabled: true
+    mode: prefix_suffix
+elasticsearch/data-stream-mode:
+  endpoint: https://elastic.example.com:9200
+  logs_dynamic_index:
+    enabled: true
+    mode: data_stream
+  metrics_dynamic_index:
+    enabled: true
+    mode: data_stream
+  traces_dynamic_index:
+    enabled: true
+    mode: data_stream
diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go
index de3d60418b24..21b839823604 100644
--- a/exporter/elasticsearchexporter/utils_test.go
+++ b/exporter/elasticsearchexporter/utils_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
 
@@ -236,6 +237,16 @@ func newLogsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[stri
 	return logs
 }
 
+func newMetricsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[string]string) pmetric.Metrics {
+	metrics := pmetric.NewMetrics()
+	resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
+
+	fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), resMp)
+	fillResourceAttributeMap(resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes(), attrMp)
+
+	return metrics
+}
+
 func newTracesWithAttributeAndResourceMap(attrMp map[string]string, resMp map[string]string) ptrace.Traces {
 	traces := ptrace.NewTraces()
 	resourceSpans := traces.ResourceSpans()

From dc8e9803d5cd73de20285ed2759d60d41090bc05 Mon Sep 17 00:00:00 2001
From: Andrzej Stencel
Date: Tue, 25 Jun 2024 12:29:44 +0200
Subject: [PATCH 2/4] add issue number to changelog entry

---
 ...sticsearch-exporter-attribute-based-data-stream-routing.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
index 842b7b62161b..80f294615042 100644
--- a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
+++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml
@@ -10,7 +10,7 @@ component: exporter/elasticsearch
 note: Add data stream routing
 
 # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
-issues: []
+issues: [33755]
 
 # (Optional) One or more lines of additional information to render under the primary note.
 # These lines will be padded with 2 spaces and then inserted directly into the document.

From cb88ebcef50226de5630e4c080f032ce59ef989b Mon Sep 17 00:00:00 2001
From: Andrzej Stencel
Date: Wed, 26 Jun 2024 12:17:13 +0200
Subject: [PATCH 3/4] fill in missing data stream attributes

---
 exporter/elasticsearchexporter/attribute.go   | 16 +++--
 .../data-stream-router.go                     | 67 +++++++++++++++++++
 exporter/elasticsearchexporter/exporter.go    | 14 +---
 exporter/elasticsearchexporter/factory.go     |  4 ++
 4 files changed, 84 insertions(+), 17 deletions(-)
 create mode 100644 exporter/elasticsearchexporter/data-stream-router.go

diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go
index 85feec25f8bb..25f08568ec86 100644
--- a/exporter/elasticsearchexporter/attribute.go
+++ b/exporter/elasticsearchexporter/attribute.go
@@ -7,12 +7,16 @@ import "go.opentelemetry.io/collector/pdata/pcommon"
 
 // dynamic index attribute key constants
 const (
-	indexPrefix                = "elasticsearch.index.prefix"
-	indexSuffix                = "elasticsearch.index.suffix"
-	dataStreamDataset          = "data_stream.dataset"
-	dataStreamNamespace        = "data_stream.namespace"
-	defaultDataStreamDataset   = "generic"
-	defaultDataStreamNamespace = "default"
+	indexPrefix                  = "elasticsearch.index.prefix"
+	indexSuffix                  = "elasticsearch.index.suffix"
+	dataStreamDataset            = "data_stream.dataset"
+	dataStreamNamespace          = "data_stream.namespace"
+	dataStreamType               = "data_stream.type"
+	defaultDataStreamDataset     = "generic"
+	defaultDataStreamNamespace   = "default"
+	defaultDataStreamTypeLogs    = "logs"
+	defaultDataStreamTypeMetrics = "metrics"
+	defaultDataStreamTypeTraces  = "traces"
 )
 
 // resource is higher prioritized than record attribute
diff --git a/exporter/elasticsearchexporter/data-stream-router.go b/exporter/elasticsearchexporter/data-stream-router.go
new file mode 100644
index 000000000000..7b1044115e89
--- /dev/null
+++ b/exporter/elasticsearchexporter/data-stream-router.go
@@ -0,0 +1,67 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
+
+import (
+	"fmt"
+
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+)
+
+// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes.
+// It searches for the routing attributes on the log record, scope, and resource.
+// It creates missing routing attributes on the log record if they are not found. +func routeLogRecord( + record *plog.LogRecord, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, +) string { + dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) + dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeLogs, record.Attributes(), scope.Attributes(), resource.Attributes()) + return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) +} + +// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes. +// It searches for the routing attributes on the data point, scope, and resource. +// It creates missing routing attributes on the data point if they are not found. +func routeDataPoint( + dataPoint pmetric.NumberDataPoint, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, +) string { + dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeMetrics, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) +} + +// routeSpan returns the name of the index to send the span to according to data stream routing attributes. +// It searches for the routing attributes on the span, scope, and resource. +// It creates missing routing attributes on the span if they are not found. +func routeSpan( + span ptrace.Span, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, +) string { + dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) + dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeTraces, span.Attributes(), scope.Attributes(), resource.Attributes()) + return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) +} + +func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string { + // Try to retrieve the attribute value from an existing attribute. + value := getFromAttributesNew(attributeName, "", recordAttributes, scopeAttributes, resourceAttributes) + + // If the value is not found, set the default value on the record. 
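+	// Writing the attribute back onto the record mutates the incoming
+	// telemetry, which is why the factories register these exporters
+	// with the MutatesData consumer capability (see factory.go below).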
+ if value == "" { + value = defaultValue + recordAttributes.PutStr(attributeName, value) + } + return value +} diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ce696c274f7d..7dc564695168 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -121,10 +121,7 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { - dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) - - fIndex = fmt.Sprintf("logs-%s-%s", dataSet, namespace) + fIndex = routeLogRecord(&record, scope, resource) } else { prefix := getFromAttributes(indexPrefix, resource, scope, record) suffix := getFromAttributes(indexSuffix, resource, scope, record) @@ -199,9 +196,7 @@ func (e *elasticsearchExporter) pushMetricDataPoint( fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { - dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - fIndex = fmt.Sprintf("metrics-%s-%s", dataSet, namespace) + fIndex = routeDataPoint(dataPoint, scope, resource) } else { prefix := getFromAttributesNew(indexPrefix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) suffix := getFromAttributesNew(indexSuffix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) @@ -258,10 +253,7 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { - dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) - - fIndex = fmt.Sprintf("traces-%s-%s", dataSet, namespace) + fIndex = routeSpan(span, scope, resource) } else { prefix := getFromAttributes(indexPrefix, resource, scope, span) suffix := getFromAttributes(indexSuffix, resource, scope, span) diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index e0bcaf6884b3..c107fb090b2a 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -115,6 +116,7 @@ func createLogsExporter( set, cfg, exporter.pushLogsData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), @@ -137,6 +139,7 @@ func createMetricsExporter( set, cfg, exporter.pushMetricsData, + 
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), @@ -158,6 +161,7 @@ func createTracesExporter(ctx context.Context, set, cfg, exporter.pushTraceData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), From 7f41b41c769086580fdcf3225b3e07bf89c1ef0f Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 26 Jun 2024 12:45:44 +0200 Subject: [PATCH 4/4] make gotidy --- exporter/elasticsearchexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index bbff538ff408..9c49a61022cb 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -17,6 +17,7 @@ require ( go.opentelemetry.io/collector/config/confighttp v0.103.0 go.opentelemetry.io/collector/config/configopaque v1.10.0 go.opentelemetry.io/collector/confmap v0.103.0 + go.opentelemetry.io/collector/consumer v0.103.0 go.opentelemetry.io/collector/exporter v0.103.0 go.opentelemetry.io/collector/extension/auth v0.103.0 go.opentelemetry.io/collector/pdata v1.10.0 @@ -70,7 +71,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/config/configtls v0.103.0 // indirect go.opentelemetry.io/collector/config/internal v0.103.0 // indirect - go.opentelemetry.io/collector/consumer v0.103.0 // indirect go.opentelemetry.io/collector/extension v0.103.0 // indirect go.opentelemetry.io/collector/featuregate v1.10.0 // indirect go.opentelemetry.io/collector/receiver v0.103.0 // indirect