Skip to content

Commit

Permalink
Move setting of data_stream fields to processor (#5717)
Browse files Browse the repository at this point in the history
  • Loading branch information
axw authored Jul 15, 2021
1 parent 61bf74d commit 29cfed3
Show file tree
Hide file tree
Showing 52 changed files with 252 additions and 305 deletions.
9 changes: 6 additions & 3 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R
DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
})
}
if s.config.DataStreams.Enabled {
processors = append(processors, &modelprocessor.SetDataStream{
Namespace: s.namespace,
})
}
return WrapRunServerWithProcessors(runServer, processors...)
}

Expand Down Expand Up @@ -625,9 +630,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config {
return &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
}
return &transform.Config{}
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
Expand Down
22 changes: 16 additions & 6 deletions model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import (
//
// Exactly one of the event fields should be non-nil.
type APMEvent struct {
// DataStream optionally holds data stream identifiers.
//
// This will have the zero value when APM Server is run
// in standalone mode.
DataStream DataStream

Transaction *Transaction
Span *Span
Metricset *Metricset
Expand All @@ -36,17 +42,21 @@ type APMEvent struct {
}

func (e *APMEvent) appendBeatEvent(ctx context.Context, cfg *transform.Config, out []beat.Event) []beat.Event {
var event beat.Event
switch {
case e.Transaction != nil:
out = append(out, e.Transaction.toBeatEvent(cfg))
event = e.Transaction.toBeatEvent(cfg)
case e.Span != nil:
out = append(out, e.Span.toBeatEvent(ctx, cfg))
event = e.Span.toBeatEvent(ctx, cfg)
case e.Metricset != nil:
out = append(out, e.Metricset.toBeatEvent(cfg))
event = e.Metricset.toBeatEvent(cfg)
case e.Error != nil:
out = append(out, e.Error.toBeatEvent(ctx, cfg))
event = e.Error.toBeatEvent(ctx, cfg)
case e.ProfileSample != nil:
out = append(out, e.ProfileSample.toBeatEvent(cfg))
event = e.ProfileSample.toBeatEvent(cfg)
default:
return out
}
return out
e.DataStream.setFields((*mapStr)(&event.Fields))
return append(out, event)
}
38 changes: 38 additions & 0 deletions model/datastream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package model

import "github.com/elastic/apm-server/datastreams"

// DataStream identifies the data stream to which an event will be written.
type DataStream struct {
// Type holds the data_stream.type identifier.
Type string

// Dataset holds the data_stream.dataset identifier.
Dataset string

// Namespace holds the data_stream.namespace identifier.
Namespace string
}

func (d *DataStream) setFields(fields *mapStr) {
fields.maybeSetString(datastreams.TypeField, d.Type)
fields.maybeSetString(datastreams.DatasetField, d.Dataset)
fields.maybeSetString(datastreams.NamespaceField, d.Namespace)
}
9 changes: 0 additions & 9 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/utility"
)
Expand Down Expand Up @@ -111,14 +110,6 @@ func (e *Error) toBeatEvent(ctx context.Context, cfg *transform.Config) beat.Eve
"processor": errorProcessorEntry,
}

if cfg.DataStreams {
// Errors are stored in an APM errors-specific "logs" data stream, per service.
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
fields[datastreams.TypeField] = datastreams.LogsType
fields[datastreams.DatasetField] = ErrorsDataset
}

// first set the generic metadata (order is relevant)
e.Metadata.set(&fields, e.Labels)
if client := fields["client"]; client != nil {
Expand Down
36 changes: 14 additions & 22 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,8 @@ func TestEvents(t *testing.T) {
"valid": {
Error: &Error{Timestamp: timestamp, Metadata: md},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
},
Expand All @@ -309,11 +307,9 @@ func TestEvents(t *testing.T) {
"notSampled": {
Error: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"transaction": common.MapStr{"sampled": false},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"transaction": common.MapStr{"sampled": false},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
},
Expand All @@ -325,9 +321,7 @@ func TestEvents(t *testing.T) {
"withMeta": {
Error: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"transaction": common.MapStr{"type": "request"},
"transaction": common.MapStr{"type": "request"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
},
Expand Down Expand Up @@ -357,15 +351,13 @@ func TestEvents(t *testing.T) {
},

Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"labels": common.MapStr{"key": true, "label": 101},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"user": common.MapStr{"id": uid, "email": email},
"client": common.MapStr{"ip": userIP},
"source": common.MapStr{"ip": userIP},
"user_agent": common.MapStr{"original": userAgent},
"labels": common.MapStr{"key": true, "label": 101},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"user": common.MapStr{"id": uid, "email": email},
"client": common.MapStr{"ip": userIP},
"source": common.MapStr{"ip": userIP},
"user_agent": common.MapStr{"original": userAgent},
"error": common.MapStr{
"custom": common.MapStr{
"foo_bar": "baz",
Expand Down Expand Up @@ -394,7 +386,7 @@ func TestEvents(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
outputEvent := tc.Error.toBeatEvent(context.Background(), &transform.Config{DataStreams: true})
outputEvent := tc.Error.toBeatEvent(context.Background(), &transform.Config{})
assert.Equal(t, tc.Output, outputEvent.Fields)
assert.Equal(t, timestamp, outputEvent.Timestamp)

Expand Down
20 changes: 1 addition & 19 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package model

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/datastreams"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/transform"
)
Expand Down Expand Up @@ -179,7 +177,7 @@ type MetricsetSpan struct {
DestinationService DestinationService
}

func (me *Metricset) toBeatEvent(cfg *transform.Config) beat.Event {
func (me *Metricset) toBeatEvent(*transform.Config) beat.Event {
metricsetTransformations.Inc()
fields := mapStr{}
for _, sample := range me.Samples {
Expand All @@ -200,17 +198,13 @@ func (me *Metricset) toBeatEvent(cfg *transform.Config) beat.Event {

me.Metadata.set(&fields, me.Labels)

var isInternal bool
if eventFields := me.Event.fields(); eventFields != nil {
isInternal = true
common.MapStr(fields).DeepUpdate(common.MapStr{metricsetEventKey: eventFields})
}
if transactionFields := me.Transaction.fields(); transactionFields != nil {
isInternal = true
common.MapStr(fields).DeepUpdate(common.MapStr{metricsetTransactionKey: transactionFields})
}
if spanFields := me.Span.fields(); spanFields != nil {
isInternal = true
common.MapStr(fields).DeepUpdate(common.MapStr{metricsetSpanKey: spanFields})
}

Expand All @@ -234,18 +228,6 @@ func (me *Metricset) toBeatEvent(cfg *transform.Config) beat.Event {
}
fields.maybeSetMapStr("_metric_descriptions", common.MapStr(metricDescriptions))

if cfg.DataStreams {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from application and runtime metrics.
dataset := InternalMetricsDataset
if !isInternal {
dataset = fmt.Sprintf("%s.%s", AppMetricsDataset, datastreams.NormalizeServiceName(me.Metadata.Service.Name))
}
fields[datastreams.DatasetField] = dataset
fields[datastreams.TypeField] = datastreams.MetricsType
}

return beat.Event{
Fields: common.MapStr(fields),
Timestamp: me.Timestamp,
Expand Down
50 changes: 18 additions & 32 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ func TestTransform(t *testing.T) {
{
Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.app.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{
"name": "myservice",
},
Expand All @@ -67,10 +65,8 @@ func TestTransform(t *testing.T) {
{
Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata, Name: "raj"},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.app.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"metricset.name": "raj",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"metricset.name": "raj",
"service": common.MapStr{
"name": "myservice",
},
Expand All @@ -94,11 +90,9 @@ func TestTransform(t *testing.T) {
},
},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.app.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"labels": common.MapStr{"a_b": "a.b.value"},
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"labels": common.MapStr{"a_b": "a.b.value"},

"a": common.MapStr{"counter": float64(612)},
"some": common.MapStr{"gauge": float64(9.16)},
Expand All @@ -117,11 +111,9 @@ func TestTransform(t *testing.T) {
}},
},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.internal",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"transaction": common.MapStr{"type": trType, "name": trName},
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"transaction": common.MapStr{"type": trType, "name": trName},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
"self_time": common.MapStr{
Expand Down Expand Up @@ -153,12 +145,10 @@ func TestTransform(t *testing.T) {
},
},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.internal",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"event": common.MapStr{"outcome": eventOutcome},
"timeseries": common.MapStr{"instance": "foo"},
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"event": common.MapStr{"outcome": eventOutcome},
"timeseries": common.MapStr{"instance": "foo"},
"transaction": common.MapStr{
"type": trType,
"name": trName,
Expand Down Expand Up @@ -194,10 +184,8 @@ func TestTransform(t *testing.T) {
},
},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.internal",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"span": common.MapStr{"type": spType, "subtype": spSubtype,
"destination": common.MapStr{"service": common.MapStr{"resource": resource}}},
"destination": common.MapStr{"service": common.MapStr{"response_time": common.MapStr{
Expand Down Expand Up @@ -230,10 +218,8 @@ func TestTransform(t *testing.T) {
}},
},
Output: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.app.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"latency_histogram": common.MapStr{
"counts": []int64{1, 2, 3},
"values": []float64{1.1, 2.2, 3.3},
Expand All @@ -258,7 +244,7 @@ func TestTransform(t *testing.T) {
}

for idx, test := range tests {
outputEvent := test.Metricset.toBeatEvent(&transform.Config{DataStreams: true})
outputEvent := test.Metricset.toBeatEvent(&transform.Config{})
assert.Equal(t, test.Output, outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
assert.Equal(t, timestamp, outputEvent.Timestamp, fmt.Sprintf("Bad timestamp at idx %v; %s", idx, test.Msg))
}
Expand Down
Loading

0 comments on commit 29cfed3

Please sign in to comment.