Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] route based on data stream attributes #33755

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data stream routing

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33755]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
58 changes: 40 additions & 18 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).

## Configuration options

Expand Down Expand Up @@ -83,32 +83,52 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
### Elasticsearch document routing

Telemetry data will be written to signal specific data streams by default:
logs to `logs-generic-default`, and traces to `traces-generic-default`.
logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
This can be customised through the following settings:

- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
The default value is `logs-generic-default`.

- `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
- `logs_dynamic_index` (optional):
takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)

- `logs_dynamic_index` (optional): uses resource or log record attributes to dynamically construct index name. See `mode` for details.
- `enabled`(default=false): Enable/Disable dynamic index for log records
- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
- `mode` (default=`prefix_suffix`): defines how dynamic index name is constructed.
- `data_stream` - uses resource, scope or log record attributes `data_stream.dataset` and `data_stream.namespace`
to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`.
Log record attributes take precedence over scope attributes, which take precedence over resource attributes.
- `prefix_suffix` - uses resource or log record attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > log record attribute)

- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.
- `metrics_dynamic_index` (optional):
takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `metrics_index`.

- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. See `mode` for details.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=false): Enable/Disable dynamic index for metrics
- `enabled`(default=true): Enable/disable dynamic index for metrics
- `mode` (default=`data_stream`): defines how dynamic index name is constructed.
- `data_stream` - uses resource, scope or data point attributes `data_stream.dataset` and `data_stream.namespace`
to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`.
Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
- `prefix_suffix` - uses resource, scope or data point attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`.
Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metrics prefix_suffix precedence is different to logs and traces. Is this deliberate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you've created #33725 but the question remains: is it worth it to have this exception / inconsistency for metrics prefix_suffix mode? Otherwise, does it make sense to break all of them at once when a decision is made in #33725 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I created #33725 because I think the current setting for prefix_suffix should be changed.

When introducing metrics, my thinking was to introduce the prefix_suffix mode for metrics using the correct precedence, to prevent a breaking change at least in metrics behavior.

I do see value in having those consistent between signals though. If you think we should rather keep prefix_suffix mode precedence consistent with other signals, I can change that.


- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
- `traces_dynamic_index` (optional):
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)

- `traces_dynamic_index` (optional): uses resource or span attributes to dynamically construct index name. See `mode` for details.
- `enabled`(default=false): Enable/Disable dynamic index for trace spans
- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
- `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `mode` (default=`prefix_suffix`): defines how dynamic index name is constructed.
- `data_stream` - uses resource attributes `data_stream.dataset` and `data_stream.namespace`
to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`.
Span attributes take precedence over scope attributes, which take precedence over resource attributes.
- `prefix_suffix` - uses resource or span attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > span attribute)

- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
- `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
e.g: If `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

Expand Down Expand Up @@ -189,6 +209,8 @@ The only metric types supported are:

Other metric types (Histogram, Exponential Histogram, Summary) are ignored.

Dynamic indexing in `data_stream` mode is enabled by default for metrics. See `metrics_dynamic_index` configuration property for details.

[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
Expand Down
12 changes: 10 additions & 2 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
)

// resource is higher priotized than record attribute
Expand Down
29 changes: 28 additions & 1 deletion exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"encoding"
"encoding/base64"
"errors"
"fmt"
Expand Down Expand Up @@ -80,7 +81,33 @@ type LogstashFormatSettings struct {
}

type DynamicIndexSetting struct {
Enabled bool `mapstructure:"enabled"`
Enabled bool `mapstructure:"enabled"`
Mode DynamicIndexMode `mapstructure:"mode"`
}

type DynamicIndexMode string

const DynamicIndexModeDataStream DynamicIndexMode = "data_stream"
const DynamicIndexModePrefixSuffix DynamicIndexMode = "prefix_suffix"

var _ encoding.TextUnmarshaler = (*DynamicIndexMode)(nil)

func (m *DynamicIndexMode) UnmarshalText(text []byte) error {
if m == nil {
return errors.New("cannot unmarshal to a nil *DynamicIndexMode")
}

str := string(text)
switch str {
case string(DynamicIndexModeDataStream):
*m = DynamicIndexModeDataStream
case string(DynamicIndexModePrefixSuffix):
*m = DynamicIndexModePrefixSuffix
default:
return fmt.Errorf("unknown dynamic index mode %s", str)
}

return nil
}

// AuthenticationSettings defines user authentication related settings.
Expand Down
104 changes: 89 additions & 15 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,24 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
},
TracesIndex: "trace_index",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -110,12 +122,24 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -163,12 +187,24 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
MetricsIndex: "my_metric_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -239,6 +275,44 @@ func TestConfig(t *testing.T) {
cfg.Endpoint = "https://elastic.example.com:9200"
}),
},
{
id: component.NewIDWithName(metadata.Type, "data-stream-mode"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"
cfg.LogsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
}
cfg.MetricsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
}
cfg.TracesDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
}
}),
},
{
id: component.NewIDWithName(metadata.Type, "prefix-suffix-mode"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"
cfg.LogsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModePrefixSuffix,
}
cfg.MetricsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModePrefixSuffix,
}
cfg.TracesDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModePrefixSuffix,
}
}),
},
}

for _, tt := range tests {
Expand Down
67 changes: 67 additions & 0 deletions exporter/elasticsearchexporter/data-stream-router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes.
// It searches for the routing attributes on the log record, scope, and resource.
// It creates missing routing attributes on the log record if they are not found.
func routeLogRecord(
record *plog.LogRecord,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
) string {
dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes())
namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes())
dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeLogs, record.Attributes(), scope.Attributes(), resource.Attributes())
return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace)
Comment on lines +25 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking this PR but the type should be fixed based on the record type. Similar story for metrics and spans.

Suggested change
dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeLogs, record.Attributes(), scope.Attributes(), resource.Attributes())
return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace)
recordAttributes.PutStr(dataStreamType, "logs")
return fmt.Sprintf("logs-%s-%s", dataSet, namespace)

}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
// It searches for the routing attributes on the data point, scope, and resource.
// It creates missing routing attributes on the data point if they are not found.
func routeDataPoint(
dataPoint pmetric.NumberDataPoint,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
) string {
dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeMetrics, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace)
}

// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
// It searches for the routing attributes on the span, scope, and resource.
// It creates missing routing attributes on the span if they are not found.
func routeSpan(
span ptrace.Span,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
) string {
dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes())
namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes())
dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeTraces, span.Attributes(), scope.Attributes(), resource.Attributes())
return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace)
}

func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string {
// Try to retrieve the attribute value from an existing attribute.
value := getFromAttributesNew(attributeName, "", recordAttributes, scopeAttributes, resourceAttributes)

// If the value is not found, set the default value on the record.
if value == "" {
value = defaultValue
recordAttributes.PutStr(attributeName, value)
}
return value
}
Loading