From 0f67eb8c743ce517548e57bd1b99ad38756113e1 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 15 Jul 2021 14:01:44 +0800 Subject: [PATCH] Move setting of data_stream fields to processor (#5717) (cherry picked from commit 29cfed315a9f20205b9fe0dba829f18489d943f7) # Conflicts: # beater/beater.go # model/apmevent.go # model/error_test.go # model/metricset.go # model/metricset_test.go # model/profile.go # model/profile_test.go # model/span_test.go # model/transaction.go # model/transaction_test.go # transform/transform.go --- beater/beater.go | 9 ++ model/apmevent.go | 34 ++++++++ model/datastream.go | 38 +++++++++ model/error.go | 9 -- model/error_test.go | 38 ++++----- model/metricset.go | 14 ++-- model/metricset_test.go | 82 +++++++++++++++++++ model/modelprocessor/datastream.go | 72 ++++++++++++++++ model/modelprocessor/datastream_test.go | 70 ++++++++++++++++ model/modelprocessor/environment_test.go | 1 + model/profile.go | 17 +++- model/profile_test.go | 9 +- model/span.go | 7 -- model/span_test.go | 19 ++--- model/transaction.go | 11 +-- model/transaction_test.go | 9 ++ processor/otel/consumer_test.go | 2 +- .../jaeger_sampling_rate.approved.json | 6 -- .../metadata_jaeger-no-language.approved.json | 2 - .../metadata_jaeger-version.approved.json | 2 - .../metadata_jaeger.approved.json | 2 - .../span_jaeger_custom.approved.json | 2 - .../span_jaeger_db.approved.json | 2 - .../span_jaeger_http.approved.json | 14 ---- ...span_jaeger_http_status_code.approved.json | 2 - ...an_jaeger_https_default_port.approved.json | 2 - .../span_jaeger_messaging.approved.json | 2 - ...pan_jaeger_subtype_component.approved.json | 2 - .../transaction_jaeger_custom.approved.json | 2 - .../transaction_jaeger_full.approved.json | 14 ---- .../transaction_jaeger_no_attrs.approved.json | 2 - ...action_jaeger_type_component.approved.json | 2 - ...action_jaeger_type_messaging.approved.json | 2 - ...nsaction_jaeger_type_request.approved.json | 2 - ...n_jaeger_type_request_result.approved.json | 2 - processor/stream/processor_test.go | 2 +- .../testIntakeIntegrationErrors.approved.json | 10 --- .../testIntakeIntegrationEvents.approved.json | 8 -- ...ntakeIntegrationInvalidEvent.approved.json | 2 - ...eIntegrationInvalidJSONEvent.approved.json | 2 - ...ntegrationMetadataNullValues.approved.json | 2 - ...tIntakeIntegrationMetricsets.approved.json | 10 --- ...akeIntegrationMinimalService.approved.json | 4 - ...ntegrationOptionalTimestamps.approved.json | 6 -- ...stIntakeIntegrationRumErrors.approved.json | 2 - ...keIntegrationRumTransactions.approved.json | 4 - .../testIntakeIntegrationSpans.approved.json | 12 --- ...ntakeIntegrationTransactions.approved.json | 8 -- .../testIntakeRUMV3Errors.approved.json | 2 - .../testIntakeRUMV3Events.approved.json | 26 ------ publish/pub.go | 4 - transform/transform.go | 3 + 52 files changed, 380 insertions(+), 232 deletions(-) create mode 100644 model/datastream.go create mode 100644 model/modelprocessor/datastream.go create mode 100644 model/modelprocessor/datastream_test.go diff --git a/beater/beater.go b/beater/beater.go index 8a7702b727d..29d07f653c5 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -457,6 +457,11 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R DefaultServiceEnvironment: s.config.DefaultServiceEnvironment, }) } + if s.config.DataStreams.Enabled { + processors = append(processors, &modelprocessor.SetDataStream{ + Namespace: s.namespace, + }) + } return WrapRunServerWithProcessors(runServer, processors...) } @@ -626,6 +631,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config { +<<<<<<< HEAD return &transform.Config{ DataStreams: cfg.DataStreams.Enabled, RUM: transform.RUMConfig{ @@ -633,6 +639,9 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Confi ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping), }, } +======= + return &transform.Config{} +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) } func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) { diff --git a/model/apmevent.go b/model/apmevent.go index b8c98866e3c..8a226fa86c6 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -28,6 +28,7 @@ import ( // // Exactly one of the event fields should be non-nil. type APMEvent struct { +<<<<<<< HEAD Transaction *Transaction Span *Span Metricset *Metricset @@ -37,4 +38,37 @@ type APMEvent struct { func (e *APMEvent) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { return nil +======= + // DataStream optionally holds data stream identifiers. + // + // This will have the zero value when APM Server is run + // in standalone mode. + DataStream DataStream + + Transaction *Transaction + Span *Span + Metricset *Metricset + Error *Error + ProfileSample *ProfileSample +} + +func (e *APMEvent) appendBeatEvent(ctx context.Context, cfg *transform.Config, out []beat.Event) []beat.Event { + var event beat.Event + switch { + case e.Transaction != nil: + event = e.Transaction.toBeatEvent(cfg) + case e.Span != nil: + event = e.Span.toBeatEvent(ctx, cfg) + case e.Metricset != nil: + event = e.Metricset.toBeatEvent(cfg) + case e.Error != nil: + event = e.Error.toBeatEvent(ctx, cfg) + case e.ProfileSample != nil: + event = e.ProfileSample.toBeatEvent(cfg) + default: + return out + } + e.DataStream.setFields((*mapStr)(&event.Fields)) + return append(out, event) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) } diff --git a/model/datastream.go b/model/datastream.go new file mode 100644 index 00000000000..e94aec05e5b --- /dev/null +++ b/model/datastream.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import "github.com/elastic/apm-server/datastreams" + +// DataStream identifies the data stream to which an event will be written. +type DataStream struct { + // Type holds the data_stream.type identifier. + Type string + + // Dataset holds the data_stream.dataset identifier. + Dataset string + + // Namespace holds the data_stream.namespace identifier. + Namespace string +} + +func (d *DataStream) setFields(fields *mapStr) { + fields.maybeSetString(datastreams.TypeField, d.Type) + fields.maybeSetString(datastreams.DatasetField, d.Dataset) + fields.maybeSetString(datastreams.NamespaceField, d.Namespace) +} diff --git a/model/error.go b/model/error.go index 2c59d0bbe57..658ed986d12 100644 --- a/model/error.go +++ b/model/error.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -115,14 +114,6 @@ func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, eve "processor": errorProcessorEntry, } - if cfg.DataStreams { - // Errors are stored in an APM errors-specific "logs" data stream, per service. - // By storing errors in a "logs" data stream, they can be viewed in the Logs app - // in Kibana. - fields[datastreams.TypeField] = datastreams.LogsType - fields[datastreams.DatasetField] = ErrorsDataset - } - // first set the generic metadata (order is relevant) e.Metadata.set(&fields, e.Labels) if client := fields["client"]; client != nil { diff --git a/model/error_test.go b/model/error_test.go index 9806b9fc41a..8787726b347 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -297,10 +297,8 @@ func TestEvents(t *testing.T) { "valid": { Error: &Error{Timestamp: timestamp, Metadata: md}, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -312,11 +310,9 @@ func TestEvents(t *testing.T) { "notSampled": { Error: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse}, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "transaction": common.MapStr{"sampled": false}, - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "transaction": common.MapStr{"sampled": false}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -328,9 +324,7 @@ func TestEvents(t *testing.T) { "withMeta": { Error: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType}, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "transaction": common.MapStr{"type": "request"}, + "transaction": common.MapStr{"type": "request"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -361,15 +355,13 @@ func TestEvents(t *testing.T) { }, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "labels": common.MapStr{"key": true, "label": 101}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "user": common.MapStr{"id": uid, "email": email}, - "client": common.MapStr{"ip": userIP}, - "source": common.MapStr{"ip": userIP}, - "user_agent": common.MapStr{"original": userAgent}, + "labels": common.MapStr{"key": true, "label": 101}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "user": common.MapStr{"id": uid, "email": email}, + "client": common.MapStr{"ip": userIP}, + "source": common.MapStr{"ip": userIP}, + "user_agent": common.MapStr{"original": userAgent}, "error": common.MapStr{ "custom": common.MapStr{ "foo_bar": "baz", @@ -398,11 +390,15 @@ func TestEvents(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { +<<<<<<< HEAD outputEvents := tc.Error.appendBeatEvents(context.Background(), &transform.Config{ DataStreams: true, }, nil) require.Len(t, outputEvents, 1) outputEvent := outputEvents[0] +======= + outputEvent := tc.Error.toBeatEvent(context.Background(), &transform.Config{}) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) assert.Equal(t, tc.Output, outputEvent.Fields) assert.Equal(t, timestamp, outputEvent.Timestamp) diff --git a/model/metricset.go b/model/metricset.go index e3673c9b521..549cb235a2b 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -18,7 +18,6 @@ package model import ( - "fmt" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -26,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/transform" ) @@ -179,7 +177,11 @@ type MetricsetSpan struct { DestinationService DestinationService } +<<<<<<< HEAD func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event { +======= +func (me *Metricset) toBeatEvent(*transform.Config) beat.Event { +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) metricsetTransformations.Inc() if me == nil { return nil @@ -204,17 +206,13 @@ func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event me.Metadata.set(&fields, me.Labels) - var isInternal bool if eventFields := me.Event.fields(); eventFields != nil { - isInternal = true common.MapStr(fields).DeepUpdate(common.MapStr{metricsetEventKey: eventFields}) } if transactionFields := me.Transaction.fields(); transactionFields != nil { - isInternal = true common.MapStr(fields).DeepUpdate(common.MapStr{metricsetTransactionKey: transactionFields}) } if spanFields := me.Span.fields(); spanFields != nil { - isInternal = true common.MapStr(fields).DeepUpdate(common.MapStr{metricsetSpanKey: spanFields}) } @@ -238,6 +236,7 @@ func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event } fields.maybeSetMapStr("_metric_descriptions", common.MapStr(metricDescriptions)) +<<<<<<< HEAD if cfg.DataStreams { // Metrics that include well-defined transaction/span fields // (i.e. breakdown metrics, transaction and span metrics) will @@ -251,6 +250,9 @@ func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event } return append(events, beat.Event{ +======= + return beat.Event{ +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) Fields: common.MapStr(fields), Timestamp: me.Timestamp, }) diff --git a/model/metricset_test.go b/model/metricset_test.go index 088dcaf21a1..a684f4bf6b2 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -59,6 +59,7 @@ func TestTransform(t *testing.T) { }, { Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata}, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", @@ -67,12 +68,19 @@ func TestTransform(t *testing.T) { "service": common.MapStr{ "name": "myservice", }, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{ + "name": "myservice", +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) }, }, Msg: "Payload with empty metric.", }, { Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata, Name: "raj"}, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", @@ -82,6 +90,13 @@ func TestTransform(t *testing.T) { "service": common.MapStr{ "name": "myservice", }, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "metricset.name": "raj", + "service": common.MapStr{ + "name": "myservice", +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) }, }, Msg: "Payload with metricset name.", @@ -102,6 +117,7 @@ func TestTransform(t *testing.T) { }, }, }, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", @@ -109,6 +125,12 @@ func TestTransform(t *testing.T) { "processor": common.MapStr{"event": "metric", "name": "metric"}, "service": common.MapStr{"name": "myservice"}, "labels": common.MapStr{"a_b": "a.b.value"}, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "labels": common.MapStr{"a_b": "a.b.value"}, +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) "a": common.MapStr{"counter": float64(612)}, "some": common.MapStr{"gauge": float64(9.16)}, @@ -127,6 +149,7 @@ func TestTransform(t *testing.T) { Value: 123, }}, }, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", @@ -139,6 +162,16 @@ func TestTransform(t *testing.T) { "self_time": common.MapStr{ "count": 123.0, }, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "transaction": common.MapStr{"type": trType, "name": trName}, + "span": common.MapStr{ + "type": spType, "subtype": spSubtype, + "self_time": common.MapStr{ + "count": 123.0, +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) }, }, }, @@ -165,6 +198,7 @@ func TestTransform(t *testing.T) { }, }, }, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", @@ -183,6 +217,22 @@ func TestTransform(t *testing.T) { "counts": []int64{1, 2, 3}, "values": []float64{4.5, 6.0, 9.0}, }, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "event": common.MapStr{"outcome": eventOutcome}, + "timeseries": common.MapStr{"instance": "foo"}, + "transaction": common.MapStr{ + "type": trType, + "name": trName, + "result": trResult, + "root": true, + "duration": common.MapStr{ + "histogram": common.MapStr{ + "counts": []int64{1, 2, 3}, + "values": []float64{4.5, 6.0, 9.0}, +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) }, }, "_doc_count": int64(6), // 1+2+3 @@ -208,6 +258,7 @@ func TestTransform(t *testing.T) { }, }, }, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", @@ -222,6 +273,18 @@ func TestTransform(t *testing.T) { }, }, }, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "span": common.MapStr{"type": spType, "subtype": spSubtype, + "destination": common.MapStr{"service": common.MapStr{"resource": resource}}}, + "destination": common.MapStr{"service": common.MapStr{"response_time": common.MapStr{ + "count": 40.0, + "sum": common.MapStr{"us": 500000.0}, + }, + }, +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) }, }, Msg: "Payload with destination service.", @@ -246,12 +309,25 @@ func TestTransform(t *testing.T) { Value: 0.99, }}, }, +<<<<<<< HEAD Output: []common.MapStr{ { "data_stream.type": "metrics", "data_stream.dataset": "apm.app.myservice", "processor": common.MapStr{"event": "metric", "name": "metric"}, "service": common.MapStr{"name": "myservice"}, +======= + Output: common.MapStr{ + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "latency_histogram": common.MapStr{ + "counts": []int64{1, 2, 3}, + "values": []float64{1.1, 2.2, 3.3}, + }, + "just_type": 123.0, + "just_unit": 0.99, + "_metric_descriptions": common.MapStr{ +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) "latency_histogram": common.MapStr{ "counts": []int64{1, 2, 3}, "values": []float64{1.1, 2.2, 3.3}, @@ -277,11 +353,17 @@ func TestTransform(t *testing.T) { } for idx, test := range tests { +<<<<<<< HEAD outputEvents := test.Metricset.appendBeatEvents(&transform.Config{DataStreams: true}, nil) for j, outputEvent := range outputEvents { assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) assert.Equal(t, timestamp, outputEvent.Timestamp, fmt.Sprintf("Bad timestamp at idx %v; %s", idx, test.Msg)) } +======= + outputEvent := test.Metricset.toBeatEvent(&transform.Config{}) + assert.Equal(t, test.Output, outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) + assert.Equal(t, timestamp, outputEvent.Timestamp, fmt.Sprintf("Bad timestamp at idx %v; %s", idx, test.Msg)) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) } } diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go new file mode 100644 index 00000000000..b36190d83af --- /dev/null +++ b/model/modelprocessor/datastream.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor + +import ( + "context" + "fmt" + + "github.com/elastic/apm-server/datastreams" + "github.com/elastic/apm-server/model" +) + +// SetDataStream is a model.BatchProcessor that sets the data stream for events. +type SetDataStream struct { + Namespace string +} + +// ProcessBatch sets data stream fields for each event in b. +func (s *SetDataStream) ProcessBatch(ctx context.Context, b *model.Batch) error { + for i := range *b { + s.setDataStream(&(*b)[i]) + } + return nil +} + +func (s *SetDataStream) setDataStream(event *model.APMEvent) { + switch { + case event.Transaction != nil || event.Span != nil: + event.DataStream.Type = datastreams.TracesType + event.DataStream.Dataset = model.TracesDataset + case event.Error != nil: + event.DataStream.Type = datastreams.LogsType + event.DataStream.Dataset = model.ErrorsDataset + case event.Metricset != nil: + event.DataStream.Type = datastreams.MetricsType + // Metrics that include well-defined transaction/span fields + // (i.e. breakdown metrics, transaction and span metrics) will + // be stored separately from application and runtime metrics. + event.DataStream.Dataset = model.InternalMetricsDataset + if isApplicationMetricset(event.Metricset) { + event.DataStream.Dataset = fmt.Sprintf( + "%s.%s", model.AppMetricsDataset, + datastreams.NormalizeServiceName(event.Metricset.Metadata.Service.Name), + ) + } + case event.ProfileSample != nil: + event.DataStream.Type = datastreams.MetricsType + event.DataStream.Dataset = model.ProfilesDataset + } + event.DataStream.Namespace = s.Namespace +} + +func isApplicationMetricset(ms *model.Metricset) bool { + return ms.Event == (model.MetricsetEventCategorization{}) && + ms.Transaction == (model.MetricsetTransaction{}) && + ms.Span == (model.MetricsetSpan{}) +} diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go new file mode 100644 index 00000000000..68eaaed6030 --- /dev/null +++ b/model/modelprocessor/datastream_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" +) + +func TestSetDataStream(t *testing.T) { + tests := []struct { + input model.APMEvent + output model.DataStream + }{{ + input: model.APMEvent{}, + output: model.DataStream{Namespace: "custom"}, + }, { + input: model.APMEvent{Transaction: &model.Transaction{}}, + output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, + }, { + input: model.APMEvent{Span: &model.Span{}}, + output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, + }, { + input: model.APMEvent{Error: &model.Error{}}, + output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, + }, { + input: model.APMEvent{Metricset: &model.Metricset{ + Metadata: model.Metadata{Service: model.Service{Name: "service-name"}}, + Transaction: model.MetricsetTransaction{Name: "foo"}, + }}, + output: model.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, + }, { + input: model.APMEvent{Metricset: &model.Metricset{ + Metadata: model.Metadata{Service: model.Service{Name: "service-name"}}, + }}, + output: model.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, + }, { + input: model.APMEvent{ProfileSample: &model.ProfileSample{}}, + output: model.DataStream{Type: "metrics", Dataset: "apm.profiling", Namespace: "custom"}, + }} + + for _, test := range tests { + batch := model.Batch{test.input} + processor := modelprocessor.SetDataStream{Namespace: "custom"} + err := processor.ProcessBatch(context.Background(), &batch) + assert.NoError(t, err) + assert.Equal(t, test.output, batch[0].DataStream) + } + +} diff --git a/model/modelprocessor/environment_test.go b/model/modelprocessor/environment_test.go index aac424b389a..1c33de27be3 100644 --- a/model/modelprocessor/environment_test.go +++ b/model/modelprocessor/environment_test.go @@ -51,6 +51,7 @@ func testProcessBatchMetadata(t *testing.T, processor model.BatchProcessor, in, apmEventFields = append(apmEventFields, typ.Field(i).Name) } assert.ElementsMatch(t, []string{ + "DataStream", "Transaction", "Span", "Metricset", diff --git a/model/profile.go b/model/profile.go index 3993f78edbb..6a1ceb26d2f 100644 --- a/model/profile.go +++ b/model/profile.go @@ -25,7 +25,6 @@ import ( "github.com/gofrs/uuid" "github.com/google/pprof/profile" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -65,6 +64,14 @@ func (pp PprofProfile) appendBeatEvents(cfg *transform.Config, events []beat.Eve valueFieldNames[i] = sampleType.Type + "." + sampleUnit } +<<<<<<< HEAD +======= +func (p *ProfileSample) toBeatEvent(*transform.Config) beat.Event { + var profileFields mapStr + profileFields.maybeSetString("id", p.ProfileID) + if p.Duration > 0 { + profileFields.set("duration", int64(p.Duration)) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) } // Generate a unique profile ID shared by all samples in the profile. @@ -146,10 +153,18 @@ func (pp PprofProfile) appendBeatEvents(cfg *transform.Config, events []beat.Eve return events } +<<<<<<< HEAD func normalizeUnit(unit string) string { switch unit { case "nanoseconds": unit = "ns" +======= + fields := mapStr{ + "processor": profileProcessorEntry, + profileDocType: common.MapStr(profileFields), + } + p.Metadata.set(&fields, p.Labels) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) case "microseconds": unit = "us" diff --git a/model/profile_test.go b/model/profile_test.go index bd935487520..36b778c0693 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -87,8 +87,13 @@ func TestPprofProfileTransform(t *testing.T) { }, } +<<<<<<< HEAD batch := &model.Batch{{Profile: &pp}} output := batch.Transform(context.Background(), &transform.Config{DataStreams: true}) +======= + batch := &model.Batch{{ProfileSample: &sample}, {ProfileSample: &sample}} + output := batch.Transform(context.Background(), &transform.Config{}) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) require.Len(t, output, 2) assert.Equal(t, output[0], output[1]) @@ -100,9 +105,7 @@ func TestPprofProfileTransform(t *testing.T) { assert.Equal(t, beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.profiling", - "processor": common.MapStr{"event": "profile", "name": "profile"}, + "processor": common.MapStr{"event": "profile", "name": "profile"}, "service": common.MapStr{ "name": "myService", "environment": "staging", diff --git a/model/span.go b/model/span.go index 3e8005f631d..bedd34018c8 100644 --- a/model/span.go +++ b/model/span.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -197,12 +196,6 @@ func (e *Span) appendBeatEvents(ctx context.Context, cfg *transform.Config, even spanDocType: e.fields(ctx, cfg), } - if cfg.DataStreams { - // Spans are stored in a "traces" data stream along with transactions. - fields[datastreams.TypeField] = datastreams.TracesType - fields[datastreams.DatasetField] = TracesDataset - } - // first set the generic metadata e.Metadata.set(&fields, e.Labels) diff --git a/model/span_test.go b/model/span_test.go index ed330208d25..c7bc93b1cf1 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -56,10 +56,8 @@ func TestSpanTransform(t *testing.T) { Msg: "Span without a Stacktrace", Span: Span{Timestamp: timestamp, Metadata: metadata}, Output: common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", - "processor": common.MapStr{"event": "span", "name": "transaction"}, - "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, + "processor": common.MapStr{"event": "span", "name": "transaction"}, + "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -74,10 +72,8 @@ func TestSpanTransform(t *testing.T) { Msg: "Span with outcome", Span: Span{Timestamp: timestamp, Metadata: metadata, Outcome: "success"}, Output: common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", - "processor": common.MapStr{"event": "span", "name": "transaction"}, - "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, + "processor": common.MapStr{"event": "span", "name": "transaction"}, + "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -123,8 +119,6 @@ func TestSpanTransform(t *testing.T) { Message: &Message{QueueName: "users"}, }, Output: common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", "span": common.MapStr{ "id": hexID, "duration": common.MapStr{"us": 1200}, @@ -176,10 +170,15 @@ func TestSpanTransform(t *testing.T) { } for _, test := range tests { +<<<<<<< HEAD output := test.Span.appendBeatEvents(context.Background(), &transform.Config{ DataStreams: true, }, nil) fields := output[0].Fields assert.Equal(t, test.Output, fields, test.Msg) +======= + output := test.Span.toBeatEvent(context.Background(), &transform.Config{}) + assert.Equal(t, test.Output, output.Fields, test.Msg) +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) } } diff --git a/model/transaction.go b/model/transaction.go index ceb010c37e1..0528859c5a5 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -24,7 +24,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -120,7 +119,11 @@ func (e *Transaction) fields() common.MapStr { return common.MapStr(fields) } +<<<<<<< HEAD func (e *Transaction) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event { +======= +func (e *Transaction) toBeatEvent(*transform.Config) beat.Event { +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) transactionTransformations.Inc() fields := mapStr{ @@ -128,12 +131,6 @@ func (e *Transaction) appendBeatEvents(cfg *transform.Config, events []beat.Even transactionDocType: e.fields(), } - if cfg.DataStreams { - // Transactions are stored in a "traces" data stream along with spans. - fields[datastreams.TypeField] = datastreams.TracesType - fields[datastreams.DatasetField] = TracesDataset - } - // first set generic metadata (order is relevant) e.Metadata.set(&fields, e.Labels) if client := fields["client"]; client != nil { diff --git a/model/transaction_test.go b/model/transaction_test.go index f9d82fe725b..724a6a873d1 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -176,6 +176,7 @@ func TestEventsTransformWithMetadata(t *testing.T) { Custom: common.MapStr{"foo.bar": "baz"}, Message: &Message{QueueName: "routeUser"}, } +<<<<<<< HEAD events := txWithContext.appendBeatEvents(&transform.Config{DataStreams: true}, nil) require.Len(t, events, 1) assert.Equal(t, events[0].Fields, common.MapStr{ @@ -185,6 +186,14 @@ func TestEventsTransformWithMetadata(t *testing.T) { "client": common.MapStr{"ip": ip}, "source": common.MapStr{"ip": ip}, "user_agent": common.MapStr{"original": userAgent}, +======= + event := txWithContext.toBeatEvent(&transform.Config{}) + assert.Equal(t, event.Fields, common.MapStr{ + "user": common.MapStr{"id": "123", "name": "jane"}, + "client": common.MapStr{"ip": ip}, + "source": common.MapStr{"ip": ip}, + "user_agent": common.MapStr{"original": userAgent}, +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) "host": common.MapStr{ "architecture": "darwin", "hostname": "a.b.c", diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index 8cf54287528..a6bb8e4edfe 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -1133,7 +1133,7 @@ func eventRecorderBatchProcessor(out *[]beat.Event) model.BatchProcessor { func transformBatch(ctx context.Context, batches ...*model.Batch) []beat.Event { var out []beat.Event for _, batch := range batches { - out = append(out, batch.Transform(ctx, &transform.Config{DataStreams: true})...) + out = append(out, batch.Transform(ctx, &transform.Config{})...) } return out } diff --git a/processor/otel/test_approved/jaeger_sampling_rate.approved.json b/processor/otel/test_approved/jaeger_sampling_rate.approved.json index 5342958a4cd..9262f94d224 100644 --- a/processor/otel/test_approved/jaeger_sampling_rate.approved.json +++ b/processor/otel/test_approved/jaeger_sampling_rate.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -42,8 +40,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -83,8 +79,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger-no-language.approved.json b/processor/otel/test_approved/metadata_jaeger-no-language.approved.json index 0dc34aa4121..b0db5a40175 100644 --- a/processor/otel/test_approved/metadata_jaeger-no-language.approved.json +++ b/processor/otel/test_approved/metadata_jaeger-no-language.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "3.4.12" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger-version.approved.json b/processor/otel/test_approved/metadata_jaeger-version.approved.json index a648c2fad35..4827fa32217 100644 --- a/processor/otel/test_approved/metadata_jaeger-version.approved.json +++ b/processor/otel/test_approved/metadata_jaeger-version.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger/PHP", "version": "3.4.12" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger.approved.json b/processor/otel/test_approved/metadata_jaeger.approved.json index f89beffda02..6d4476d08f7 100644 --- a/processor/otel/test_approved/metadata_jaeger.approved.json +++ b/processor/otel/test_approved/metadata_jaeger.approved.json @@ -7,8 +7,6 @@ "name": "Jaeger/C++", "version": "3.2.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/span_jaeger_custom.approved.json b/processor/otel/test_approved/span_jaeger_custom.approved.json index 16b4ae40318..44b601bef31 100644 --- a/processor/otel/test_approved/span_jaeger_custom.approved.json +++ b/processor/otel/test_approved/span_jaeger_custom.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/span_jaeger_db.approved.json b/processor/otel/test_approved/span_jaeger_db.approved.json index a1365f95e9c..84db3e698cd 100644 --- a/processor/otel/test_approved/span_jaeger_db.approved.json +++ b/processor/otel/test_approved/span_jaeger_db.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "db", "port": 3306 diff --git a/processor/otel/test_approved/span_jaeger_http.approved.json b/processor/otel/test_approved/span_jaeger_http.approved.json index 9d00b13678b..97620f8e091 100644 --- a/processor/otel/test_approved/span_jaeger_http.approved.json +++ b/processor/otel/test_approved/span_jaeger_http.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 80 @@ -83,8 +81,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -125,8 +121,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c", "log": { @@ -162,8 +156,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -201,8 +193,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -240,8 +230,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -279,8 +267,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "c9221918248f05433f6b81c46a666aee", "log": { diff --git a/processor/otel/test_approved/span_jaeger_http_status_code.approved.json b/processor/otel/test_approved/span_jaeger_http_status_code.approved.json index e0ed060c675..241cfac713f 100644 --- a/processor/otel/test_approved/span_jaeger_http_status_code.approved.json +++ b/processor/otel/test_approved/span_jaeger_http_status_code.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 80 diff --git a/processor/otel/test_approved/span_jaeger_https_default_port.approved.json b/processor/otel/test_approved/span_jaeger_https_default_port.approved.json index 5bba9ae62ef..63758c5de73 100644 --- a/processor/otel/test_approved/span_jaeger_https_default_port.approved.json +++ b/processor/otel/test_approved/span_jaeger_https_default_port.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 443 diff --git a/processor/otel/test_approved/span_jaeger_messaging.approved.json b/processor/otel/test_approved/span_jaeger_messaging.approved.json index 9654747d4bc..e7944ec07e5 100644 --- a/processor/otel/test_approved/span_jaeger_messaging.approved.json +++ b/processor/otel/test_approved/span_jaeger_messaging.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "mq", "port": 1234 diff --git a/processor/otel/test_approved/span_jaeger_subtype_component.approved.json b/processor/otel/test_approved/span_jaeger_subtype_component.approved.json index e472f7674dd..da21a1d4e04 100644 --- a/processor/otel/test_approved/span_jaeger_subtype_component.approved.json +++ b/processor/otel/test_approved/span_jaeger_subtype_component.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_custom.approved.json b/processor/otel/test_approved/transaction_jaeger_custom.approved.json index 48096e8333f..5cb4381872b 100644 --- a/processor/otel/test_approved/transaction_jaeger_custom.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_custom.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_full.approved.json b/processor/otel/test_approved/transaction_jaeger_full.approved.json index 982040fedaa..31370d22e4d 100644 --- a/processor/otel/test_approved/transaction_jaeger_full.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_full.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "failure" }, @@ -71,8 +69,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -134,8 +130,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c", "log": { @@ -192,8 +186,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -252,8 +244,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -312,8 +302,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -372,8 +360,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "c9221918248f05433f6b81c46a666aee", "log": { diff --git a/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json b/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json index e3a22f06d40..739ab7dea11 100644 --- a/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "failure" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_component.approved.json b/processor/otel/test_approved/transaction_jaeger_type_component.approved.json index 0a619284104..840831dc533 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_component.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_component.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json b/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json index 1be7265c505..58e0fcf85d8 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_request.approved.json b/processor/otel/test_approved/transaction_jaeger_type_request.approved.json index fb18f41a471..c49d22c5889 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_request.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_request.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "failure" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json b/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json index 38b7ad24dd1..2418d4c7dab 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/stream/processor_test.go b/processor/stream/processor_test.go index 8eb5b109419..9d7e7940639 100644 --- a/processor/stream/processor_test.go +++ b/processor/stream/processor_test.go @@ -257,7 +257,7 @@ func TestRUMV3(t *testing.T) { func makeApproveEventsBatchProcessor(t *testing.T, name string, count *int) model.BatchProcessor { return model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { - events := b.Transform(ctx, &transform.Config{DataStreams: true}) + events := b.Transform(ctx, &transform.Config{}) *count += len(events) docs := beatertest.EncodeEventDocs(events...) approvaltest.ApproveEventDocs(t, name, docs) diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json index 19570c7acc9..763b2543ac9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json @@ -36,8 +36,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "my.module.function_name", "custom": { @@ -384,8 +382,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "dc8dd667f7036ec5f0bae87bf2188243", "id": "xFoaabb123FFFFFF", @@ -494,8 +490,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -600,8 +594,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -712,8 +704,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json index c9220d740ec..c09b66d15f7 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json @@ -13,8 +13,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", "custom": { @@ -290,8 +288,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -446,8 +442,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -630,8 +624,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "dotted": { "float": { "gauge": 6.12 diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json index 3c0be4e4581..bb7538be513 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json index 3c0be4e4581..bb7538be513 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json index c15059bb4b6..a287923824a 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json index 384f9083d97..5a2f402df10 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json @@ -7,8 +7,6 @@ "version": "3.14.0" }, "byte_counter": 1, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "dotted": { "float": { "gauge": 6.12 @@ -99,8 +97,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "go": { "memstats": { "heap": { @@ -145,8 +141,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, @@ -198,8 +192,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, @@ -272,8 +264,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json index cc5806da98b..fc65b17f96e 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", @@ -39,8 +37,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "go": { "memstats": { "heap": { diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json index 67b20f1949b..94fb271ace9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -79,8 +77,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -157,8 +153,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.backendspans", - "data_stream.type": "metrics", "host": { "architecture": "x64", "hostname": "prod1.example.com", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json index 502f1b0ba89..75223646be4 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "test/e2e/general-usecase/bundle.js.map", "exception": [ diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json index 96df5088eb9..d5e21d64d41 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -77,8 +75,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json index 3f22c4d8bce..f10d1977034 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json @@ -37,8 +37,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -154,8 +152,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -272,8 +268,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -394,8 +388,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -513,8 +505,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "0:0::0:1", "port": 5432 @@ -720,8 +710,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "0:0::0:1" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json index 95f1ae87ea2..e7fb8b76cab 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json @@ -32,8 +32,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -153,8 +151,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -366,8 +362,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -510,8 +504,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json index e58bbaba770..c35eb2463e9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "test/e2e/general-usecase/app.e2e-bundle.min.js?token=secret", "custom": { diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json index 4567d59d489..dd889abb425 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -157,8 +155,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -214,8 +210,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -271,8 +265,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -328,8 +320,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -399,8 +389,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -470,8 +458,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8000 @@ -563,8 +549,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -633,8 +617,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8000 @@ -732,8 +714,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8003 @@ -831,8 +811,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8003 @@ -931,8 +909,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -1024,8 +1000,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "tag1": "value1", "testTagKey": "testTagValue" diff --git a/publish/pub.go b/publish/pub.go index 0c5fbadc4b7..1fc16eac3ef 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -26,7 +26,6 @@ import ( "github.com/pkg/errors" "go.elastic.co/apm" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -104,9 +103,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf Fields: common.MapStr{"observer": observerFields}, Processor: cfg.Processor, } - if cfg.TransformConfig.DataStreams { - processingCfg.Fields[datastreams.NamespaceField] = cfg.Namespace - } if cfg.Pipeline != "" { processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline} } diff --git a/transform/transform.go b/transform/transform.go index 58ee99523e8..0995c4d52cb 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -32,6 +32,7 @@ type Transformable interface { // Config holds general transformation configuration. type Config struct { +<<<<<<< HEAD // DataStreams records whether or not data streams are enabled. // If true, then data_stream fields should be added to all events. DataStreams bool @@ -43,4 +44,6 @@ type Config struct { type RUMConfig struct { LibraryPattern *regexp.Regexp ExcludeFromGrouping *regexp.Regexp +======= +>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717)) }