Skip to content

Commit

Permalink
[receiver/k8sclusterreceiver] Begin emitting entity events as logs
Browse files Browse the repository at this point in the history
Resolves open-telemetry#24400

This is part 3 of the work to move to entity events-as-log-records in K8s cluster receiver:
open-telemetry#19741

Overall design document:
https://docs.google.com/document/d/1Tg18sIck3Nakxtd3TFFcIjrmRO_0GLMdHXylVqBQmJA/

Example log record emitted when a pod changes:
```
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 1970-01-01 00:00:00 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
     -> otel.entity.id: Map({"k8s.pod.uid":"07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"})
     -> otel.entity.event.type: Str(entity_state)
     -> otel.entity.type: Str(k8s.pod)
     -> otel.entity.attributes: Map({"k8s-app":"kubernetes-dashboard","k8s.deployment.name":"kubernetes-dashboard","k8s.deployment.uid":"4c1ee765-906b-498b-80b5-bea67a714fce","k8s.replicaset.name":"kubernetes-dashboard-6c7ccbcf87","k8s.replicasetuid":"e8c052b4-c1db-43bd-806d-f85d8a861f5b","k8s.service.kubernetes-dashboard":"","k8s.workload.kind":"Deployment","k8s.workload.name":"kubernetes-dashboard","pod-template-hash":"6c7ccbcf87","podcreation_timestamp":"2023-06-30T11:32:00-04:00"})
Trace ID:
Span ID:
Flags: 0
```
  • Loading branch information
tigrannajaryan committed Jul 20, 2023
1 parent 247e2bb commit 2daab45
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 34 deletions.
20 changes: 20 additions & 0 deletions .chloggen/k8scluster-receiver-emit-entityevents.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sclusterreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: k8sclusterreceiver - Begin emitting entity events as logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24400]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
18 changes: 18 additions & 0 deletions pkg/experimentalmetricmetadata/entity_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const (
semconvOtelEntityID = "otel.entity.id"
semconvOtelEntityType = "otel.entity.type"
semconvOtelEntityAttributes = "otel.entity.attributes"

semconvOtelEntityEventAsScope = "otel.entity.event_as_log"
)

// EntityEventsSlice is a slice of EntityEvent.
Expand Down Expand Up @@ -52,6 +54,22 @@ func (s EntityEventsSlice) At(i int) EntityEvent {
return EntityEvent{orig: s.orig.At(i)}
}

// ConvertAndMoveToLogs converts entity events to log representation and moves them
// from this EntityEventsSlice into plog.Logs. This slice becomes empty after this call.
func (s EntityEventsSlice) ConvertAndMoveToLogs() plog.Logs {
logs := plog.NewLogs()

scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()

// Set the scope marker.
scopeLogs.Scope().Attributes().PutBool(semconvOtelEntityEventAsScope, true)

// Move all events. Note that this remove all
s.orig.MoveAndAppendTo(scopeLogs.LogRecords())

return logs
}

// EntityEvent is an entity event.
type EntityEvent struct {
orig plog.LogRecord
Expand Down
54 changes: 54 additions & 0 deletions pkg/experimentalmetricmetadata/entity_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,60 @@ func Test_EntityEventsSlice(t *testing.T) {
assert.Equal(t, 1, slice.Len())
}

func Test_EntityEventsSlice_ConvertAndMoveToLogs(t *testing.T) {
// Prepare an event slice.
slice := NewEntityEventsSlice()
event := slice.AppendEmpty()

event.ID().PutStr("k8s.pod.uid", "123")
state := event.SetEntityState()
state.SetEntityType("k8s.pod")
state.Attributes().PutStr("label1", "value1")

event = slice.AppendEmpty()
event.ID().PutStr("k8s.node.uid", "abc")
event.SetEntityDelete()

// Convert to logs.
logs := slice.ConvertAndMoveToLogs()

// Check that all 2 events are moved.
assert.Equal(t, 0, slice.Len())
assert.Equal(t, 2, logs.LogRecordCount())

assert.Equal(t, 1, logs.ResourceLogs().Len())

scopeLogs := logs.ResourceLogs().At(0).ScopeLogs().At(0)

// Check the Scope
v, ok := scopeLogs.Scope().Attributes().Get(semconvOtelEntityEventAsScope)
assert.True(t, ok)
assert.Equal(t, true, v.Bool())

records := scopeLogs.LogRecords()
assert.Equal(t, 2, records.Len())

// Check the first event.
attrs := records.At(0).Attributes().AsRaw()
assert.EqualValues(
t, map[string]any{
semconvOtelEntityEventName: semconvEventEntityEventState,
semconvOtelEntityType: "k8s.pod",
semconvOtelEntityID: map[string]any{"k8s.pod.uid": "123"},
semconvOtelEntityAttributes: map[string]any{"label1": "value1"},
}, attrs,
)

// Check the second event.
attrs = records.At(1).Attributes().AsRaw()
assert.EqualValues(
t, map[string]any{
semconvOtelEntityEventName: semconvEventEntityEventDelete,
semconvOtelEntityID: map[string]any{"k8s.node.uid": "abc"},
}, attrs,
)
}

func Test_EntityEventType(t *testing.T) {
lr := plog.NewLogRecord()
e := EntityEvent{lr}
Expand Down
11 changes: 10 additions & 1 deletion receiver/k8sclusterreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
| Status | |
| ------------- |-----------|
| Stability | [beta]: metrics |
| | [development]: logs |
| Distributions | [contrib], [observiq], [splunk], [sumo] |
| Issues | ![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fk8scluster%20&label=open&color=orange&logo=opentelemetry) ![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fk8scluster%20&label=closed&color=blue&logo=opentelemetry) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
<!-- end autogenerated section -->

The Kubernetes Cluster receiver collects cluster-level metrics from the Kubernetes
The Kubernetes Cluster receiver collects cluster-level metrics and entity events from the Kubernetes
API server. It uses the K8s API to listen for updates. A single instance of this
receiver can be used to monitor a cluster.

Expand Down Expand Up @@ -107,6 +109,10 @@ type MetadataDelta struct {

See [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/experimentalmetricmetadata/metadata.go) for details about the above types.

The same metadata will be also emitted as entity events in the form of log records if
this receiver is connected to a logs pipeline.
See [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23565)
for the format of emitted log records.

## Example

Expand Down Expand Up @@ -139,6 +145,9 @@ data:
metrics:
receivers: [k8s_cluster]
exporters: [logging]
logs/entity_events:
receivers: [k8s_cluster]
exporters: [logging]
EOF
```

Expand Down
11 changes: 10 additions & 1 deletion receiver/k8sclusterreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
)

Expand Down Expand Up @@ -41,5 +42,13 @@ func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithMetrics(newReceiver, metadata.MetricsStability))
receiver.WithMetrics(newMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(newLogsReceiver, metadata.MetricsStability),
)
}

// This is the map of already created k8scluster receivers for particular configurations.
// We maintain this map because the Factory is asked log and metric receivers separately
// when it gets CreateLogsReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one receiver object per configuration.
var receivers = sharedcomponent.NewSharedComponents()
28 changes: 27 additions & 1 deletion receiver/k8sclusterreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
fakeQuota "github.com/openshift/client-go/quota/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -19,6 +20,7 @@ import (
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
)

func TestFactory(t *testing.T) {
Expand Down Expand Up @@ -80,7 +82,7 @@ func TestFactoryDistributions(t *testing.T) {
}

func newTestReceiver(t *testing.T, cfg *Config) *kubernetesReceiver {
r, err := newReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop())
r, err := newReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, r)
rcvr, ok := r.(*kubernetesReceiver)
Expand Down Expand Up @@ -111,3 +113,27 @@ func (n *nopHostWithExporters) GetExporters() map[component.DataType]map[compone
},
}
}

func TestNewSharedReceiver(t *testing.T) {
f := NewFactory()
cfg := f.CreateDefaultConfig()

mc := consumertest.NewNop()
mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, mc)
require.NoError(t, err)

// Verify that the metric consumer is correctly set.
kr := mr.(*sharedcomponent.SharedComponent).Unwrap().(*kubernetesReceiver)
assert.Equal(t, mc, kr.metricsConsumer)

lc := consumertest.NewNop()
lr, err := newLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, lc)
require.NoError(t, err)

// Verify that the log consumer is correct set.
kr = lr.(*sharedcomponent.SharedComponent).Unwrap().(*kubernetesReceiver)
assert.Equal(t, lc, kr.resourceWatcher.entityLogConsumer)

// Make sure only one receiver is created both for metrics and logs.
assert.Equal(t, mr, lr)
}
3 changes: 3 additions & 0 deletions receiver/k8sclusterreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.81.0
Expand Down Expand Up @@ -160,3 +161,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions receiver/k8sclusterreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ status:
class: receiver
stability:
beta: [metrics]
development: [logs]
distributions: [contrib, splunk, observiq, sumo]

73 changes: 58 additions & 15 deletions receiver/k8sclusterreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ var _ receiver.Metrics = (*kubernetesReceiver)(nil)
type kubernetesReceiver struct {
resourceWatcher *resourceWatcher

config *Config
settings receiver.CreateSettings
consumer consumer.Metrics
cancel context.CancelFunc
obsrecv *obsreport.Receiver
config *Config
settings receiver.CreateSettings
metricsConsumer consumer.Metrics
cancel context.CancelFunc
obsrecv *obsreport.Receiver
}

func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -97,33 +97,76 @@ func (kr *kubernetesReceiver) Shutdown(context.Context) error {
}

func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
if kr.metricsConsumer == nil {
// Metric collection is not enabled.
return
}

now := time.Now()
mds := kr.resourceWatcher.dataCollector.CollectMetricData(now)

c := kr.obsrecv.StartMetricsOp(ctx)

numPoints := mds.DataPointCount()
err := kr.consumer.ConsumeMetrics(c, mds)
err := kr.metricsConsumer.ConsumeMetrics(c, mds)
kr.obsrecv.EndMetricsOp(c, metadata.Type, numPoints, err)
}

// newReceiver creates the Kubernetes cluster receiver with the given configuration.
func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
rCfg := cfg.(*Config)
// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
func newMetricsReceiver(
ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics,
) (receiver.Metrics, error) {
var err error
r := receivers.GetOrAdd(
cfg, func() component.Component {
var rcv component.Component
rcv, err = newReceiver(ctx, set, cfg)
return rcv
},
)
if err != nil {
return nil, err
}
r.Unwrap().(*kubernetesReceiver).metricsConsumer = consumer
return r, nil
}

obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
})
// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
func newLogsReceiver(
ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Logs,
) (receiver.Logs, error) {
var err error
r := receivers.GetOrAdd(
cfg, func() component.Component {
var rcv component.Component
rcv, err = newReceiver(ctx, set, cfg)
return rcv
},
)
if err != nil {
return nil, err
}
r.Unwrap().(*kubernetesReceiver).resourceWatcher.entityLogConsumer = consumer
return r, nil
}

// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
rCfg := cfg.(*Config)
obsrecv, err := obsreport.NewReceiver(
obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
},
)
if err != nil {
return nil, err
}
return &kubernetesReceiver{
resourceWatcher: newResourceWatcher(set, rCfg),
settings: set,
config: rCfg,
consumer: consumer,
obsrecv: obsrecv,
}, nil
}
Loading

0 comments on commit 2daab45

Please sign in to comment.