From 29cfed315a9f20205b9fe0dba829f18489d943f7 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 15 Jul 2021 14:01:44 +0800 Subject: [PATCH] Move setting of data_stream fields to processor (#5717) --- beater/beater.go | 9 ++- model/apmevent.go | 22 ++++-- model/datastream.go | 38 ++++++++++ model/error.go | 9 --- model/error_test.go | 36 ++++------ model/metricset.go | 20 +----- model/metricset_test.go | 50 +++++-------- model/modelprocessor/datastream.go | 72 +++++++++++++++++++ model/modelprocessor/datastream_test.go | 70 ++++++++++++++++++ model/modelprocessor/environment_test.go | 1 + model/profile.go | 7 +- model/profile_test.go | 6 +- model/span.go | 7 -- model/span_test.go | 16 ++--- model/transaction.go | 9 +-- model/transaction_test.go | 12 ++-- processor/otel/consumer_test.go | 2 +- .../jaeger_sampling_rate.approved.json | 6 -- .../metadata_jaeger-no-language.approved.json | 2 - .../metadata_jaeger-version.approved.json | 2 - .../metadata_jaeger.approved.json | 2 - .../span_jaeger_custom.approved.json | 2 - .../span_jaeger_db.approved.json | 2 - .../span_jaeger_http.approved.json | 14 ---- ...span_jaeger_http_status_code.approved.json | 2 - ...an_jaeger_https_default_port.approved.json | 2 - .../span_jaeger_messaging.approved.json | 2 - ...pan_jaeger_subtype_component.approved.json | 2 - .../transaction_jaeger_custom.approved.json | 2 - .../transaction_jaeger_full.approved.json | 14 ---- .../transaction_jaeger_no_attrs.approved.json | 2 - ...action_jaeger_type_component.approved.json | 2 - ...action_jaeger_type_messaging.approved.json | 2 - ...nsaction_jaeger_type_request.approved.json | 2 - ...n_jaeger_type_request_result.approved.json | 2 - processor/stream/processor_test.go | 2 +- .../testIntakeIntegrationErrors.approved.json | 10 --- .../testIntakeIntegrationEvents.approved.json | 8 --- ...ntakeIntegrationInvalidEvent.approved.json | 2 - ...eIntegrationInvalidJSONEvent.approved.json | 2 - ...ntegrationMetadataNullValues.approved.json | 2 - ...tIntakeIntegrationMetricsets.approved.json | 10 --- ...akeIntegrationMinimalService.approved.json | 4 -- ...ntegrationOptionalTimestamps.approved.json | 6 -- ...stIntakeIntegrationRumErrors.approved.json | 2 - ...keIntegrationRumTransactions.approved.json | 4 -- .../testIntakeIntegrationSpans.approved.json | 12 ---- ...ntakeIntegrationTransactions.approved.json | 8 --- .../testIntakeRUMV3Errors.approved.json | 2 - .../testIntakeRUMV3Events.approved.json | 26 ------- publish/pub.go | 4 -- transform/transform.go | 3 - 52 files changed, 252 insertions(+), 305 deletions(-) create mode 100644 model/datastream.go create mode 100644 model/modelprocessor/datastream.go create mode 100644 model/modelprocessor/datastream_test.go diff --git a/beater/beater.go b/beater/beater.go index 913fbfba62c..e323f845842 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -456,6 +456,11 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R DefaultServiceEnvironment: s.config.DefaultServiceEnvironment, }) } + if s.config.DataStreams.Enabled { + processors = append(processors, &modelprocessor.SetDataStream{ + Namespace: s.namespace, + }) + } return WrapRunServerWithProcessors(runServer, processors...) } @@ -625,9 +630,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config { - return &transform.Config{ - DataStreams: cfg.DataStreams.Enabled, - } + return &transform.Config{} } func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) { diff --git a/model/apmevent.go b/model/apmevent.go index 030b272051e..9e610346706 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -28,6 +28,12 @@ import ( // // Exactly one of the event fields should be non-nil. type APMEvent struct { + // DataStream optionally holds data stream identifiers. + // + // This will have the zero value when APM Server is run + // in standalone mode. + DataStream DataStream + Transaction *Transaction Span *Span Metricset *Metricset @@ -36,17 +42,21 @@ type APMEvent struct { } func (e *APMEvent) appendBeatEvent(ctx context.Context, cfg *transform.Config, out []beat.Event) []beat.Event { + var event beat.Event switch { case e.Transaction != nil: - out = append(out, e.Transaction.toBeatEvent(cfg)) + event = e.Transaction.toBeatEvent(cfg) case e.Span != nil: - out = append(out, e.Span.toBeatEvent(ctx, cfg)) + event = e.Span.toBeatEvent(ctx, cfg) case e.Metricset != nil: - out = append(out, e.Metricset.toBeatEvent(cfg)) + event = e.Metricset.toBeatEvent(cfg) case e.Error != nil: - out = append(out, e.Error.toBeatEvent(ctx, cfg)) + event = e.Error.toBeatEvent(ctx, cfg) case e.ProfileSample != nil: - out = append(out, e.ProfileSample.toBeatEvent(cfg)) + event = e.ProfileSample.toBeatEvent(cfg) + default: + return out } - return out + e.DataStream.setFields((*mapStr)(&event.Fields)) + return append(out, event) } diff --git a/model/datastream.go b/model/datastream.go new file mode 100644 index 00000000000..e94aec05e5b --- /dev/null +++ b/model/datastream.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import "github.com/elastic/apm-server/datastreams" + +// DataStream identifies the data stream to which an event will be written. +type DataStream struct { + // Type holds the data_stream.type identifier. + Type string + + // Dataset holds the data_stream.dataset identifier. + Dataset string + + // Namespace holds the data_stream.namespace identifier. + Namespace string +} + +func (d *DataStream) setFields(fields *mapStr) { + fields.maybeSetString(datastreams.TypeField, d.Type) + fields.maybeSetString(datastreams.DatasetField, d.Dataset) + fields.maybeSetString(datastreams.NamespaceField, d.Namespace) +} diff --git a/model/error.go b/model/error.go index 23db37d560f..6608f6ed241 100644 --- a/model/error.go +++ b/model/error.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -111,14 +110,6 @@ func (e *Error) toBeatEvent(ctx context.Context, cfg *transform.Config) beat.Eve "processor": errorProcessorEntry, } - if cfg.DataStreams { - // Errors are stored in an APM errors-specific "logs" data stream, per service. - // By storing errors in a "logs" data stream, they can be viewed in the Logs app - // in Kibana. - fields[datastreams.TypeField] = datastreams.LogsType - fields[datastreams.DatasetField] = ErrorsDataset - } - // first set the generic metadata (order is relevant) e.Metadata.set(&fields, e.Labels) if client := fields["client"]; client != nil { diff --git a/model/error_test.go b/model/error_test.go index 7d4e352fc04..28a4059a62d 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -294,10 +294,8 @@ func TestEvents(t *testing.T) { "valid": { Error: &Error{Timestamp: timestamp, Metadata: md}, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -309,11 +307,9 @@ func TestEvents(t *testing.T) { "notSampled": { Error: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse}, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "transaction": common.MapStr{"sampled": false}, - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "transaction": common.MapStr{"sampled": false}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -325,9 +321,7 @@ func TestEvents(t *testing.T) { "withMeta": { Error: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType}, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "transaction": common.MapStr{"type": "request"}, + "transaction": common.MapStr{"type": "request"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -357,15 +351,13 @@ func TestEvents(t *testing.T) { }, Output: common.MapStr{ - "data_stream.type": "logs", - "data_stream.dataset": "apm.error", - "labels": common.MapStr{"key": true, "label": 101}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "user": common.MapStr{"id": uid, "email": email}, - "client": common.MapStr{"ip": userIP}, - "source": common.MapStr{"ip": userIP}, - "user_agent": common.MapStr{"original": userAgent}, + "labels": common.MapStr{"key": true, "label": 101}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "user": common.MapStr{"id": uid, "email": email}, + "client": common.MapStr{"ip": userIP}, + "source": common.MapStr{"ip": userIP}, + "user_agent": common.MapStr{"original": userAgent}, "error": common.MapStr{ "custom": common.MapStr{ "foo_bar": "baz", @@ -394,7 +386,7 @@ func TestEvents(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - outputEvent := tc.Error.toBeatEvent(context.Background(), &transform.Config{DataStreams: true}) + outputEvent := tc.Error.toBeatEvent(context.Background(), &transform.Config{}) assert.Equal(t, tc.Output, outputEvent.Fields) assert.Equal(t, timestamp, outputEvent.Timestamp) diff --git a/model/metricset.go b/model/metricset.go index c3c310c5c20..bd77b91a59d 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -18,7 +18,6 @@ package model import ( - "fmt" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -26,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/transform" ) @@ -179,7 +177,7 @@ type MetricsetSpan struct { DestinationService DestinationService } -func (me *Metricset) toBeatEvent(cfg *transform.Config) beat.Event { +func (me *Metricset) toBeatEvent(*transform.Config) beat.Event { metricsetTransformations.Inc() fields := mapStr{} for _, sample := range me.Samples { @@ -200,17 +198,13 @@ func (me *Metricset) toBeatEvent(cfg *transform.Config) beat.Event { me.Metadata.set(&fields, me.Labels) - var isInternal bool if eventFields := me.Event.fields(); eventFields != nil { - isInternal = true common.MapStr(fields).DeepUpdate(common.MapStr{metricsetEventKey: eventFields}) } if transactionFields := me.Transaction.fields(); transactionFields != nil { - isInternal = true common.MapStr(fields).DeepUpdate(common.MapStr{metricsetTransactionKey: transactionFields}) } if spanFields := me.Span.fields(); spanFields != nil { - isInternal = true common.MapStr(fields).DeepUpdate(common.MapStr{metricsetSpanKey: spanFields}) } @@ -234,18 +228,6 @@ func (me *Metricset) toBeatEvent(cfg *transform.Config) beat.Event { } fields.maybeSetMapStr("_metric_descriptions", common.MapStr(metricDescriptions)) - if cfg.DataStreams { - // Metrics that include well-defined transaction/span fields - // (i.e. breakdown metrics, transaction and span metrics) will - // be stored separately from application and runtime metrics. - dataset := InternalMetricsDataset - if !isInternal { - dataset = fmt.Sprintf("%s.%s", AppMetricsDataset, datastreams.NormalizeServiceName(me.Metadata.Service.Name)) - } - fields[datastreams.DatasetField] = dataset - fields[datastreams.TypeField] = datastreams.MetricsType - } - return beat.Event{ Fields: common.MapStr(fields), Timestamp: me.Timestamp, diff --git a/model/metricset_test.go b/model/metricset_test.go index 668d5fd6f8a..85126a5de95 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -55,9 +55,7 @@ func TestTransform(t *testing.T) { { Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata}, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.app.myservice", - "processor": common.MapStr{"event": "metric", "name": "metric"}, + "processor": common.MapStr{"event": "metric", "name": "metric"}, "service": common.MapStr{ "name": "myservice", }, @@ -67,10 +65,8 @@ func TestTransform(t *testing.T) { { Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata, Name: "raj"}, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.app.myservice", - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "metricset.name": "raj", + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "metricset.name": "raj", "service": common.MapStr{ "name": "myservice", }, @@ -94,11 +90,9 @@ func TestTransform(t *testing.T) { }, }, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.app.myservice", - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, - "labels": common.MapStr{"a_b": "a.b.value"}, + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "labels": common.MapStr{"a_b": "a.b.value"}, "a": common.MapStr{"counter": float64(612)}, "some": common.MapStr{"gauge": float64(9.16)}, @@ -117,11 +111,9 @@ func TestTransform(t *testing.T) { }}, }, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.internal", - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, - "transaction": common.MapStr{"type": trType, "name": trName}, + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "transaction": common.MapStr{"type": trType, "name": trName}, "span": common.MapStr{ "type": spType, "subtype": spSubtype, "self_time": common.MapStr{ @@ -153,12 +145,10 @@ func TestTransform(t *testing.T) { }, }, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.internal", - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, - "event": common.MapStr{"outcome": eventOutcome}, - "timeseries": common.MapStr{"instance": "foo"}, + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "event": common.MapStr{"outcome": eventOutcome}, + "timeseries": common.MapStr{"instance": "foo"}, "transaction": common.MapStr{ "type": trType, "name": trName, @@ -194,10 +184,8 @@ func TestTransform(t *testing.T) { }, }, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.internal", - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, "span": common.MapStr{"type": spType, "subtype": spSubtype, "destination": common.MapStr{"service": common.MapStr{"resource": resource}}}, "destination": common.MapStr{"service": common.MapStr{"response_time": common.MapStr{ @@ -230,10 +218,8 @@ func TestTransform(t *testing.T) { }}, }, Output: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.app.myservice", - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, "latency_histogram": common.MapStr{ "counts": []int64{1, 2, 3}, "values": []float64{1.1, 2.2, 3.3}, @@ -258,7 +244,7 @@ func TestTransform(t *testing.T) { } for idx, test := range tests { - outputEvent := test.Metricset.toBeatEvent(&transform.Config{DataStreams: true}) + outputEvent := test.Metricset.toBeatEvent(&transform.Config{}) assert.Equal(t, test.Output, outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) assert.Equal(t, timestamp, outputEvent.Timestamp, fmt.Sprintf("Bad timestamp at idx %v; %s", idx, test.Msg)) } diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go new file mode 100644 index 00000000000..b36190d83af --- /dev/null +++ b/model/modelprocessor/datastream.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor + +import ( + "context" + "fmt" + + "github.com/elastic/apm-server/datastreams" + "github.com/elastic/apm-server/model" +) + +// SetDataStream is a model.BatchProcessor that sets the data stream for events. +type SetDataStream struct { + Namespace string +} + +// ProcessBatch sets data stream fields for each event in b. +func (s *SetDataStream) ProcessBatch(ctx context.Context, b *model.Batch) error { + for i := range *b { + s.setDataStream(&(*b)[i]) + } + return nil +} + +func (s *SetDataStream) setDataStream(event *model.APMEvent) { + switch { + case event.Transaction != nil || event.Span != nil: + event.DataStream.Type = datastreams.TracesType + event.DataStream.Dataset = model.TracesDataset + case event.Error != nil: + event.DataStream.Type = datastreams.LogsType + event.DataStream.Dataset = model.ErrorsDataset + case event.Metricset != nil: + event.DataStream.Type = datastreams.MetricsType + // Metrics that include well-defined transaction/span fields + // (i.e. breakdown metrics, transaction and span metrics) will + // be stored separately from application and runtime metrics. + event.DataStream.Dataset = model.InternalMetricsDataset + if isApplicationMetricset(event.Metricset) { + event.DataStream.Dataset = fmt.Sprintf( + "%s.%s", model.AppMetricsDataset, + datastreams.NormalizeServiceName(event.Metricset.Metadata.Service.Name), + ) + } + case event.ProfileSample != nil: + event.DataStream.Type = datastreams.MetricsType + event.DataStream.Dataset = model.ProfilesDataset + } + event.DataStream.Namespace = s.Namespace +} + +func isApplicationMetricset(ms *model.Metricset) bool { + return ms.Event == (model.MetricsetEventCategorization{}) && + ms.Transaction == (model.MetricsetTransaction{}) && + ms.Span == (model.MetricsetSpan{}) +} diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go new file mode 100644 index 00000000000..68eaaed6030 --- /dev/null +++ b/model/modelprocessor/datastream_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" +) + +func TestSetDataStream(t *testing.T) { + tests := []struct { + input model.APMEvent + output model.DataStream + }{{ + input: model.APMEvent{}, + output: model.DataStream{Namespace: "custom"}, + }, { + input: model.APMEvent{Transaction: &model.Transaction{}}, + output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, + }, { + input: model.APMEvent{Span: &model.Span{}}, + output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, + }, { + input: model.APMEvent{Error: &model.Error{}}, + output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, + }, { + input: model.APMEvent{Metricset: &model.Metricset{ + Metadata: model.Metadata{Service: model.Service{Name: "service-name"}}, + Transaction: model.MetricsetTransaction{Name: "foo"}, + }}, + output: model.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, + }, { + input: model.APMEvent{Metricset: &model.Metricset{ + Metadata: model.Metadata{Service: model.Service{Name: "service-name"}}, + }}, + output: model.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, + }, { + input: model.APMEvent{ProfileSample: &model.ProfileSample{}}, + output: model.DataStream{Type: "metrics", Dataset: "apm.profiling", Namespace: "custom"}, + }} + + for _, test := range tests { + batch := model.Batch{test.input} + processor := modelprocessor.SetDataStream{Namespace: "custom"} + err := processor.ProcessBatch(context.Background(), &batch) + assert.NoError(t, err) + assert.Equal(t, test.output, batch[0].DataStream) + } + +} diff --git a/model/modelprocessor/environment_test.go b/model/modelprocessor/environment_test.go index 5690d8f1e4f..ec92ed7a91c 100644 --- a/model/modelprocessor/environment_test.go +++ b/model/modelprocessor/environment_test.go @@ -51,6 +51,7 @@ func testProcessBatchMetadata(t *testing.T, processor model.BatchProcessor, in, apmEventFields = append(apmEventFields, typ.Field(i).Name) } assert.ElementsMatch(t, []string{ + "DataStream", "Transaction", "Span", "Metricset", diff --git a/model/profile.go b/model/profile.go index aa72699fd59..711d84bfe08 100644 --- a/model/profile.go +++ b/model/profile.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" ) @@ -57,7 +56,7 @@ type ProfileSampleStackframe struct { Line int64 } -func (p *ProfileSample) toBeatEvent(cfg *transform.Config) beat.Event { +func (p *ProfileSample) toBeatEvent(*transform.Config) beat.Event { var profileFields mapStr profileFields.maybeSetString("id", p.ProfileID) if p.Duration > 0 { @@ -90,10 +89,6 @@ func (p *ProfileSample) toBeatEvent(cfg *transform.Config) beat.Event { profileDocType: common.MapStr(profileFields), } p.Metadata.set(&fields, p.Labels) - if cfg.DataStreams { - fields[datastreams.TypeField] = datastreams.MetricsType - fields[datastreams.DatasetField] = ProfilesDataset - } return beat.Event{ Timestamp: p.Timestamp, diff --git a/model/profile_test.go b/model/profile_test.go index b72b23f8de7..8a8408bb28a 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -67,16 +67,14 @@ func TestProfileSampleTransform(t *testing.T) { } batch := &model.Batch{{ProfileSample: &sample}, {ProfileSample: &sample}} - output := batch.Transform(context.Background(), &transform.Config{DataStreams: true}) + output := batch.Transform(context.Background(), &transform.Config{}) require.Len(t, output, 2) assert.Equal(t, output[0], output[1]) assert.Equal(t, beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ - "data_stream.type": "metrics", - "data_stream.dataset": "apm.profiling", - "processor": common.MapStr{"event": "profile", "name": "profile"}, + "processor": common.MapStr{"event": "profile", "name": "profile"}, "service": common.MapStr{ "name": "myService", "environment": "staging", diff --git a/model/span.go b/model/span.go index 4de6c15e1f1..61fc839b723 100644 --- a/model/span.go +++ b/model/span.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -193,12 +192,6 @@ func (e *Span) toBeatEvent(ctx context.Context, cfg *transform.Config) beat.Even spanDocType: e.fields(ctx, cfg), } - if cfg.DataStreams { - // Spans are stored in a "traces" data stream along with transactions. - fields[datastreams.TypeField] = datastreams.TracesType - fields[datastreams.DatasetField] = TracesDataset - } - // first set the generic metadata e.Metadata.set(&fields, e.Labels) diff --git a/model/span_test.go b/model/span_test.go index 8c9689b37f4..a37405a8a66 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -56,10 +56,8 @@ func TestSpanTransform(t *testing.T) { Msg: "Span without a Stacktrace", Span: Span{Timestamp: timestamp, Metadata: metadata}, Output: common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", - "processor": common.MapStr{"event": "span", "name": "transaction"}, - "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, + "processor": common.MapStr{"event": "span", "name": "transaction"}, + "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -74,10 +72,8 @@ func TestSpanTransform(t *testing.T) { Msg: "Span with outcome", Span: Span{Timestamp: timestamp, Metadata: metadata, Outcome: "success"}, Output: common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", - "processor": common.MapStr{"event": "span", "name": "transaction"}, - "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, + "processor": common.MapStr{"event": "span", "name": "transaction"}, + "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -123,8 +119,6 @@ func TestSpanTransform(t *testing.T) { Message: &Message{QueueName: "users"}, }, Output: common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", "span": common.MapStr{ "id": hexID, "duration": common.MapStr{"us": 1200}, @@ -176,7 +170,7 @@ func TestSpanTransform(t *testing.T) { } for _, test := range tests { - output := test.Span.toBeatEvent(context.Background(), &transform.Config{DataStreams: true}) + output := test.Span.toBeatEvent(context.Background(), &transform.Config{}) assert.Equal(t, test.Output, output.Fields, test.Msg) } } diff --git a/model/transaction.go b/model/transaction.go index 575b381816d..f6979b4138c 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -24,7 +24,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -120,7 +119,7 @@ func (e *Transaction) fields() common.MapStr { return common.MapStr(fields) } -func (e *Transaction) toBeatEvent(cfg *transform.Config) beat.Event { +func (e *Transaction) toBeatEvent(*transform.Config) beat.Event { transactionTransformations.Inc() fields := mapStr{ @@ -128,12 +127,6 @@ func (e *Transaction) toBeatEvent(cfg *transform.Config) beat.Event { transactionDocType: e.fields(), } - if cfg.DataStreams { - // Transactions are stored in a "traces" data stream along with spans. - fields[datastreams.TypeField] = datastreams.TracesType - fields[datastreams.DatasetField] = TracesDataset - } - // first set generic metadata (order is relevant) e.Metadata.set(&fields, e.Labels) if client := fields["client"]; client != nil { diff --git a/model/transaction_test.go b/model/transaction_test.go index d624c1e2dbb..244f7622a6b 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -175,14 +175,12 @@ func TestEventsTransformWithMetadata(t *testing.T) { Custom: common.MapStr{"foo.bar": "baz"}, Message: &Message{QueueName: "routeUser"}, } - event := txWithContext.toBeatEvent(&transform.Config{DataStreams: true}) + event := txWithContext.toBeatEvent(&transform.Config{}) assert.Equal(t, event.Fields, common.MapStr{ - "data_stream.type": "traces", - "data_stream.dataset": "apm", - "user": common.MapStr{"id": "123", "name": "jane"}, - "client": common.MapStr{"ip": ip}, - "source": common.MapStr{"ip": ip}, - "user_agent": common.MapStr{"original": userAgent}, + "user": common.MapStr{"id": "123", "name": "jane"}, + "client": common.MapStr{"ip": ip}, + "source": common.MapStr{"ip": ip}, + "user_agent": common.MapStr{"original": userAgent}, "host": common.MapStr{ "architecture": "darwin", "hostname": "a.b.c", diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index 20c11339d7b..447d6189419 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -1166,7 +1166,7 @@ func eventRecorderBatchProcessor(out *[]beat.Event) model.BatchProcessor { func transformBatch(ctx context.Context, batches ...*model.Batch) []beat.Event { var out []beat.Event for _, batch := range batches { - out = append(out, batch.Transform(ctx, &transform.Config{DataStreams: true})...) + out = append(out, batch.Transform(ctx, &transform.Config{})...) } return out } diff --git a/processor/otel/test_approved/jaeger_sampling_rate.approved.json b/processor/otel/test_approved/jaeger_sampling_rate.approved.json index 5342958a4cd..9262f94d224 100644 --- a/processor/otel/test_approved/jaeger_sampling_rate.approved.json +++ b/processor/otel/test_approved/jaeger_sampling_rate.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -42,8 +40,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -83,8 +79,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger-no-language.approved.json b/processor/otel/test_approved/metadata_jaeger-no-language.approved.json index 0dc34aa4121..b0db5a40175 100644 --- a/processor/otel/test_approved/metadata_jaeger-no-language.approved.json +++ b/processor/otel/test_approved/metadata_jaeger-no-language.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "3.4.12" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger-version.approved.json b/processor/otel/test_approved/metadata_jaeger-version.approved.json index a648c2fad35..4827fa32217 100644 --- a/processor/otel/test_approved/metadata_jaeger-version.approved.json +++ b/processor/otel/test_approved/metadata_jaeger-version.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger/PHP", "version": "3.4.12" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger.approved.json b/processor/otel/test_approved/metadata_jaeger.approved.json index f89beffda02..6d4476d08f7 100644 --- a/processor/otel/test_approved/metadata_jaeger.approved.json +++ b/processor/otel/test_approved/metadata_jaeger.approved.json @@ -7,8 +7,6 @@ "name": "Jaeger/C++", "version": "3.2.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/span_jaeger_custom.approved.json b/processor/otel/test_approved/span_jaeger_custom.approved.json index 16b4ae40318..44b601bef31 100644 --- a/processor/otel/test_approved/span_jaeger_custom.approved.json +++ b/processor/otel/test_approved/span_jaeger_custom.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/span_jaeger_db.approved.json b/processor/otel/test_approved/span_jaeger_db.approved.json index a1365f95e9c..84db3e698cd 100644 --- a/processor/otel/test_approved/span_jaeger_db.approved.json +++ b/processor/otel/test_approved/span_jaeger_db.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "db", "port": 3306 diff --git a/processor/otel/test_approved/span_jaeger_http.approved.json b/processor/otel/test_approved/span_jaeger_http.approved.json index 9d00b13678b..97620f8e091 100644 --- a/processor/otel/test_approved/span_jaeger_http.approved.json +++ b/processor/otel/test_approved/span_jaeger_http.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 80 @@ -83,8 +81,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -125,8 +121,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c", "log": { @@ -162,8 +156,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -201,8 +193,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -240,8 +230,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -279,8 +267,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "c9221918248f05433f6b81c46a666aee", "log": { diff --git a/processor/otel/test_approved/span_jaeger_http_status_code.approved.json b/processor/otel/test_approved/span_jaeger_http_status_code.approved.json index e0ed060c675..241cfac713f 100644 --- a/processor/otel/test_approved/span_jaeger_http_status_code.approved.json +++ b/processor/otel/test_approved/span_jaeger_http_status_code.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 80 diff --git a/processor/otel/test_approved/span_jaeger_https_default_port.approved.json b/processor/otel/test_approved/span_jaeger_https_default_port.approved.json index 5bba9ae62ef..63758c5de73 100644 --- a/processor/otel/test_approved/span_jaeger_https_default_port.approved.json +++ b/processor/otel/test_approved/span_jaeger_https_default_port.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 443 diff --git a/processor/otel/test_approved/span_jaeger_messaging.approved.json b/processor/otel/test_approved/span_jaeger_messaging.approved.json index 9654747d4bc..e7944ec07e5 100644 --- a/processor/otel/test_approved/span_jaeger_messaging.approved.json +++ b/processor/otel/test_approved/span_jaeger_messaging.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "mq", "port": 1234 diff --git a/processor/otel/test_approved/span_jaeger_subtype_component.approved.json b/processor/otel/test_approved/span_jaeger_subtype_component.approved.json index e472f7674dd..da21a1d4e04 100644 --- a/processor/otel/test_approved/span_jaeger_subtype_component.approved.json +++ b/processor/otel/test_approved/span_jaeger_subtype_component.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_custom.approved.json b/processor/otel/test_approved/transaction_jaeger_custom.approved.json index 48096e8333f..5cb4381872b 100644 --- a/processor/otel/test_approved/transaction_jaeger_custom.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_custom.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_full.approved.json b/processor/otel/test_approved/transaction_jaeger_full.approved.json index 982040fedaa..31370d22e4d 100644 --- a/processor/otel/test_approved/transaction_jaeger_full.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_full.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "failure" }, @@ -71,8 +69,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -134,8 +130,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c", "log": { @@ -192,8 +186,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -252,8 +244,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -312,8 +302,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -372,8 +360,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "c9221918248f05433f6b81c46a666aee", "log": { diff --git a/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json b/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json index e3a22f06d40..739ab7dea11 100644 --- a/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "failure" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_component.approved.json b/processor/otel/test_approved/transaction_jaeger_type_component.approved.json index 0a619284104..840831dc533 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_component.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_component.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json b/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json index 1be7265c505..58e0fcf85d8 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_request.approved.json b/processor/otel/test_approved/transaction_jaeger_type_request.approved.json index fb18f41a471..c49d22c5889 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_request.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_request.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "failure" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json b/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json index 38b7ad24dd1..2418d4c7dab 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json @@ -6,8 +6,6 @@ "name": "Jaeger", "version": "unknown" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/stream/processor_test.go b/processor/stream/processor_test.go index 8eb5b109419..9d7e7940639 100644 --- a/processor/stream/processor_test.go +++ b/processor/stream/processor_test.go @@ -257,7 +257,7 @@ func TestRUMV3(t *testing.T) { func makeApproveEventsBatchProcessor(t *testing.T, name string, count *int) model.BatchProcessor { return model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { - events := b.Transform(ctx, &transform.Config{DataStreams: true}) + events := b.Transform(ctx, &transform.Config{}) *count += len(events) docs := beatertest.EncodeEventDocs(events...) approvaltest.ApproveEventDocs(t, name, docs) diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json index 3338630746f..a2d41d4930d 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json @@ -36,8 +36,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "my.module.function_name", "custom": { @@ -383,8 +381,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "dc8dd667f7036ec5f0bae87bf2188243", "id": "xFoaabb123FFFFFF", @@ -493,8 +489,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -599,8 +593,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "exception": [ { @@ -711,8 +703,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json index c9220d740ec..c09b66d15f7 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json @@ -13,8 +13,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", "custom": { @@ -290,8 +288,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -446,8 +442,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -630,8 +624,6 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "dotted": { "float": { "gauge": 6.12 diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json index 3c0be4e4581..bb7538be513 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json index 3c0be4e4581..bb7538be513 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json index c15059bb4b6..a287923824a 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json index 384f9083d97..5a2f402df10 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json @@ -7,8 +7,6 @@ "version": "3.14.0" }, "byte_counter": 1, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "dotted": { "float": { "gauge": 6.12 @@ -99,8 +97,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "go": { "memstats": { "heap": { @@ -145,8 +141,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, @@ -198,8 +192,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, @@ -272,8 +264,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json index cc5806da98b..fc65b17f96e 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", @@ -39,8 +37,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", "go": { "memstats": { "heap": { diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json index 67b20f1949b..94fb271ace9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json @@ -6,8 +6,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -79,8 +77,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -157,8 +153,6 @@ "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.backendspans", - "data_stream.type": "metrics", "host": { "architecture": "x64", "hostname": "prod1.example.com", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json index 3cfbd7d8493..4ab0dcc5952 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "test/e2e/general-usecase/bundle.js.map", "exception": [ diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json index 96df5088eb9..d5e21d64d41 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -77,8 +75,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json index 3f22c4d8bce..f10d1977034 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json @@ -37,8 +37,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -154,8 +152,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -272,8 +268,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -394,8 +388,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -513,8 +505,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "0:0::0:1", "port": 5432 @@ -720,8 +710,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "0:0::0:1" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json index 95f1ae87ea2..e7fb8b76cab 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json @@ -32,8 +32,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -153,8 +151,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -366,8 +362,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -510,8 +504,6 @@ "container": { "id": "container-id" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json index e58bbaba770..c35eb2463e9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", "error": { "culprit": "test/e2e/general-usecase/app.e2e-bundle.min.js?token=secret", "custom": { diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json index 4567d59d489..dd889abb425 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json @@ -9,8 +9,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -157,8 +155,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -214,8 +210,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -271,8 +265,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -328,8 +320,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -399,8 +389,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -470,8 +458,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8000 @@ -563,8 +549,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -633,8 +617,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8000 @@ -732,8 +714,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8003 @@ -831,8 +811,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8003 @@ -931,8 +909,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -1024,8 +1000,6 @@ "client": { "ip": "192.0.0.1" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", "labels": { "tag1": "value1", "testTagKey": "testTagValue" diff --git a/publish/pub.go b/publish/pub.go index 0c5fbadc4b7..1fc16eac3ef 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -26,7 +26,6 @@ import ( "github.com/pkg/errors" "go.elastic.co/apm" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -104,9 +103,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf Fields: common.MapStr{"observer": observerFields}, Processor: cfg.Processor, } - if cfg.TransformConfig.DataStreams { - processingCfg.Fields[datastreams.NamespaceField] = cfg.Namespace - } if cfg.Pipeline != "" { processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline} } diff --git a/transform/transform.go b/transform/transform.go index b4323fc98f7..af0cd3ed39c 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -31,7 +31,4 @@ type Transformable interface { // Config holds general transformation configuration. type Config struct { - // DataStreams records whether or not data streams are enabled. - // If true, then data_stream fields should be added to all events. - DataStreams bool }