Skip to content

Commit

Permalink
add service name to dataset field
Browse files Browse the repository at this point in the history
  • Loading branch information
jalvz committed Feb 15, 2021
1 parent 9ad7072 commit 5833656
Show file tree
Hide file tree
Showing 43 changed files with 107 additions and 101 deletions.
1 change: 0 additions & 1 deletion datastreams/servicename.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import "strings"
// Concretely, this function will lowercase the string and replace any
// reserved characters with "_".
//
// TODO: use when Fleet supports variables in data streams (see #4492)
func NormalizeServiceName(s string) string {
s = strings.ToLower(s)
s = strings.Map(replaceReservedRune, s)
Expand Down
3 changes: 2 additions & 1 deletion model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
fields[datastreams.TypeField] = datastreams.LogsType
fields[datastreams.DatasetField] = ErrorsDataset
dataset := fmt.Sprintf("%s.%s", ErrorsDataset, datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.DatasetField] = dataset
}

// first set the generic metadata (order is relevant)
Expand Down
8 changes: 4 additions & 4 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestEvents(t *testing.T) {
Transformable: &Error{Timestamp: timestamp, Metadata: md},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.myservice",
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"error": common.MapStr{
Expand All @@ -323,7 +323,7 @@ func TestEvents(t *testing.T) {
Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.myservice",
"transaction": common.MapStr{"sampled": false},
"agent": common.MapStr{"name": "go", "version": "1.0"},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
Expand All @@ -339,7 +339,7 @@ func TestEvents(t *testing.T) {
Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionType: &transactionType},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.myservice",
"transaction": common.MapStr{"type": "request"},
"error": common.MapStr{
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestEvents(t *testing.T) {

Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.myservice",
"labels": common.MapStr{"key": true, "label": 101},
"service": common.MapStr{"name": "myservice", "version": "1.0"},
"agent": common.MapStr{"name": "go", "version": "1.0"},
Expand Down
7 changes: 4 additions & 3 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,16 @@ func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []bea
fields["processor"] = metricsetProcessorEntry

if cfg.DataStreams {
dataset := AppMetricsDataset
// Metrics are stored in "metrics" data streams.
if isInternal {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from application and runtime metrics.
fields[datastreams.DatasetField] = InternalMetricsDataset
} else {
fields[datastreams.DatasetField] = AppMetricsDataset
dataset = InternalMetricsDataset
}
dataset += "." + datastreams.NormalizeServiceName(me.Metadata.Service.Name)
fields[datastreams.DatasetField] = dataset
fields[datastreams.TypeField] = datastreams.MetricsType
}

Expand Down
10 changes: 5 additions & 5 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestTransform(t *testing.T) {
Output: []common.MapStr{
{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.app",
"data_stream.dataset": "apm.app.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{
"name": "myservice",
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestTransform(t *testing.T) {
Output: []common.MapStr{
{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.app",
"data_stream.dataset": "apm.app.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"labels": common.MapStr{"a_b": "a.b.value"},
Expand All @@ -116,7 +116,7 @@ func TestTransform(t *testing.T) {
Output: []common.MapStr{
{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.internal",
"data_stream.dataset": "apm.internal.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"transaction": common.MapStr{"type": trType, "name": trName},
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestTransform(t *testing.T) {
Output: []common.MapStr{
{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.internal",
"data_stream.dataset": "apm.internal.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"event": common.MapStr{"outcome": eventOutcome},
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestTransform(t *testing.T) {
Output: []common.MapStr{
{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.internal",
"data_stream.dataset": "apm.internal.myservice",
"processor": common.MapStr{"event": "metric", "name": "metric"},
"service": common.MapStr{"name": "myservice"},
"span": common.MapStr{"type": spType, "subtype": spSubtype,
Expand Down
3 changes: 2 additions & 1 deletion model/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []b
}
if cfg.DataStreams {
event.Fields[datastreams.TypeField] = datastreams.MetricsType
event.Fields[datastreams.DatasetField] = ProfilesDataset
dataset := fmt.Sprintf("%s.%s", ProfilesDataset, datastreams.NormalizeServiceName(pp.Metadata.Service.Name))
event.Fields[datastreams.DatasetField] = dataset
}
var profileLabels common.MapStr
if len(sample.Label) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion model/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestPprofProfileTransform(t *testing.T) {
Timestamp: timestamp,
Fields: common.MapStr{
"data_stream.type": "metrics",
"data_stream.dataset": "apm.profiling",
"data_stream.dataset": "apm.profiling.myservice",
"processor": common.MapStr{"event": "profile", "name": "profile"},
"service": common.MapStr{
"name": "myService",
Expand Down
4 changes: 3 additions & 1 deletion model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package model

import (
"context"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -198,7 +199,8 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even
if cfg.DataStreams {
// Spans are stored in a "traces" data stream along with transactions.
fields[datastreams.TypeField] = datastreams.TracesType
fields[datastreams.DatasetField] = TracesDataset
dataset := fmt.Sprintf("%s.%s", TracesDataset, datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.DatasetField] = dataset
}

// first set the generic metadata
Expand Down
6 changes: 3 additions & 3 deletions model/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestSpanTransform(t *testing.T) {
Span: Span{Timestamp: timestamp, Metadata: metadata},
Output: common.MapStr{
"data_stream.type": "traces",
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.myservice",
"processor": common.MapStr{"event": "span", "name": "transaction"},
"service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion},
"span": common.MapStr{
Expand All @@ -77,7 +77,7 @@ func TestSpanTransform(t *testing.T) {
Span: Span{Timestamp: timestamp, Metadata: metadata, Outcome: "success"},
Output: common.MapStr{
"data_stream.type": "traces",
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.myservice",
"processor": common.MapStr{"event": "span", "name": "transaction"},
"service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion},
"span": common.MapStr{
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestSpanTransform(t *testing.T) {
},
Output: common.MapStr{
"data_stream.type": "traces",
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.myservice",
"span": common.MapStr{
"id": hexID,
"duration": common.MapStr{"us": 1200},
Expand Down
4 changes: 3 additions & 1 deletion model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package model

import (
"context"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -121,7 +122,8 @@ func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat
if cfg.DataStreams {
// Transactions are stored in a "traces" data stream along with spans.
fields[datastreams.TypeField] = datastreams.TracesType
fields[datastreams.DatasetField] = TracesDataset
dataset := fmt.Sprintf("%s.%s", TracesDataset, datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.DatasetField] = dataset
}

// first set generic metadata (order is relevant)
Expand Down
2 changes: 1 addition & 1 deletion model/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestEventsTransformWithMetadata(t *testing.T) {
require.Len(t, events, 1)
assert.Equal(t, events[0].Fields, common.MapStr{
"data_stream.type": "traces",
"data_stream.dataset": "apm",
"data_stream.dataset": "apm." + serviceName,
"user": common.MapStr{"id": "123", "name": "jane"},
"client": common.MapStr{"ip": ip},
"source": common.MapStr{"ip": ip},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down Expand Up @@ -46,7 +46,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down Expand Up @@ -90,7 +90,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "3.4.12"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger/PHP",
"version": "3.4.12"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
2 changes: 1 addition & 1 deletion processor/otel/test_approved/metadata_jaeger.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"name": "Jaeger/C++",
"version": "3.2.1"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.foo",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
2 changes: 1 addition & 1 deletion processor/otel/test_approved/span_jaeger_db.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"destination": {
"address": "db",
Expand Down
14 changes: 7 additions & 7 deletions processor/otel/test_approved/span_jaeger_http.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"destination": {
"address": "foo.bar.com",
Expand Down Expand Up @@ -80,7 +80,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.unknown",
"data_stream.type": "logs",
"error": {
"exception": [
Expand Down Expand Up @@ -141,7 +141,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.unknown",
"data_stream.type": "logs",
"error": {
"grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c",
Expand Down Expand Up @@ -197,7 +197,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.unknown",
"data_stream.type": "logs",
"error": {
"exception": [
Expand Down Expand Up @@ -255,7 +255,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.unknown",
"data_stream.type": "logs",
"error": {
"exception": [
Expand Down Expand Up @@ -313,7 +313,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.unknown",
"data_stream.type": "logs",
"error": {
"exception": [
Expand Down Expand Up @@ -371,7 +371,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm.error",
"data_stream.dataset": "apm.error.unknown",
"data_stream.type": "logs",
"error": {
"grouping_key": "c9221918248f05433f6b81c46a666aee",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"destination": {
"address": "foo.bar.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"destination": {
"address": "foo.bar.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"destination": {
"address": "mq",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "Jaeger",
"version": "unknown"
},
"data_stream.dataset": "apm",
"data_stream.dataset": "apm.unknown",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
Expand Down
Loading

0 comments on commit 5833656

Please sign in to comment.