Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Move setting of data_stream fields to processor (backport #5717) #5728

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R
DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
})
}
if s.config.DataStreams.Enabled {
processors = append(processors, &modelprocessor.SetDataStream{
Namespace: s.namespace,
})
}
return WrapRunServerWithProcessors(runServer, processors...)
}

Expand Down Expand Up @@ -626,13 +631,17 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config {
<<<<<<< HEAD
return &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
},
}
=======
return &transform.Config{}
>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717))
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
Expand Down
34 changes: 34 additions & 0 deletions model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
//
// Exactly one of the event fields should be non-nil.
type APMEvent struct {
<<<<<<< HEAD
Transaction *Transaction
Span *Span
Metricset *Metricset
Expand All @@ -37,4 +38,37 @@ type APMEvent struct {

func (e *APMEvent) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
return nil
=======
// DataStream optionally holds data stream identifiers.
//
// This will have the zero value when APM Server is run
// in standalone mode.
DataStream DataStream

Transaction *Transaction
Span *Span
Metricset *Metricset
Error *Error
ProfileSample *ProfileSample
}

func (e *APMEvent) appendBeatEvent(ctx context.Context, cfg *transform.Config, out []beat.Event) []beat.Event {
var event beat.Event
switch {
case e.Transaction != nil:
event = e.Transaction.toBeatEvent(cfg)
case e.Span != nil:
event = e.Span.toBeatEvent(ctx, cfg)
case e.Metricset != nil:
event = e.Metricset.toBeatEvent(cfg)
case e.Error != nil:
event = e.Error.toBeatEvent(ctx, cfg)
case e.ProfileSample != nil:
event = e.ProfileSample.toBeatEvent(cfg)
default:
return out
}
e.DataStream.setFields((*mapStr)(&event.Fields))
return append(out, event)
>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717))
}
38 changes: 38 additions & 0 deletions model/datastream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package model

import "github.com/elastic/apm-server/datastreams"

// DataStream identifies the data stream to which an event will be written.
type DataStream struct {
// Type holds the data_stream.type identifier.
Type string

// Dataset holds the data_stream.dataset identifier.
Dataset string

// Namespace holds the data_stream.namespace identifier.
Namespace string
}

func (d *DataStream) setFields(fields *mapStr) {
fields.maybeSetString(datastreams.TypeField, d.Type)
fields.maybeSetString(datastreams.DatasetField, d.Dataset)
fields.maybeSetString(datastreams.NamespaceField, d.Namespace)
}
9 changes: 0 additions & 9 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/utility"
)
Expand Down Expand Up @@ -115,14 +114,6 @@ func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, eve
"processor": errorProcessorEntry,
}

if cfg.DataStreams {
// Errors are stored in an APM errors-specific "logs" data stream, per service.
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
fields[datastreams.TypeField] = datastreams.LogsType
fields[datastreams.DatasetField] = ErrorsDataset
}

// first set the generic metadata (order is relevant)
e.Metadata.set(&fields, e.Labels)
if client := fields["client"]; client != nil {
Expand Down
38 changes: 17 additions & 21 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,8 @@ func TestEvents(t *testing.T) {
"valid": {
Error: &Error{Timestamp: timestamp, Metadata: md},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
},
Expand All @@ -312,11 +310,9 @@ func TestEvents(t *testing.T) {
"notSampled": {
Error: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"transaction": common.MapStr{"sampled": false},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"transaction": common.MapStr{"sampled": false},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
},
Expand All @@ -328,9 +324,7 @@ func TestEvents(t *testing.T) {
"withMeta": {
Error: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"transaction": common.MapStr{"type": "request"},
"transaction": common.MapStr{"type": "request"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
},
Expand Down Expand Up @@ -361,15 +355,13 @@ func TestEvents(t *testing.T) {
},

Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"labels": common.MapStr{"key": true, "label": 101},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"user": common.MapStr{"id": uid, "email": email},
"client": common.MapStr{"ip": userIP},
"source": common.MapStr{"ip": userIP},
"user_agent": common.MapStr{"original": userAgent},
"labels": common.MapStr{"key": true, "label": 101},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"user": common.MapStr{"id": uid, "email": email},
"client": common.MapStr{"ip": userIP},
"source": common.MapStr{"ip": userIP},
"user_agent": common.MapStr{"original": userAgent},
"error": common.MapStr{
"custom": common.MapStr{
"foo_bar": "baz",
Expand Down Expand Up @@ -398,11 +390,15 @@ func TestEvents(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
<<<<<<< HEAD
outputEvents := tc.Error.appendBeatEvents(context.Background(), &transform.Config{
DataStreams: true,
}, nil)
require.Len(t, outputEvents, 1)
outputEvent := outputEvents[0]
=======
outputEvent := tc.Error.toBeatEvent(context.Background(), &transform.Config{})
>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717))
assert.Equal(t, tc.Output, outputEvent.Fields)
assert.Equal(t, timestamp, outputEvent.Timestamp)

Expand Down
14 changes: 8 additions & 6 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package model

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/datastreams"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/transform"
)
Expand Down Expand Up @@ -179,7 +177,11 @@ type MetricsetSpan struct {
DestinationService DestinationService
}

<<<<<<< HEAD
func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event {
=======
func (me *Metricset) toBeatEvent(*transform.Config) beat.Event {
>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717))
metricsetTransformations.Inc()
if me == nil {
return nil
Expand All @@ -204,17 +206,13 @@ func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event

me.Metadata.set(&fields, me.Labels)

var isInternal bool
if eventFields := me.Event.fields(); eventFields != nil {
isInternal = true
common.MapStr(fields).DeepUpdate(common.MapStr{metricsetEventKey: eventFields})
}
if transactionFields := me.Transaction.fields(); transactionFields != nil {
isInternal = true
common.MapStr(fields).DeepUpdate(common.MapStr{metricsetTransactionKey: transactionFields})
}
if spanFields := me.Span.fields(); spanFields != nil {
isInternal = true
common.MapStr(fields).DeepUpdate(common.MapStr{metricsetSpanKey: spanFields})
}

Expand All @@ -238,6 +236,7 @@ func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event
}
fields.maybeSetMapStr("_metric_descriptions", common.MapStr(metricDescriptions))

<<<<<<< HEAD
if cfg.DataStreams {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
Expand All @@ -251,6 +250,9 @@ func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event
}

return append(events, beat.Event{
=======
return beat.Event{
>>>>>>> 29cfed31 (Move setting of data_stream fields to processor (#5717))
Fields: common.MapStr(fields),
Timestamp: me.Timestamp,
})
Expand Down
Loading