Skip to content

Commit

Permalink
[exporter/elasticsearch] add metrics support
Browse files Browse the repository at this point in the history
  • Loading branch information
andrzej-stencel committed Jun 13, 2024
1 parent cafcd4d commit 19af1ea
Show file tree
Hide file tree
Showing 18 changed files with 872 additions and 29 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-metrics-to-elasticsearch-exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add initial support for metrics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33513]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
21 changes: 20 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces, logs |
| Stability | [development]: metrics |
| | [beta]: traces, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@JaredTan95](https://www.github.com/JaredTan95), [@ycombinator](https://www.github.com/ycombinator), [@carsonip](https://www.github.com/carsonip) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->
Expand Down Expand Up @@ -91,6 +93,13 @@ This can be customised through the following settings:
takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
- `enabled`(default=false): Enable/Disable dynamic index for log records
- `metrics_index`: The [index] or [data stream] name to publish events to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.
- `metrics_dynamic_index` (optional):
takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `metrics_index`.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=false): Enable/Disable dynamic index for metrics
- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
- `traces_dynamic_index` (optional):
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
Expand Down Expand Up @@ -170,6 +179,16 @@ Settings related to node discovery are:

Node discovery can be disabled by setting `discover.interval` to 0.

## Exporting metrics

Metrics support is currently in development.
The only metric types supported are:

- Gauge
- Sum

Other metric types (Histogram, Exponential Histogram, Summary) are ignored.

[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
Expand Down
10 changes: 10 additions & 0 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type attrGetter interface {
}

// retrieve attribute out of resource, scope, and record (span or log, if not found in resource)
// Deprecated: Use getFromAttributesNew instead.
func getFromAttributes(name string, resource, scope, record attrGetter) string {
var str string
val, exist := resource.Attributes().Get(name)
Expand All @@ -37,3 +38,12 @@ func getFromAttributes(name string, resource, scope, record attrGetter) string {
}
return str
}

func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string {
for _, attributeMap := range attributeMaps {
if value, exists := attributeMap.Get(name); exists {
return value.AsString()
}
}
return defaultValue
}
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type Config struct {
LogsIndex string `mapstructure:"logs_index"`
// fall back to pure LogsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
LogsDynamicIndex DynamicIndexSetting `mapstructure:"logs_dynamic_index"`

// This setting is required when the exporter is used in a metrics pipeline.
MetricsIndex string `mapstructure:"metrics_index"`
// fall back to pure MetricsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource attributes
MetricsDynamicIndex DynamicIndexSetting `mapstructure:"metrics_dynamic_index"`

// This setting is required when traces pipelines used.
TracesIndex string `mapstructure:"traces_index"`
// fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
Expand Down
75 changes: 65 additions & 10 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
MetricsIndex: "metrics-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -109,11 +110,65 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
MetricsIndex: "metrics-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
IdleConnTimeout: &defaultIdleConnTimeout,
Headers: map[string]configopaque.String{
"myheader": "test",
},
},
Authentication: AuthenticationSettings{
User: "elastic",
Password: "search",
APIKey: "AvFsEiPs==",
},
Discovery: DiscoverySettings{
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "metric"),
configFile: "config.yaml",
expected: &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
MetricsIndex: "my_metric_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down
63 changes: 61 additions & 2 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -132,11 +133,69 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom

document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
return fmt.Errorf("failed to encode log event: %w", err)
}
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
}

func (e *elasticsearchExporter) pushMetricsData(
ctx context.Context,
metrics pmetric.Metrics,
) error {
var errs []error

rls := metrics.ResourceMetrics()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
resource := rl.Resource()
ills := rl.ScopeMetrics()
for j := 0; j < ills.Len(); j++ {
scope := ills.At(j).Scope()
metrics := ills.At(j).Metrics()

if err := e.pushMetricSlice(ctx, resource, metrics, scope); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}

errs = append(errs, err)
}

}
}

return errors.Join(errs...)
}

func (e *elasticsearchExporter) pushMetricSlice(
ctx context.Context,
resource pcommon.Resource,
slice pmetric.MetricSlice,
scope pcommon.InstrumentationScope,
) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromAttributesNew(indexPrefix, "", resource.Attributes())
suffix := getFromAttributesNew(indexSuffix, "", resource.Attributes())

fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

documents, err := e.model.encodeMetrics(resource, slice, scope)
if err != nil {
return fmt.Errorf("failed to encode metric event: %w", err)
}

for _, document := range documents {
err := pushDocuments(ctx, fIndex, document, e.bulkIndexer)
if err != nil {
return err
}
}

return nil
}

func (e *elasticsearchExporter) pushTraceData(
ctx context.Context,
td ptrace.Traces,
Expand Down Expand Up @@ -185,7 +244,7 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc

document, err := e.model.encodeSpan(resource, span, scope)
if err != nil {
return fmt.Errorf("Failed to encode trace record: %w", err)
return fmt.Errorf("failed to encode trace record: %w", err)
}
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
}
69 changes: 69 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/auth/authtest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -449,6 +451,26 @@ func TestExporterLogs(t *testing.T) {
})
}

func TestExporterMetrics(t *testing.T) {
t.Run("publish with success", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL)
dp := pmetric.NewNumberDataPoint()
dp.SetDoubleValue(123.456)
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
mustSendMetricSumDataPoints(t, exporter, dp)
mustSendMetricGaugeDataPoints(t, exporter, dp)

rec.WaitItems(2)
})

}

func TestExporterTraces(t *testing.T) {
t.Run("publish with success", func(t *testing.T) {
rec := newBulkRecorder()
Expand Down Expand Up @@ -634,6 +656,24 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor
return exp
}

func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Metrics {
f := NewFactory()
cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
cfg.Endpoints = []string{url}
cfg.NumWorkers = 1
cfg.Flush.Interval = 10 * time.Millisecond
}}, fns...)...)
exp, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopSettings(), cfg)
require.NoError(t, err)

err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
return exp
}

func newTestLogsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Logs {
exp := newUnstartedTestLogsExporter(t, url, fns...)
err := exp.Start(context.Background(), componenttest.NewNopHost())
Expand Down Expand Up @@ -671,6 +711,35 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) {
require.NoError(t, err)
}

func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) {
metrics := pmetric.NewMetrics()
scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
for _, dataPoint := range dataPoints {
metric := scopeMetrics.Metrics().AppendEmpty()
metric.SetEmptySum()
metric.SetName("sum")
dataPoint.CopyTo(metric.Sum().DataPoints().AppendEmpty())
}
mustSendMetrics(t, exporter, metrics)
}

func mustSendMetricGaugeDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) {
metrics := pmetric.NewMetrics()
scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
for _, dataPoint := range dataPoints {
metric := scopeMetrics.Metrics().AppendEmpty()
metric.SetEmptyGauge()
metric.SetName("gauge")
dataPoint.CopyTo(metric.Gauge().DataPoints().AppendEmpty())
}
mustSendMetrics(t, exporter, metrics)
}

func mustSendMetrics(t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) {
err := exporter.ConsumeMetrics(context.Background(), metrics)
require.NoError(t, err)
}

func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span) {
traces := ptrace.NewTraces()
resourceSpans := traces.ResourceSpans().AppendEmpty()
Expand Down
Loading

0 comments on commit 19af1ea

Please sign in to comment.