Skip to content

Commit

Permalink
Move labels and timestamp to model.APMEvent (elastic#5840) (elastic#5847
Browse files Browse the repository at this point in the history
)

* Remove event-specific Labels fields

Remove Transaction.Labels, Span.Labels, etc., and combine
global labels ("metadata labels") with event-specific labels
at decoding and model construction time.

* model: move Timestamp to APMEvent

* model/modeldecoder: adapt to APMEvent.Timestamp

Remove modeldecoder.Input.RequestTime. Instead, we will
have processor/stream initialise the base event's timestamp
to the request time and leave it unmodified in the event
that no timestamp is specified for the event.

* processor/otel: adapt to APMEvent.Timestamp

* aggregation: adapt to APMEvent.Timestamp

(cherry picked from commit c041e85)

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
mergify[bot] and axw authored Aug 2, 2021
1 parent fc9f9c9 commit 3836710
Show file tree
Hide file tree
Showing 42 changed files with 629 additions and 681 deletions.
12 changes: 7 additions & 5 deletions agentcfg/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ func (r Reporter) Run(ctx context.Context) error {
}
batch := make(model.Batch, 0, len(applied))
for etag := range applied {
batch = append(batch, model.APMEvent{Metricset: &model.Metricset{
Name: "agent_config",
batch = append(batch, model.APMEvent{
Labels: common.MapStr{"etag": etag},
Samples: map[string]model.MetricsetSample{
"agent_config_applied": {Value: 1},
Metricset: &model.Metricset{
Name: "agent_config",
Samples: map[string]model.MetricsetSample{
"agent_config_applied": {Value: 1},
},
},
}})
})
}
// Reset applied map, so that we report only configs applied
// during a given iteration.
Expand Down
4 changes: 2 additions & 2 deletions agentcfg/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func (f fauxFetcher) Fetch(_ context.Context, q Query) (Result, error) {

type batchProcessor struct {
receivedc chan struct{}
received []*model.Metricset
received []model.APMEvent
mu sync.Mutex
}

func (p *batchProcessor) ProcessBatch(_ context.Context, b *model.Batch) error {
p.mu.Lock()
defer p.mu.Unlock()
for _, event := range *b {
p.received = append(p.received, event.Metricset)
p.received = append(p.received, event)
}
p.receivedc <- struct{}{}
return nil
Expand Down
13 changes: 4 additions & 9 deletions beater/api/profile/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/gofrs/uuid"
"github.com/google/pprof/profile"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/elastic/apm-server/model"
)

Expand All @@ -36,7 +34,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out

// Precompute value field names for use in each event.
// TODO(axw) limit to well-known value names?
profileTimestamp := time.Unix(0, pp.TimeNanos)
baseEvent.Timestamp = time.Unix(0, pp.TimeNanos)
valueFieldNames := make([]string, len(pp.SampleType))
for i, sampleType := range pp.SampleType {
sampleUnit := normalizeUnit(sampleType.Unit)
Expand Down Expand Up @@ -88,11 +86,11 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out
}
}

var labels common.MapStr
event := baseEvent
event.Labels = event.Labels.Clone()
if n := len(sample.Label); n > 0 {
labels = make(common.MapStr, n)
for k, v := range sample.Label {
labels[k] = v
event.Labels[k] = v
}
}

Expand All @@ -101,13 +99,10 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out
values[valueFieldNames[i]] = value
}

event := baseEvent
event.ProfileSample = &model.ProfileSample{
Timestamp: profileTimestamp,
Duration: time.Duration(pp.DurationNanos),
ProfileID: profileID,
Stack: stack,
Labels: labels,
Values: values,
}
out = append(out, event)
Expand Down
9 changes: 5 additions & 4 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,17 @@ func newTestBeater(
Logger: logger,
WrapRunServer: func(runServer RunServerFunc) RunServerFunc {
var processor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error {
for _, event := range *batch {
for i := range *batch {
event := &(*batch)[i]
if event.Transaction == nil {
continue
}
// Add a label to test that everything
// goes through the wrapped reporter.
if event.Transaction.Labels == nil {
event.Transaction.Labels = common.MapStr{}
if event.Labels == nil {
event.Labels = common.MapStr{}
}
event.Transaction.Labels["wrapped_reporter"] = true
event.Labels["wrapped_reporter"] = true
}
return nil
}
Expand Down
37 changes: 20 additions & 17 deletions model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package model

import (
"context"
"time"

"github.com/elastic/apm-server/utility"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)
Expand All @@ -46,11 +48,10 @@ type APMEvent struct {
Cloud Cloud
Network Network

// Timestamp holds the event timestamp.
Timestamp time.Time

// Labels holds labels to apply to the event.
//
// TODO(axw) remove Transaction.Labels, Span.Labels, etc.,
// and merge into these labels at decoding time. There can
// be only one.
Labels common.MapStr

Transaction *Transaction
Expand All @@ -61,30 +62,32 @@ type APMEvent struct {
}

func (e *APMEvent) appendBeatEvent(ctx context.Context, out []beat.Event) []beat.Event {
var event beat.Event
var eventLabels common.MapStr
event := beat.Event{Timestamp: e.Timestamp}
switch {
case e.Transaction != nil:
event = e.Transaction.toBeatEvent()
eventLabels = e.Transaction.Labels
event.Fields = e.Transaction.fields()
case e.Span != nil:
event = e.Span.toBeatEvent(ctx)
eventLabels = e.Span.Labels
event.Fields = e.Span.fields()
case e.Metricset != nil:
event = e.Metricset.toBeatEvent()
eventLabels = e.Metricset.Labels
event.Fields = e.Metricset.fields()
case e.Error != nil:
event = e.Error.toBeatEvent(ctx)
eventLabels = e.Error.Labels
event.Fields = e.Error.fields()
case e.ProfileSample != nil:
event = e.ProfileSample.toBeatEvent()
eventLabels = e.ProfileSample.Labels
event.Fields = e.ProfileSample.fields()
default:
return out
}

// Set high resolution timestamp.
//
// TODO(axw) change @timestamp to use date_nanos, and remove this field.
if !e.Timestamp.IsZero() && (e.Transaction != nil || e.Span != nil || e.Error != nil) {
event.Fields["timestamp"] = utility.TimeAsMicros(e.Timestamp)
}

// Set fields common to all events.
fields := (*mapStr)(&event.Fields)
event.Timestamp = e.Timestamp
e.DataStream.setFields(fields)
fields.maybeSetMapStr("service", e.Service.Fields())
fields.maybeSetMapStr("agent", e.Agent.fields())
Expand All @@ -104,6 +107,6 @@ func (e *APMEvent) appendBeatEvent(ctx context.Context, out []beat.Event) []beat
fields.maybeSetMapStr("kubernetes", e.Kubernetes.fields())
fields.maybeSetMapStr("cloud", e.Cloud.fields())
fields.maybeSetMapStr("network", e.Network.fields())
maybeSetLabels(fields, e.Labels, eventLabels)
fields.maybeSetMapStr("labels", sanitizeLabels(e.Labels))
return append(out, event)
}
20 changes: 10 additions & 10 deletions model/apmevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package model
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -57,13 +58,12 @@ func TestAPMEventFields(t *testing.T) {
Hostname: hostname,
Name: host,
},
Client: Client{Domain: "client.domain"},
Process: Process{Pid: pid},
User: User{ID: uid, Email: mail},
Labels: common.MapStr{"a": "a1", "b": "b1"},
Transaction: &Transaction{
Labels: common.MapStr{"b": "b2", "c": "c2"},
},
Client: Client{Domain: "client.domain"},
Process: Process{Pid: pid},
User: User{ID: uid, Email: mail},
Labels: common.MapStr{"a": "b", "c": 123},
Transaction: &Transaction{},
Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)),
},
output: common.MapStr{
// common fields
Expand All @@ -79,9 +79,8 @@ func TestAPMEventFields(t *testing.T) {
"client": common.MapStr{"domain": "client.domain"},
"source": common.MapStr{"domain": "client.domain"},
"labels": common.MapStr{
"a": "a1",
"b": "b2",
"c": "c2",
"a": "b",
"c": 123,
},

// fields related to APMEvent.Transaction
Expand All @@ -90,6 +89,7 @@ func TestAPMEventFields(t *testing.T) {
"name": "transaction",
"event": "transaction",
},
"timestamp": common.MapStr{"us": int64(1546525024908596)},
"transaction": common.MapStr{
"duration": common.MapStr{"us": 0},
"sampled": false,
Expand Down
44 changes: 11 additions & 33 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,8 @@
package model

import (
"context"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/utility"
)

var (
Expand All @@ -48,11 +42,8 @@ type Error struct {
TraceID string
ParentID string

Timestamp time.Time

GroupingKey string
Culprit string
Labels common.MapStr
Page *Page
HTTP *HTTP
URL *URL
Expand Down Expand Up @@ -87,7 +78,7 @@ type Log struct {
Stacktrace Stacktrace
}

func (e *Error) toBeatEvent(ctx context.Context) beat.Event {
func (e *Error) fields() common.MapStr {
errorTransformations.Inc()

if e.Exception != nil {
Expand All @@ -97,11 +88,7 @@ func (e *Error) toBeatEvent(ctx context.Context) beat.Event {
addStacktraceCounter(e.Log.Stacktrace)
}

fields := mapStr{
"error": e.fields(),
"processor": errorProcessorEntry,
}

fields := mapStr{"processor": errorProcessorEntry}
if e.HTTP != nil {
fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields())
}
Expand All @@ -123,28 +110,19 @@ func (e *Error) toBeatEvent(ctx context.Context) beat.Event {
trace.maybeSetString("id", e.TraceID)
fields.maybeSetMapStr("parent", common.MapStr(parent))
fields.maybeSetMapStr("trace", common.MapStr(trace))
fields.maybeSetMapStr("timestamp", utility.TimeAsMicros(e.Timestamp))

return beat.Event{
Fields: common.MapStr(fields),
Timestamp: e.Timestamp,
}
}

func (e *Error) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("id", e.ID)
fields.maybeSetMapStr("page", e.Page.Fields())

var errorFields mapStr
errorFields.maybeSetString("id", e.ID)
errorFields.maybeSetMapStr("page", e.Page.Fields())
exceptionChain := flattenExceptionTree(e.Exception)
if exception := e.exceptionFields(exceptionChain); len(exception) > 0 {
fields.set("exception", exception)
errorFields.set("exception", exception)
}
fields.maybeSetMapStr("log", e.logFields())

fields.maybeSetString("culprit", e.Culprit)
fields.maybeSetMapStr("custom", customFields(e.Custom))
fields.maybeSetString("grouping_key", e.GroupingKey)
errorFields.maybeSetMapStr("log", e.logFields())
errorFields.maybeSetString("culprit", e.Culprit)
errorFields.maybeSetMapStr("custom", customFields(e.Custom))
errorFields.maybeSetString("grouping_key", e.GroupingKey)
fields.set("error", common.MapStr(errorFields))
return common.MapStr(fields)
}

Expand Down
17 changes: 6 additions & 11 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package model

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -158,7 +156,6 @@ func TestEventFields(t *testing.T) {
"withFrames": {
Error: Error{
ID: id,
Timestamp: time.Now(),
Culprit: culprit,
Exception: &exception,
Log: &log,
Expand Down Expand Up @@ -191,9 +188,8 @@ func TestEventFields(t *testing.T) {

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
output := tc.Error.toBeatEvent(context.Background())
fields := output.Fields["error"]
assert.Equal(t, tc.Output, fields)
fields := tc.Error.fields()
assert.Equal(t, tc.Output, fields["error"])
})
}
}
Expand All @@ -209,9 +205,8 @@ func TestErrorTransformPage(t *testing.T) {
}{
{
Error: Error{
ID: id,
Timestamp: time.Now(),
URL: ParseURL("https://localhost:8200/", "", ""),
ID: id,
URL: ParseURL("https://localhost:8200/", "", ""),
Page: &Page{
URL: ParseURL(urlExample, "", ""),
},
Expand All @@ -229,7 +224,7 @@ func TestErrorTransformPage(t *testing.T) {
}

for idx, test := range tests {
output := test.Error.toBeatEvent(context.Background())
assert.Equal(t, test.Output, output.Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
fields := test.Error.fields()
assert.Equal(t, test.Output, fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
}
}
Loading

0 comments on commit 3836710

Please sign in to comment.