diff --git a/.chloggen/add-metrics-to-elasticsearch-exporter.yaml b/.chloggen/add-metrics-to-elasticsearch-exporter.yaml new file mode 100644 index 000000000000..9263bef32fde --- /dev/null +++ b/.chloggen/add-metrics-to-elasticsearch-exporter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/elasticsearch + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add initial support for metrics + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33513] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 43b799582107..b2093cd2ecb4 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -3,11 +3,13 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: traces, logs | +| Stability | [development]: metrics | +| | [beta]: traces, logs | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@JaredTan95](https://www.github.com/JaredTan95), [@ycombinator](https://www.github.com/ycombinator), [@carsonip](https://www.github.com/carsonip) | +[development]: https://github.com/open-telemetry/opentelemetry-collector#development [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib @@ -91,6 +93,13 @@ This can be customised through the following settings: takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix` resulting dynamically prefixed 
/ suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute) - `enabled`(default=false): Enable/Disable dynamic index for log records +- `metrics_index`: The [index] or [data stream] name to publish events to. The default value is `metrics-generic-default`. + ⚠️ Note that metrics support is currently in development. +- `metrics_dynamic_index` (optional): + takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix` + resulting dynamically prefixed / suffixed indexing based on `metrics_index`. + ⚠️ Note that metrics support is currently in development. + - `enabled`(default=false): Enable/Disable dynamic index for metrics - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. - `traces_dynamic_index` (optional): takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix` @@ -170,6 +179,16 @@ Settings related to node discovery are: Node discovery can be disabled by setting `discover.interval` to 0. +## Exporting metrics + +Metrics support is currently in development. +The only metric types supported are: + +- Gauge +- Sum + +Other metric types (Histogram, Exponential Histogram, Summary) are ignored. 
+ [confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings [configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings [configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 311b4ace84d1..987b13f807bb 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -17,6 +17,7 @@ type attrGetter interface { } // retrieve attribute out of resource, scope, and record (span or log, if not found in resource) +// Deprecated: Use getFromAttributesNew instead. func getFromAttributes(name string, resource, scope, record attrGetter) string { var str string val, exist := resource.Attributes().Get(name) @@ -37,3 +38,12 @@ func getFromAttributes(name string, resource, scope, record attrGetter) string { } return str } + +func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string { + for _, attributeMap := range attributeMaps { + if value, exists := attributeMap.Get(name); exists { + return value.AsString() + } + } + return defaultValue +} diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index f1f13168de22..daf83e4da3ba 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -47,6 +47,12 @@ type Config struct { LogsIndex string `mapstructure:"logs_index"` // fall back to pure LogsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) LogsDynamicIndex DynamicIndexSetting `mapstructure:"logs_dynamic_index"` + + // This setting is required when the exporter 
is used in a metrics pipeline. + MetricsIndex string `mapstructure:"metrics_index"` + // fall back to pure MetricsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource attributes + MetricsDynamicIndex DynamicIndexSetting `mapstructure:"metrics_dynamic_index"` + // This setting is required when traces pipelines used. TracesIndex string `mapstructure:"traces_index"` // fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index cafa1f65b541..8ca137118a0a 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -57,11 +57,12 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"https://elastic.example.com:9200"}, - Index: "", - LogsIndex: "logs-generic-default", - TracesIndex: "trace_index", - Pipeline: "mypipeline", + Endpoints: []string{"https://elastic.example.com:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + MetricsIndex: "metrics-generic-default", + TracesIndex: "trace_index", + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -109,11 +110,65 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "my_log_index", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "my_log_index", + MetricsIndex: "metrics-generic-default", + TracesIndex: 
"traces-generic-default", + Pipeline: "mypipeline", + ClientConfig: confighttp.ClientConfig{ + Timeout: 2 * time.Minute, + MaxIdleConns: &defaultMaxIdleConns, + IdleConnTimeout: &defaultIdleConnTimeout, + Headers: map[string]configopaque.String{ + "myheader": "test", + }, + }, + Authentication: AuthenticationSettings{ + User: "elastic", + Password: "search", + APIKey: "AvFsEiPs==", + }, + Discovery: DiscoverySettings{ + OnStart: true, + }, + Flush: FlushSettings{ + Bytes: 10485760, + }, + Retry: RetrySettings{ + Enabled: true, + MaxRequests: 5, + InitialInterval: 100 * time.Millisecond, + MaxInterval: 1 * time.Minute, + RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, + }, + Mapping: MappingsSettings{ + Mode: "none", + Dedup: true, + Dedot: true, + }, + LogstashFormat: LogstashFormatSettings{ + Enabled: false, + PrefixSeparator: "-", + DateFormat: "%Y.%m.%d", + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "metric"), + configFile: "config.yaml", + expected: &Config{ + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + }, + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + MetricsIndex: "my_metric_index", + TracesIndex: "traces-generic-default", + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 2611dd42727d..b5e5801142e9 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" 
"go.opentelemetry.io/collector/pdata/ptrace" ) @@ -132,11 +133,69 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom document, err := e.model.encodeLog(resource, record, scope) if err != nil { - return fmt.Errorf("Failed to encode log event: %w", err) + return fmt.Errorf("failed to encode log event: %w", err) } return pushDocuments(ctx, fIndex, document, e.bulkIndexer) } +func (e *elasticsearchExporter) pushMetricsData( + ctx context.Context, + metrics pmetric.Metrics, +) error { + var errs []error + + rls := metrics.ResourceMetrics() + for i := 0; i < rls.Len(); i++ { + rl := rls.At(i) + resource := rl.Resource() + ills := rl.ScopeMetrics() + for j := 0; j < ills.Len(); j++ { + scope := ills.At(j).Scope() + metrics := ills.At(j).Metrics() + + if err := e.pushMetricSlice(ctx, resource, metrics, scope); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + + errs = append(errs, err) + } + + } + } + + return errors.Join(errs...) +} + +func (e *elasticsearchExporter) pushMetricSlice( + ctx context.Context, + resource pcommon.Resource, + slice pmetric.MetricSlice, + scope pcommon.InstrumentationScope, +) error { + fIndex := e.index + if e.dynamicIndex { + prefix := getFromAttributesNew(indexPrefix, "", resource.Attributes()) + suffix := getFromAttributesNew(indexSuffix, "", resource.Attributes()) + + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } + + documents, err := e.model.encodeMetrics(resource, slice, scope) + if err != nil { + return fmt.Errorf("failed to encode metric event: %w", err) + } + + for _, document := range documents { + err := pushDocuments(ctx, fIndex, document, e.bulkIndexer) + if err != nil { + return err + } + } + + return nil +} + func (e *elasticsearchExporter) pushTraceData( ctx context.Context, td ptrace.Traces, @@ -185,7 +244,7 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc document, err := e.model.encodeSpan(resource, span, scope) if err != nil 
{ - return fmt.Errorf("Failed to encode trace record: %w", err) + return fmt.Errorf("failed to encode trace record: %w", err) } return pushDocuments(ctx, fIndex, document, e.bulkIndexer) } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 1fe7c04dc799..d2fda9b9aeb9 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -25,7 +25,9 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension/auth/authtest" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -449,6 +451,26 @@ func TestExporterLogs(t *testing.T) { }) } +func TestExporterMetrics(t *testing.T) { + t.Run("publish with success", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL) + dp := pmetric.NewNumberDataPoint() + dp.SetDoubleValue(123.456) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + mustSendMetricSumDataPoints(t, exporter, dp) + mustSendMetricGaugeDataPoints(t, exporter, dp) + + rec.WaitItems(2) + }) + +} + func TestExporterTraces(t *testing.T) { t.Run("publish with success", func(t *testing.T) { rec := newBulkRecorder() @@ -634,6 +656,24 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor return exp } +func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Metrics { + f := NewFactory() + cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { + cfg.Endpoints = []string{url} + cfg.NumWorkers = 1 + cfg.Flush.Interval = 10 * time.Millisecond + }}, fns...)...) 
+ exp, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopSettings(), cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + return exp +} + func newTestLogsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Logs { exp := newUnstartedTestLogsExporter(t, url, fns...) err := exp.Start(context.Background(), componenttest.NewNopHost()) @@ -671,6 +711,35 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { require.NoError(t, err) } +func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) { + metrics := pmetric.NewMetrics() + scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + for _, dataPoint := range dataPoints { + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetEmptySum() + metric.SetName("sum") + dataPoint.CopyTo(metric.Sum().DataPoints().AppendEmpty()) + } + mustSendMetrics(t, exporter, metrics) +} + +func mustSendMetricGaugeDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) { + metrics := pmetric.NewMetrics() + scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + for _, dataPoint := range dataPoints { + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("gauge") + dataPoint.CopyTo(metric.Gauge().DataPoints().AppendEmpty()) + } + mustSendMetrics(t, exporter, metrics) +} + +func mustSendMetrics(t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) { + err := exporter.ConsumeMetrics(context.Background(), metrics) + require.NoError(t, err) +} + func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span) { traces := ptrace.NewTraces() resourceSpans := traces.ResourceSpans().AppendEmpty() diff --git 
a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index fedd0c50b36b..8829215c8ac6 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -21,8 +21,9 @@ import ( const ( // The value of "type" key in configuration. - defaultLogsIndex = "logs-generic-default" - defaultTracesIndex = "traces-generic-default" + defaultLogsIndex = "logs-generic-default" + defaultMetricsIndex = "metrics-generic-default" + defaultTracesIndex = "traces-generic-default" ) // NewFactory creates a factory for Elastic exporter. @@ -31,6 +32,7 @@ func NewFactory() exporter.Factory { metadata.Type, createDefaultConfig, exporter.WithLogs(createLogsExporter, metadata.LogsStability), + exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability), exporter.WithTraces(createTracesExporter, metadata.TracesStability), ) } @@ -47,6 +49,7 @@ func createDefaultConfig() component.Config { ClientConfig: httpClientConfig, Index: "", LogsIndex: defaultLogsIndex, + MetricsIndex: defaultMetricsIndex, TracesIndex: defaultTracesIndex, Retry: RetrySettings{ Enabled: true, @@ -106,6 +109,28 @@ func createLogsExporter( ) } +func createMetricsExporter( + ctx context.Context, + set exporter.Settings, + cfg component.Config, +) (exporter.Metrics, error) { + cf := cfg.(*Config) + + exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) + if err != nil { + return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) + } + return exporterhelper.NewMetricsExporter( + ctx, + set, + cfg, + exporter.pushMetricsData, + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithQueue(cf.QueueSettings), + ) +} + func createTracesExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Traces, error) { diff --git a/exporter/elasticsearchexporter/factory_test.go 
b/exporter/elasticsearchexporter/factory_test.go index 4cb39d9393e9..268e767692bf 100644 --- a/exporter/elasticsearchexporter/factory_test.go +++ b/exporter/elasticsearchexporter/factory_test.go @@ -44,11 +44,15 @@ func TestFactory_CreateLogsExporter_Fail(t *testing.T) { func TestFactory_CreateMetricsExporter_Fail(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig() + cfg := withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"http://test:9200"} + }) params := exportertest.NewNopSettings() - _, err := factory.CreateMetricsExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a traces exporter") - assert.EqualError(t, err, "telemetry type is not supported") + exporter, err := factory.CreateMetricsExporter(context.Background(), params, cfg) + require.NoError(t, err) + require.NotNil(t, exporter) + + require.NoError(t, exporter.Shutdown(context.Background())) } func TestFactory_CreateTracesExporter(t *testing.T) { diff --git a/exporter/elasticsearchexporter/generated_component_test.go b/exporter/elasticsearchexporter/generated_component_test.go index 6542393b10d1..7b25a61c52de 100644 --- a/exporter/elasticsearchexporter/generated_component_test.go +++ b/exporter/elasticsearchexporter/generated_component_test.go @@ -32,19 +32,26 @@ func TestComponentLifecycle(t *testing.T) { tests := []struct { name string - createFn func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) + createFn func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) }{ { name: "logs", - createFn: func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { + createFn: func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) { return factory.CreateLogsExporter(ctx, set, cfg) }, }, + { + name: "metrics", + createFn: func(ctx 
context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateMetricsExporter(ctx, set, cfg) + }, + }, + { name: "traces", - createFn: func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { + createFn: func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) { return factory.CreateTracesExporter(ctx, set, cfg) }, }, @@ -59,13 +66,13 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { t.Run(test.name+"-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), exportertest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), exportertest.NewNopCreateSettings(), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) }) t.Run(test.name+"-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), exportertest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), exportertest.NewNopCreateSettings(), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/exporter/elasticsearchexporter/generated_package_test.go b/exporter/elasticsearchexporter/generated_package_test.go index 74793158b2e6..6935dfb48fcb 100644 --- a/exporter/elasticsearchexporter/generated_package_test.go +++ b/exporter/elasticsearchexporter/generated_package_test.go @@ -3,9 +3,8 @@ package elasticsearchexporter import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/exporter/elasticsearchexporter/internal/metadata/generated_status.go b/exporter/elasticsearchexporter/internal/metadata/generated_status.go index 459f80d7a82d..acdd38f76af5 100644 --- a/exporter/elasticsearchexporter/internal/metadata/generated_status.go +++ b/exporter/elasticsearchexporter/internal/metadata/generated_status.go @@ -11,6 +11,7 @@ 
var ( ) const ( - TracesStability = component.StabilityLevelBeta - LogsStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelDevelopment + TracesStability = component.StabilityLevelBeta + LogsStability = component.StabilityLevelBeta ) diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go index a26260144fa1..ef80136ed395 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -108,6 +108,12 @@ func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document { return Document{fields} } +func (doc *Document) Clone() *Document { + fields := make([]field, len(doc.fields)) + copy(fields, doc.fields) + return &Document{fields} +} + // AddTimestamp adds a raw timestamp value to the Document. func (doc *Document) AddTimestamp(key string, ts pcommon.Timestamp) { doc.Add(key, TimestampValue(ts.AsTime())) diff --git a/exporter/elasticsearchexporter/metadata.yaml b/exporter/elasticsearchexporter/metadata.yaml index 1d5820e9b746..e1220cce1896 100644 --- a/exporter/elasticsearchexporter/metadata.yaml +++ b/exporter/elasticsearchexporter/metadata.yaml @@ -5,10 +5,11 @@ status: class: exporter stability: beta: [traces, logs] + development: [metrics] distributions: [contrib] codeowners: active: [JaredTan95, ycombinator, carsonip] tests: config: - endpoints: [http://localhost:9200] \ No newline at end of file + endpoints: [http://localhost:9200] diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 1bb8fd2c4b43..b1ae0345b8bf 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -5,12 +5,17 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry import ( "bytes" + "crypto/sha256" + "encoding/binary" "encoding/json" "fmt" + "hash" + "math" "time" 
"go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" @@ -51,6 +56,7 @@ var resourceAttrsConversionMap = map[string]string{ type mappingModel interface { encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) + encodeMetrics(pcommon.Resource, pmetric.MetricSlice, pcommon.InstrumentationScope) ([][]byte, error) encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) } @@ -155,6 +161,131 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo return document } +func (m *encodeModel) encodeMetrics(resource pcommon.Resource, metrics pmetric.MetricSlice, _ pcommon.InstrumentationScope) ([][]byte, error) { + hasher := sha256.New() + var baseDoc objmodel.Document + + baseDoc.AddAttributes("", resource.Attributes()) + + // Put all metrics that have the same attributes and timestamp in one document. + docs := map[string]*objmodel.Document{} + for i := 0; i < metrics.Len(); i++ { + metric := metrics.At(i) + + var dps pmetric.NumberDataPointSlice + + // Only Gauge and Sum metric types are supported at the moment. 
+	switch metric.Type() { +	case pmetric.MetricTypeGauge: +		dps = metric.Gauge().DataPoints() +	case pmetric.MetricTypeSum: +		dps = metric.Sum().DataPoints() +	} + +	for j := 0; j < dps.Len(); j++ { +		dp := dps.At(j) + +		hash := metricHash(hasher, dp.Timestamp(), dp.Attributes()) +		doc, docExists := docs[hash] +		if !docExists { +			doc = baseDoc.Clone() +			doc.AddTimestamp("@timestamp", dp.Timestamp()) +			doc.AddAttributes("", dp.Attributes()) + +			docs[hash] = doc +		} + +		switch dp.ValueType() { +		case pmetric.NumberDataPointValueTypeDouble: +			doc.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) +		case pmetric.NumberDataPointValueTypeInt: +			doc.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) +		} +	} + } + + res := make([][]byte, 0, len(docs)) + + for _, doc := range docs { + if m.dedup { + doc.Dedup() + } else if m.dedot { + doc.Sort() + } + + var buf bytes.Buffer + err := doc.Serialize(&buf, m.dedot) + + if err != nil { + fmt.Printf("Serialize error, dropping doc: %v\n", err) + } else { + res = append(res, buf.Bytes()) + } + } + + return res, nil +} + +func metricHash(hasher hash.Hash, timestamp pcommon.Timestamp, attributes pcommon.Map) string { + // Avoid any hashing if there are no attributes + if attributes.Len() == 0 { + return timestamp.String() + } + + hasher.Reset() + + hasher.Write([]byte(timestamp.AsTime().Format(time.RFC3339Nano))) + + mapHash(hasher, attributes) + + return string(hasher.Sum(nil)) +} + +func mapHash(hasher hash.Hash, m pcommon.Map) { + // TODO: Only hashing values, not keys? 
 + m.Range(func(_ string, v pcommon.Value) bool { + hasher.Write([]byte(v.Str())) + valueHash(hasher, v) + + return true + }) +} + +func valueHash(h hash.Hash, v pcommon.Value) { + switch v.Type() { + case pcommon.ValueTypeEmpty: + h.Write([]byte{0}) + case pcommon.ValueTypeStr: + h.Write([]byte(v.Str())) + case pcommon.ValueTypeBool: + if v.Bool() { + h.Write([]byte{1}) + } else { + h.Write([]byte{0}) + } + case pcommon.ValueTypeDouble: + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, math.Float64bits(v.Double())) + h.Write(buf) + case pcommon.ValueTypeInt: + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(v.Int())) + h.Write(buf) + case pcommon.ValueTypeBytes: + h.Write(v.Bytes().AsRaw()) + case pcommon.ValueTypeMap: + mapHash(h, v.Map()) + case pcommon.ValueTypeSlice: + sliceHash(h, v.Slice()) + } +} + +func sliceHash(h hash.Hash, s pcommon.Slice) { + for i := 0; i < s.Len(); i++ { + valueHash(h, s.At(i)) + } +} + func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { + var document objmodel.Document + document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. 
diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index a8c652a64485..eb265277c0a5 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -5,6 +5,9 @@ package elasticsearchexporter import ( "fmt" + "os" + "sort" + "strings" "testing" "time" @@ -12,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" @@ -22,6 +26,23 @@ var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attribut var expectedLogBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.log-attr1":"value1","Body":"log-body","Resource.key1":"value1","Scope.name":"","Scope.version":"","SeverityNumber":0,"TraceFlags":0}` +var expectedMetricsEncoded = `{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"idle","system":{"cpu":{"time":440.23}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"interrupt","system":{"cpu":{"time":0}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"nice","system":{"cpu":{"time":0.14}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"softirq","system":{"cpu":{"time":0.77}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"steal","system":{"cpu":{"time":0}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"system","system":{"cpu":{"time":24.8}}} 
+{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"user","system":{"cpu":{"time":64.78}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu0","host":{"name":"my-host"},"os":{"type":"linux"},"state":"wait","system":{"cpu":{"time":1.65}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"idle","system":{"cpu":{"time":475.69}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"interrupt","system":{"cpu":{"time":0}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"nice","system":{"cpu":{"time":0.1}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"softirq","system":{"cpu":{"time":0.57}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"steal","system":{"cpu":{"time":0}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"system","system":{"cpu":{"time":15.88}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"user","system":{"cpu":{"time":50.09}}} +{"@timestamp":"2024-06-12T10:20:16.419290690Z","cpu":"cpu1","host":{"name":"my-host"},"os":{"type":"linux"},"state":"wait","system":{"cpu":{"time":0.95}}}` + var expectedLogBodyWithEmptyTimestamp = `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes.log-attr1":"value1","Body":"log-body","Resource.key1":"value1","Scope.name":"","Scope.version":"","SeverityNumber":0,"TraceFlags":0}` var expectedLogBodyDeDottedWithEmptyTimestamp = 
`{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes":{"log-attr1":"value1"},"Body":"log-body","Resource":{"foo":{"bar":"baz"},"key1":"value1"},"Scope":{"name":"","version":""},"SeverityNumber":0,"TraceFlags":0}` @@ -61,6 +82,40 @@ func TestEncodeLog(t *testing.T) { }) } +func TestEncodeMetric(t *testing.T) { + // Prepare metrics to test. + metrics := createTestMetrics(t) + + // Encode the metrics. + model := &encodeModel{ + dedot: true, + dedup: true, + mode: MappingNone, + } + docsBytes, err := model.encodeMetrics(metrics.ResourceMetrics().At(0).Resource(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope()) + + // Convert the byte arrays to strings and sort the docs to make the test deterministic. + require.NoError(t, err) + docs := make([]string, 0, len(docsBytes)) + for _, docBytes := range docsBytes { + docs = append(docs, string(docBytes)) + } + sort.Strings(docs) + allDocsSorted := strings.Join(docs, "\n") + + // Test that the result matches the expected value. 
+ assert.Equal(t, expectedMetricsEncoded, allDocsSorted) +} + +func createTestMetrics(t *testing.T) pmetric.Metrics { + metricsUnmarshaler := &pmetric.JSONUnmarshaler{} + metricBytes, err := os.ReadFile("testdata/metrics-cpu.json") + require.NoError(t, err) + metrics, err := metricsUnmarshaler.UnmarshalMetrics(metricBytes) + require.NoError(t, err) + return metrics +} + func mockResourceSpans() ptrace.Traces { traces := ptrace.NewTraces() diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 4431dd6410d6..acd6e92f9001 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -21,6 +21,29 @@ elasticsearch/trace: retry_on_status: - 429 - 500 +elasticsearch/metric: + tls: + insecure: false + endpoints: [http://localhost:9200] + metrics_index: my_metric_index + timeout: 2m + headers: + myheader: test + pipeline: mypipeline + user: elastic + password: search + api_key: AvFsEiPs== + discover: + on_start: true + flush: + bytes: 10485760 + retry: + max_requests: 5 + retry_on_status: + - 429 + - 500 + sending_queue: + enabled: true elasticsearch/log: tls: insecure: false diff --git a/exporter/elasticsearchexporter/testdata/metrics-cpu.json b/exporter/elasticsearchexporter/testdata/metrics-cpu.json new file mode 100644 index 000000000000..f7c83dd53255 --- /dev/null +++ b/exporter/elasticsearchexporter/testdata/metrics-cpu.json @@ -0,0 +1,346 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "my-host" + } + }, + { + "key": "os.type", + "value": { + "stringValue": "linux" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "otelcol/hostmetricsreceiver/cpu", + "version": "0.102.0-dev" + }, + "metrics": [ + { + "name": "system.cpu.time", + "description": "Total seconds each logical CPU spent on each mode.", + "unit": "s", + "sum": { + "dataPoints": [ + { + 
"attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "user" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 64.78 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "system" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 24.8 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "idle" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 440.23 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "interrupt" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "nice" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0.14 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "softirq" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0.77 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": "state", + "value": { + "stringValue": "steal" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu0" + } + }, + { + "key": 
"state", + "value": { + "stringValue": "wait" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 1.65 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "user" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 50.09 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "system" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 15.88 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "idle" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 475.69 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "interrupt" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "nice" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0.1 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "softirq" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0.57 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "steal" + } + } + ], + "startTimeUnixNano": 
"1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0 + }, + { + "attributes": [ + { + "key": "cpu", + "value": { + "stringValue": "cpu1" + } + }, + { + "key": "state", + "value": { + "stringValue": "wait" + } + } + ], + "startTimeUnixNano": "1718187065000000000", + "timeUnixNano": "1718187616419290690", + "asDouble": 0.95 + } + ] + } + } + ] + } + ], + "schemaUrl": "https://opentelemetry.io/schemas/1.9.0" + } + ] +}