From 2daab45277c52ab9dcdcf2dc8b7c84499eefa0a9 Mon Sep 17 00:00:00 2001
From: Tigran Najaryan
Date: Thu, 20 Jul 2023 14:02:58 -0400
Subject: [PATCH] [receiver/k8sclusterreceiver] Begin emitting entity events as logs

Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24400

This is part 3 of the work to move to entity events-as-log-records in the K8s
cluster receiver:
https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19741

Overall design document:
https://docs.google.com/document/d/1Tg18sIck3Nakxtd3TFFcIjrmRO_0GLMdHXylVqBQmJA/

Example log record emitted when a pod changes:

```
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 1970-01-01 00:00:00 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
     -> otel.entity.id: Map({"k8s.pod.uid":"07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"})
     -> otel.entity.event.type: Str(entity_state)
     -> otel.entity.type: Str(k8s.pod)
     -> otel.entity.attributes: Map({"k8s-app":"kubernetes-dashboard","k8s.deployment.name":"kubernetes-dashboard","k8s.deployment.uid":"4c1ee765-906b-498b-80b5-bea67a714fce","k8s.replicaset.name":"kubernetes-dashboard-6c7ccbcf87","k8s.replicaset.uid":"e8c052b4-c1db-43bd-806d-f85d8a861f5b","k8s.service.kubernetes-dashboard":"","k8s.workload.kind":"Deployment","k8s.workload.name":"kubernetes-dashboard","pod-template-hash":"6c7ccbcf87","pod.creation_timestamp":"2023-06-30T11:32:00-04:00"})
Trace ID:
Span ID:
Flags: 0
```
---
 ...k8scluster-receiver-emit-entityevents.yaml | 20 +++++
 .../entity_events.go                          | 18 +++++
 .../entity_events_test.go                     | 54 ++++++++++++++
 receiver/k8sclusterreceiver/README.md         | 11 ++-
 receiver/k8sclusterreceiver/factory.go        | 11 ++-
 receiver/k8sclusterreceiver/factory_test.go   | 28 ++++++-
 receiver/k8sclusterreceiver/go.mod            |  3 +
 .../internal/metadata/generated_status.go     |  1 +
 receiver/k8sclusterreceiver/metadata.yaml     |  1 +
 receiver/k8sclusterreceiver/receiver.go       | 73 +++++++++++++++----
 receiver/k8sclusterreceiver/receiver_test.go  | 36 ++++++---
 receiver/k8sclusterreceiver/watcher.go        | 54 ++++++++++++--
 receiver/k8sclusterreceiver/watcher_test.go   | 73 +++++++++++++++++++
 13 files changed, 349 insertions(+), 34 deletions(-)
 create mode 100644 .chloggen/k8scluster-receiver-emit-entityevents.yaml

diff --git a/.chloggen/k8scluster-receiver-emit-entityevents.yaml b/.chloggen/k8scluster-receiver-emit-entityevents.yaml
new file mode 100644
index 000000000000..5a46871ec80f
--- /dev/null
+++ b/.chloggen/k8scluster-receiver-emit-entityevents.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: k8sclusterreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Begin emitting entity events as logs
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [24400]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/pkg/experimentalmetricmetadata/entity_events.go b/pkg/experimentalmetricmetadata/entity_events.go
index acaa6d76e078..5edb703cc8aa 100644
--- a/pkg/experimentalmetricmetadata/entity_events.go
+++ b/pkg/experimentalmetricmetadata/entity_events.go
@@ -19,6 +19,8 @@ const (
 	semconvOtelEntityID         = "otel.entity.id"
 	semconvOtelEntityType       = "otel.entity.type"
 	semconvOtelEntityAttributes = "otel.entity.attributes"
+
+	semconvOtelEntityEventAsScope = "otel.entity.event_as_log"
 )
 
 // EntityEventsSlice is a slice of EntityEvent.
@@ -52,6 +54,22 @@ func (s EntityEventsSlice) At(i int) EntityEvent {
 	return EntityEvent{orig: s.orig.At(i)}
 }
 
+// ConvertAndMoveToLogs converts entity events to log representation and moves them
+// from this EntityEventsSlice into plog.Logs. This slice becomes empty after this call.
+func (s EntityEventsSlice) ConvertAndMoveToLogs() plog.Logs {
+	logs := plog.NewLogs()
+
+	scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
+
+	// Set the scope marker.
+	scopeLogs.Scope().Attributes().PutBool(semconvOtelEntityEventAsScope, true)
+
+	// Move all events. Note that this removes all records from the source slice.
+	s.orig.MoveAndAppendTo(scopeLogs.LogRecords())
+
+	return logs
+}
+
 // EntityEvent is an entity event.
 type EntityEvent struct {
 	orig plog.LogRecord
diff --git a/pkg/experimentalmetricmetadata/entity_events_test.go b/pkg/experimentalmetricmetadata/entity_events_test.go
index 83a00fbee142..7ded1c7bb503 100644
--- a/pkg/experimentalmetricmetadata/entity_events_test.go
+++ b/pkg/experimentalmetricmetadata/entity_events_test.go
@@ -58,6 +58,60 @@ func Test_EntityEventsSlice(t *testing.T) {
 	assert.Equal(t, 1, slice.Len())
 }
 
+func Test_EntityEventsSlice_ConvertAndMoveToLogs(t *testing.T) {
+	// Prepare an event slice.
+	slice := NewEntityEventsSlice()
+	event := slice.AppendEmpty()
+
+	event.ID().PutStr("k8s.pod.uid", "123")
+	state := event.SetEntityState()
+	state.SetEntityType("k8s.pod")
+	state.Attributes().PutStr("label1", "value1")
+
+	event = slice.AppendEmpty()
+	event.ID().PutStr("k8s.node.uid", "abc")
+	event.SetEntityDelete()
+
+	// Convert to logs.
+	logs := slice.ConvertAndMoveToLogs()
+
+	// Check that both events are moved.
+	assert.Equal(t, 0, slice.Len())
+	assert.Equal(t, 2, logs.LogRecordCount())
+
+	assert.Equal(t, 1, logs.ResourceLogs().Len())
+
+	scopeLogs := logs.ResourceLogs().At(0).ScopeLogs().At(0)
+
+	// Check the scope marker attribute.
+	v, ok := scopeLogs.Scope().Attributes().Get(semconvOtelEntityEventAsScope)
+	assert.True(t, ok)
+	assert.Equal(t, true, v.Bool())
+
+	records := scopeLogs.LogRecords()
+	assert.Equal(t, 2, records.Len())
+
+	// Check the first event.
+	attrs := records.At(0).Attributes().AsRaw()
+	assert.EqualValues(
+		t, map[string]any{
+			semconvOtelEntityEventName:  semconvEventEntityEventState,
+			semconvOtelEntityType:       "k8s.pod",
+			semconvOtelEntityID:         map[string]any{"k8s.pod.uid": "123"},
+			semconvOtelEntityAttributes: map[string]any{"label1": "value1"},
+		}, attrs,
+	)
+
+	// Check the second event.
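+	// Delete events are expected to carry only the event type and the entity ID;
+	// no entity type or attributes map is present for a deletion.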
+	attrs = records.At(1).Attributes().AsRaw()
+	assert.EqualValues(
+		t, map[string]any{
+			semconvOtelEntityEventName: semconvEventEntityEventDelete,
+			semconvOtelEntityID:        map[string]any{"k8s.node.uid": "abc"},
+		}, attrs,
+	)
+}
+
 func Test_EntityEventType(t *testing.T) {
 	lr := plog.NewLogRecord()
 	e := EntityEvent{lr}
diff --git a/receiver/k8sclusterreceiver/README.md b/receiver/k8sclusterreceiver/README.md
index d5a75f52c228..e82292bd8b71 100644
--- a/receiver/k8sclusterreceiver/README.md
+++ b/receiver/k8sclusterreceiver/README.md
@@ -4,17 +4,19 @@
 | Status        |           |
 | ------------- |-----------|
 | Stability     | [beta]: metrics   |
+|               | [development]: logs   |
 | Distributions | [contrib], [observiq], [splunk], [sumo] |
 | Issues        | ![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fk8scluster%20&label=open&color=orange&logo=opentelemetry) ![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fk8scluster%20&label=closed&color=blue&logo=opentelemetry) |
 
 [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
+[development]: https://github.com/open-telemetry/opentelemetry-collector#development
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 [observiq]: https://github.com/observIQ/observiq-otel-collector
 [splunk]: https://github.com/signalfx/splunk-otel-collector
 [sumo]: https://github.com/SumoLogic/sumologic-otel-collector
 
-The Kubernetes Cluster receiver collects cluster-level metrics from the Kubernetes
+The Kubernetes Cluster receiver collects cluster-level metrics and entity events from the Kubernetes
 API server. It uses the K8s API to listen for updates. A single instance of this
 receiver can be used to monitor a cluster.
@@ -107,6 +109,10 @@ type MetadataDelta struct {
 
 See [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/experimentalmetricmetadata/metadata.go)
 for details about the above types.
 
+The same metadata will also be emitted as entity events in the form of log records if
+this receiver is connected to a logs pipeline.
+See [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23565)
+for the format of emitted log records.
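+
+For example, a pod state change produces a single log record whose attributes
+look roughly like the following (illustrative only; the issue linked above is
+the authoritative description of the format):
+
+```
+otel.entity.event.type: entity_state
+otel.entity.type: k8s.pod
+otel.entity.id: {"k8s.pod.uid": "07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"}
+otel.entity.attributes: {"k8s.workload.kind": "Deployment", "k8s.workload.name": "kubernetes-dashboard", ...}
+```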
 ## Example
 
@@ -139,6 +145,9 @@ data:
       metrics:
         receivers: [k8s_cluster]
         exporters: [logging]
+      logs/entity_events:
+        receivers: [k8s_cluster]
+        exporters: [logging]
 EOF
 ```
diff --git a/receiver/k8sclusterreceiver/factory.go b/receiver/k8sclusterreceiver/factory.go
index d72c515301fd..bc516001c6d0 100644
--- a/receiver/k8sclusterreceiver/factory.go
+++ b/receiver/k8sclusterreceiver/factory.go
@@ -10,6 +10,7 @@ import (
 	"go.opentelemetry.io/collector/receiver"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
 )
@@ -41,5 +42,13 @@ func NewFactory() receiver.Factory {
 	return receiver.NewFactory(
 		metadata.Type,
 		createDefaultConfig,
-		receiver.WithMetrics(newReceiver, metadata.MetricsStability))
+		receiver.WithMetrics(newMetricsReceiver, metadata.MetricsStability),
+		receiver.WithLogs(newLogsReceiver, metadata.LogsStability),
+	)
 }
+
+// This is the map of already-created k8scluster receivers, keyed by configuration.
+// We maintain this map because the Factory is asked for log and metric receivers
+// separately via CreateLogsReceiver() and CreateMetricsReceiver(), but they must
+// not create separate objects: both must share one receiver object per configuration.
+var receivers = sharedcomponent.NewSharedComponents()
diff --git a/receiver/k8sclusterreceiver/factory_test.go b/receiver/k8sclusterreceiver/factory_test.go
index a1c4128bd7d1..13677bbfd1d9 100644
--- a/receiver/k8sclusterreceiver/factory_test.go
+++ b/receiver/k8sclusterreceiver/factory_test.go
@@ -10,6 +10,7 @@ import (
 	quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
 	fakeQuota "github.com/openshift/client-go/quota/clientset/versioned/fake"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -19,6 +20,7 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
 )
 
 func TestFactory(t *testing.T) {
@@ -80,7 +82,7 @@ func TestFactoryDistributions(t *testing.T) {
 }
 
 func newTestReceiver(t *testing.T, cfg *Config) *kubernetesReceiver {
-	r, err := newReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop())
+	r, err := newReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg)
 	require.NoError(t, err)
 	require.NotNil(t, r)
 	rcvr, ok := r.(*kubernetesReceiver)
@@ -111,3 +113,27 @@ func (n *nopHostWithExporters) GetExporters() map[component.DataType]map[compone
 	}
 }
+
+func TestNewSharedReceiver(t *testing.T) {
+	f := NewFactory()
+	cfg := f.CreateDefaultConfig()
+
+	mc := consumertest.NewNop()
+	mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, mc)
+	require.NoError(t, err)
+
+	// Verify that the metric consumer is correctly set.
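+	// newMetricsReceiver returns a sharedcomponent.SharedComponent wrapper;
+	// Unwrap() yields the underlying kubernetesReceiver instance.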
+	kr := mr.(*sharedcomponent.SharedComponent).Unwrap().(*kubernetesReceiver)
+	assert.Equal(t, mc, kr.metricsConsumer)
+
+	lc := consumertest.NewNop()
+	lr, err := newLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, lc)
+	require.NoError(t, err)
+
+	// Verify that the log consumer is correctly set.
+	kr = lr.(*sharedcomponent.SharedComponent).Unwrap().(*kubernetesReceiver)
+	assert.Equal(t, lc, kr.resourceWatcher.entityLogConsumer)
+
+	// Make sure only one receiver is created for both metrics and logs.
+	assert.Equal(t, mr, lr)
+}
diff --git a/receiver/k8sclusterreceiver/go.mod b/receiver/k8sclusterreceiver/go.mod
index b7dadf3b6d6e..6a9fd3b9c257 100644
--- a/receiver/k8sclusterreceiver/go.mod
+++ b/receiver/k8sclusterreceiver/go.mod
@@ -11,6 +11,7 @@ require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.81.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.81.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.81.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.81.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.81.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.81.0
@@ -160,3 +161,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
 replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37
 
 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
diff --git a/receiver/k8sclusterreceiver/internal/metadata/generated_status.go b/receiver/k8sclusterreceiver/internal/metadata/generated_status.go
index 2b55ed15641c..1eaeb822b837 100644
--- a/receiver/k8sclusterreceiver/internal/metadata/generated_status.go
+++ b/receiver/k8sclusterreceiver/internal/metadata/generated_status.go
@@ -9,4 +9,5 @@ import (
 const (
 	Type             = "k8s_cluster"
 	MetricsStability = component.StabilityLevelBeta
+	LogsStability    = component.StabilityLevelDevelopment
 )
diff --git a/receiver/k8sclusterreceiver/metadata.yaml b/receiver/k8sclusterreceiver/metadata.yaml
index e88c6fd31027..b27e538518f9 100644
--- a/receiver/k8sclusterreceiver/metadata.yaml
+++ b/receiver/k8sclusterreceiver/metadata.yaml
@@ -4,5 +4,6 @@ status:
   class: receiver
   stability:
     beta: [metrics]
+    development: [logs]
   distributions: [contrib, splunk, observiq, sumo]
 
diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go
index e2c040acc7a7..ea8977b35683 100644
--- a/receiver/k8sclusterreceiver/receiver.go
+++ b/receiver/k8sclusterreceiver/receiver.go
@@ -27,11 +27,11 @@ var _ receiver.Metrics = (*kubernetesReceiver)(nil)
 type kubernetesReceiver struct {
 	resourceWatcher *resourceWatcher
 
-	config   *Config
-	settings receiver.CreateSettings
-	consumer consumer.Metrics
-	cancel   context.CancelFunc
-	obsrecv  *obsreport.Receiver
+	config          *Config
+	settings        receiver.CreateSettings
+	metricsConsumer consumer.Metrics
+	cancel          context.CancelFunc
+	obsrecv         *obsreport.Receiver
 }
 
 func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
@@ -97,25 +97,69 @@ func (kr *kubernetesReceiver) Shutdown(context.Context) error {
 }
 
 func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
+	if kr.metricsConsumer == nil {
+		// Metric collection is not enabled.
+		return
+	}
+
 	now := time.Now()
 	mds := kr.resourceWatcher.dataCollector.CollectMetricData(now)
 	c := kr.obsrecv.StartMetricsOp(ctx)
 	numPoints := mds.DataPointCount()
-	err := kr.consumer.ConsumeMetrics(c, mds)
+	err := kr.metricsConsumer.ConsumeMetrics(c, mds)
 	kr.obsrecv.EndMetricsOp(c, metadata.Type, numPoints, err)
 }
 
-// newReceiver creates the Kubernetes cluster receiver with the given configuration.
-func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
-	rCfg := cfg.(*Config)
+// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
+func newMetricsReceiver(
+	ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics,
+) (receiver.Metrics, error) {
+	var err error
+	r := receivers.GetOrAdd(
+		cfg, func() component.Component {
+			var rcv component.Component
+			rcv, err = newReceiver(ctx, set, cfg)
+			return rcv
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	r.Unwrap().(*kubernetesReceiver).metricsConsumer = consumer
+	return r, nil
+}
 
-	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
-		ReceiverID:             set.ID,
-		Transport:              transport,
-		ReceiverCreateSettings: set,
-	})
+// newLogsReceiver creates the Kubernetes cluster receiver with the given configuration.
+func newLogsReceiver(
+	ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Logs,
+) (receiver.Logs, error) {
+	var err error
+	r := receivers.GetOrAdd(
+		cfg, func() component.Component {
+			var rcv component.Component
+			rcv, err = newReceiver(ctx, set, cfg)
+			return rcv
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	r.Unwrap().(*kubernetesReceiver).resourceWatcher.entityLogConsumer = consumer
+	return r, nil
+}
+
+// newReceiver creates the shared Kubernetes cluster receiver with the given configuration.
+func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
+	rCfg := cfg.(*Config)
+	obsrecv, err := obsreport.NewReceiver(
+		obsreport.ReceiverSettings{
+			ReceiverID:             set.ID,
+			Transport:              transport,
+			ReceiverCreateSettings: set,
+		},
+	)
 	if err != nil {
 		return nil, err
 	}
@@ -123,7 +167,6 @@ func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.C
 		resourceWatcher: newResourceWatcher(set, rCfg),
 		settings:        set,
 		config:          rCfg,
-		consumer:        consumer,
 		obsrecv:         obsrecv,
 	}, nil
 }
diff --git a/receiver/k8sclusterreceiver/receiver_test.go b/receiver/k8sclusterreceiver/receiver_test.go
index 4419f49c6e3e..b82bb286e4a0 100644
--- a/receiver/k8sclusterreceiver/receiver_test.go
+++ b/receiver/k8sclusterreceiver/receiver_test.go
@@ -39,7 +39,7 @@ func TestReceiver(t *testing.T) {
 	osQuotaClient := fakeQuota.NewSimpleClientset()
 	sink := new(consumertest.MetricsSink)
 
-	r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
+	r := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt)
 
 	// Setup k8s resources.
 	numPods := 2
@@ -87,7 +87,7 @@ func TestReceiverTimesOutAfterStartup(t *testing.T) {
 	client := newFakeClientWithAllResources()
 
 	// Mock initial cache sync timing out, using a small timeout.
-	r := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt)
+	r := setupReceiver(client, nil, consumertest.NewNop(), nil, 1*time.Millisecond, tt)
 
 	createPods(t, client, 1)
 
@@ -110,7 +110,7 @@ func TestReceiverWithManyResources(t *testing.T) {
 	osQuotaClient := fakeQuota.NewSimpleClientset()
 	sink := new(consumertest.MetricsSink)
 
-	r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
+	r := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt)
 
 	numPods := 1000
 	numQuotas := 2
@@ -145,10 +145,12 @@ func TestReceiverWithMetadata(t *testing.T) {
 	}()
 
 	client := newFakeClientWithAllResources()
-	next := &mockExporterWithK8sMetadata{MetricsSink: new(consumertest.MetricsSink)}
+	metricsConsumer := &mockExporterWithK8sMetadata{MetricsSink: new(consumertest.MetricsSink)}
 	numCalls = &atomic.Int32{}
 
-	r := setupReceiver(client, nil, next, 10*time.Second, tt)
+	logsConsumer := new(consumertest.LogsSink)
+
+	r := setupReceiver(client, nil, metricsConsumer, logsConsumer, 10*time.Second, tt)
 	r.config.MetadataExporters = []string{"nop/withmetadata"}
 
 	// Setup k8s resources.
@@ -163,18 +165,29 @@
 	updatedPod := getUpdatedPod(pods[0])
 	r.resourceWatcher.onUpdate(pods[0], updatedPod)
 
-	// Should not result in ConsumerKubernetesMetadata invocation.
-	r.resourceWatcher.onUpdate(pods[0], pods[0])
+	// Should not result in a ConsumeKubernetesMetadata invocation since the pod
+	// has not changed. Should still result in an entity event because entity
+	// events are emitted even if the entity is unchanged.
+	r.resourceWatcher.onUpdate(updatedPod, updatedPod)
 
 	deletePods(t, client, 1)
 
 	// Ensure ConsumeKubernetesMetadata is called twice, once for the add and
-	// then for the update.
+	// then for the update. Note that the second update does not result in a
+	// metadata call since the pod has not changed.
 	require.Eventually(t, func() bool {
 		return int(numCalls.Load()) == 2
 	}, 10*time.Second, 100*time.Millisecond,
 		"metadata not collected")
 
+	// Must have 3 entity events: one for the add, one for the update, and one
+	// for the second update, which, unlike the metadata call, still happens
+	// because even unchanged entities trigger an event.
+	require.Eventually(t, func() bool {
+		return logsConsumer.LogRecordCount() == 3
+	}, 10*time.Second, 100*time.Millisecond,
+		"entity events not collected")
+
 	require.NoError(t, r.Shutdown(ctx))
 }
 
@@ -194,7 +207,8 @@ func getUpdatedPod(pod *corev1.Pod) interface{} {
 func setupReceiver(
 	client *fake.Clientset,
 	osQuotaClient quotaclientset.Interface,
-	consumer consumer.Metrics,
+	metricsConsumer consumer.Metrics,
+	logsConsumer consumer.Logs,
 	initialSyncTimeout time.Duration,
 	tt obsreporttest.TestTelemetry) *kubernetesReceiver {
 
@@ -210,8 +224,9 @@ func setupReceiver(
 		Distribution:       distribution,
 	}
 
-	r, _ := newReceiver(context.Background(), tt.ToReceiverCreateSettings(), config, consumer)
+	r, _ := newReceiver(context.Background(), tt.ToReceiverCreateSettings(), config)
 	kr := r.(*kubernetesReceiver)
+	kr.metricsConsumer = metricsConsumer
 	kr.resourceWatcher.makeClient = func(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
 		return client, nil
 	}
@@ -219,6 +234,7 @@ func setupReceiver(
 		return osQuotaClient, nil
 	}
 	kr.resourceWatcher.initialTimeout = initialSyncTimeout
+	kr.resourceWatcher.entityLogConsumer = logsConsumer
 	return kr
 }
diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go
index 029cacd56c81..ada240927c60 100644
--- a/receiver/k8sclusterreceiver/watcher.go
+++ b/receiver/k8sclusterreceiver/watcher.go
@@ -13,8 +13,10 @@ import (
 	quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
 	quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/informers"
@@ -40,11 +42,13 @@ type resourceWatcher struct {
 	informerFactories   []sharedInformer
 	dataCollector       *collection.DataCollector
 	logger              *zap.Logger
+	sampledLogger       *zap.Logger
 	metadataConsumers   []metadataConsumer
 	initialTimeout      time.Duration
 	initialSyncDone     *atomic.Bool
 	initialSyncTimedOut *atomic.Bool
 	config              *Config
+	entityLogConsumer   consumer.Logs
 
 	// For mocking.
 	makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error)
@@ -55,8 +59,18 @@ type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate
 
 // newResourceWatcher creates a Kubernetes resource watcher.
 func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatcher {
+	// Create a sampled logger for error messages.
+	core := zapcore.NewSamplerWithOptions(
+		set.Logger.Core(),
+		1*time.Second,
+		1,    // 1 per second initially
+		1000, // then 1/1000 of messages
+	)
+	sampledLogger := zap.New(core)
+
 	return &resourceWatcher{
 		logger:              set.Logger,
+		sampledLogger:       sampledLogger,
 		dataCollector:       collection.NewDataCollector(set, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport),
 		initialSyncDone:     &atomic.Bool{},
 		initialSyncTimedOut: &atomic.Bool{},
@@ -237,7 +251,7 @@ func (rw *resourceWatcher) onAdd(obj interface{}) {
 	rw.dataCollector.SyncMetrics(obj)
 
 	// Sync metadata only if there's at least one destination for it to be sent to.
-	if len(rw.metadataConsumers) == 0 {
+	if !rw.hasDestination() {
 		return
 	}
 
@@ -250,13 +264,17 @@ func (rw *resourceWatcher) onDelete(obj interface{}) {
 	rw.dataCollector.RemoveFromMetricsStore(obj)
 }
 
+func (rw *resourceWatcher) hasDestination() bool {
+	return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil
+}
+
 func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) {
 	rw.waitForInitialInformerSync()
 
 	// Sync metrics from the new object
 	rw.dataCollector.SyncMetrics(newObj)
 
 	// Sync metadata only if there's at least one destination for it to be sent to.
-	if len(rw.metadataConsumers) == 0 {
+	if !rw.hasDestination() {
 		return
 	}
 
@@ -326,11 +344,35 @@ func validateMetadataExporters(metadataExporters map[string]bool, exporters map[
 
 func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata) {
 	metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata)
-	if len(metadataUpdate) == 0 {
-		return
+	if len(metadataUpdate) != 0 {
+		for _, consume := range rw.metadataConsumers {
+			_ = consume(metadataUpdate)
+		}
 	}
 
-	for _, consume := range rw.metadataConsumers {
-		_ = consume(metadataUpdate)
+	if rw.entityLogConsumer != nil {
+		// Represent the metadata update as entity events.
+		// TODO: record the timestamp in the events.
+		entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata)
+
+		// Convert the entity events to their log representation.
+		logs := entityEvents.ConvertAndMoveToLogs()
+
+		if logs.LogRecordCount() != 0 {
+			err := rw.entityLogConsumer.ConsumeLogs(context.Background(), logs)
+			if err != nil {
+				rw.sampledLogger.Error("Error sending entity events to the consumer", zap.Error(err))
+
+				// Note: the receiver contract says that we need to retry sending if the
+				// returned error is not Permanent. However, we are not doing that here.
+				// Instead, we rely on the fact that metadata is collected periodically
+				// and the entity events will be delivered on the next cycle. This is
+				// fine because we deliver cumulative entity state.
+				// It also allows us to avoid stressing the Collector or its destination
+				// unnecessarily (typically non-Permanent errors happen under stress).
+				// The periodic collection will be implemented later, see
+				// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24413
+			}
+		}
 	}
 }
diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go
index 5639fe2a35f1..28ec630023e1 100644
--- a/receiver/k8sclusterreceiver/watcher_test.go
+++ b/receiver/k8sclusterreceiver/watcher_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest/observer"
@@ -220,3 +221,75 @@ func TestSetupInformerForKind(t *testing.T) {
 	assert.Equal(t, 1, logs.Len())
 	assert.Equal(t, "Could not setup an informer for provided group version kind", logs.All()[0].Entry.Message)
 }
+
+func TestSyncMetadataAndEmitEntityEvents(t *testing.T) {
+	client := newFakeClientWithAllResources()
+
+	logsConsumer := new(consumertest.LogsSink)
+
+	// Setup k8s resources.
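+	// A single pod is enough: the calls below drive add, update, no-op update,
+	// revert, and delete transitions for it.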
+	pods := createPods(t, client, 1)
+
+	origPod := pods[0]
+	updatedPod := getUpdatedPod(origPod)
+
+	rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{})
+	rw.entityLogConsumer = logsConsumer
+
+	// Make some changes to the pod. Each change should result in an entity event
+	// represented as a log record.
+
+	// Pod is created.
+	rw.syncMetadataUpdate(nil, rw.dataCollector.SyncMetadata(origPod))
+
+	// Pod is updated.
+	rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), rw.dataCollector.SyncMetadata(updatedPod))
+
+	// Pod is updated again, but nothing changed in the pod.
+	// Should still result in an entity event because events are emitted even
+	// if the entity is unchanged.
+	rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(updatedPod))
+
+	// Change the pod's state back to the original.
+	rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(origPod))
+
+	// Delete the pod.
+	rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), nil)
+
+	// Must have 5 entity events.
+	require.EqualValues(t, 5, logsConsumer.LogRecordCount())
+
+	// Event 1 should contain the initial state of the pod.
+	lr := logsConsumer.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+	expected := map[string]any{
+		"otel.entity.event.type": "entity_state",
+		"otel.entity.type":       "k8s.pod",
+		"otel.entity.id":         map[string]any{"k8s.pod.uid": "pod0"},
+		"otel.entity.attributes": map[string]any{"pod.creation_timestamp": "0001-01-01T00:00:00Z"},
+	}
+	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
+
+	// Event 2 should contain the updated state of the pod.
+	lr = logsConsumer.AllLogs()[1].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+	attrs := expected["otel.entity.attributes"].(map[string]any)
+	attrs["key"] = "value"
+	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
+
+	// Event 3 should be identical to the previous one since the pod state didn't change.
+	lr = logsConsumer.AllLogs()[2].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
+
+	// Event 4 should contain the reverted state of the pod.
+	lr = logsConsumer.AllLogs()[3].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+	attrs = expected["otel.entity.attributes"].(map[string]any)
+	delete(attrs, "key")
+	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
+
+	// Event 5 should indicate pod deletion.
+	lr = logsConsumer.AllLogs()[4].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+	expected = map[string]any{
+		"otel.entity.event.type": "entity_delete",
+		"otel.entity.id":         map[string]any{"k8s.pod.uid": "pod0"},
+	}
+	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
+}