Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/k8sclusterreceiver] Begin emitting entity events as logs #24419

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .chloggen/k8scluster-receiver-emit-entityevents.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sclusterreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: k8sclusterreceiver - Begin emitting entity events as logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24400]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
18 changes: 18 additions & 0 deletions pkg/experimentalmetricmetadata/entity_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const (
semconvOtelEntityID = "otel.entity.id"
semconvOtelEntityType = "otel.entity.type"
semconvOtelEntityAttributes = "otel.entity.attributes"

semconvOtelEntityEventAsScope = "otel.entity.event_as_log"
)

// EntityEventsSlice is a slice of EntityEvent.
Expand Down Expand Up @@ -52,6 +54,22 @@ func (s EntityEventsSlice) At(i int) EntityEvent {
return EntityEvent{orig: s.orig.At(i)}
}

// ConvertAndMoveToLogs converts entity events to log representation and moves them
// from this EntityEventsSlice into plog.Logs. This slice becomes empty after this call.
func (s EntityEventsSlice) ConvertAndMoveToLogs() plog.Logs {
logs := plog.NewLogs()

scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()

// Set the scope marker.
scopeLogs.Scope().Attributes().PutBool(semconvOtelEntityEventAsScope, true)

// Move all events. Note that this remove all
s.orig.MoveAndAppendTo(scopeLogs.LogRecords())

return logs
}

// EntityEvent is an entity event.
type EntityEvent struct {
orig plog.LogRecord
Expand Down
54 changes: 54 additions & 0 deletions pkg/experimentalmetricmetadata/entity_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,60 @@ func Test_EntityEventsSlice(t *testing.T) {
assert.Equal(t, 1, slice.Len())
}

func Test_EntityEventsSlice_ConvertAndMoveToLogs(t *testing.T) {
// Prepare an event slice.
slice := NewEntityEventsSlice()
event := slice.AppendEmpty()

event.ID().PutStr("k8s.pod.uid", "123")
state := event.SetEntityState()
state.SetEntityType("k8s.pod")
state.Attributes().PutStr("label1", "value1")

event = slice.AppendEmpty()
event.ID().PutStr("k8s.node.uid", "abc")
event.SetEntityDelete()

// Convert to logs.
logs := slice.ConvertAndMoveToLogs()

// Check that all 2 events are moved.
assert.Equal(t, 0, slice.Len())
assert.Equal(t, 2, logs.LogRecordCount())

assert.Equal(t, 1, logs.ResourceLogs().Len())

scopeLogs := logs.ResourceLogs().At(0).ScopeLogs().At(0)

// Check the Scope
v, ok := scopeLogs.Scope().Attributes().Get(semconvOtelEntityEventAsScope)
assert.True(t, ok)
assert.Equal(t, true, v.Bool())

records := scopeLogs.LogRecords()
assert.Equal(t, 2, records.Len())

// Check the first event.
attrs := records.At(0).Attributes().AsRaw()
assert.EqualValues(
t, map[string]any{
semconvOtelEntityEventName: semconvEventEntityEventState,
semconvOtelEntityType: "k8s.pod",
semconvOtelEntityID: map[string]any{"k8s.pod.uid": "123"},
semconvOtelEntityAttributes: map[string]any{"label1": "value1"},
}, attrs,
)

// Check the second event.
attrs = records.At(1).Attributes().AsRaw()
assert.EqualValues(
t, map[string]any{
semconvOtelEntityEventName: semconvEventEntityEventDelete,
semconvOtelEntityID: map[string]any{"k8s.node.uid": "abc"},
}, attrs,
)
}

func Test_EntityEventType(t *testing.T) {
lr := plog.NewLogRecord()
e := EntityEvent{lr}
Expand Down
11 changes: 10 additions & 1 deletion receiver/k8sclusterreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
| Status | |
| ------------- |-----------|
| Stability | [beta]: metrics |
| | [development]: logs |
| Distributions | [contrib], [observiq], [splunk], [sumo] |
| Issues | ![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fk8scluster%20&label=open&color=orange&logo=opentelemetry) ![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fk8scluster%20&label=closed&color=blue&logo=opentelemetry) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
<!-- end autogenerated section -->

The Kubernetes Cluster receiver collects cluster-level metrics from the Kubernetes
The Kubernetes Cluster receiver collects cluster-level metrics and entity events from the Kubernetes
API server. It uses the K8s API to listen for updates. A single instance of this
receiver can be used to monitor a cluster.

Expand Down Expand Up @@ -107,6 +109,10 @@ type MetadataDelta struct {

See [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/experimentalmetricmetadata/metadata.go) for details about the above types.

The same metadata will be also emitted as entity events in the form of log records if
this receiver is connected to a logs pipeline.
See [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23565)
for the format of emitted log records.

## Example

Expand Down Expand Up @@ -139,6 +145,9 @@ data:
metrics:
receivers: [k8s_cluster]
exporters: [logging]
logs/entity_events:
receivers: [k8s_cluster]
exporters: [logging]
EOF
```

Expand Down
11 changes: 10 additions & 1 deletion receiver/k8sclusterreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
)

Expand Down Expand Up @@ -41,5 +42,13 @@ func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithMetrics(newReceiver, metadata.MetricsStability))
receiver.WithMetrics(newMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(newLogsReceiver, metadata.MetricsStability),
)
}

// This is the map of already created k8scluster receivers for particular configurations.
// We maintain this map because the Factory is asked log and metric receivers separately
// when it gets CreateLogsReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one receiver object per configuration.
var receivers = sharedcomponent.NewSharedComponents()
28 changes: 27 additions & 1 deletion receiver/k8sclusterreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
fakeQuota "github.com/openshift/client-go/quota/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -19,6 +20,7 @@ import (
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
)

func TestFactory(t *testing.T) {
Expand Down Expand Up @@ -80,7 +82,7 @@ func TestFactoryDistributions(t *testing.T) {
}

func newTestReceiver(t *testing.T, cfg *Config) *kubernetesReceiver {
r, err := newReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop())
r, err := newReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, r)
rcvr, ok := r.(*kubernetesReceiver)
Expand Down Expand Up @@ -111,3 +113,27 @@ func (n *nopHostWithExporters) GetExporters() map[component.DataType]map[compone
},
}
}

func TestNewSharedReceiver(t *testing.T) {
f := NewFactory()
cfg := f.CreateDefaultConfig()

mc := consumertest.NewNop()
mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, mc)
require.NoError(t, err)

// Verify that the metric consumer is correctly set.
kr := mr.(*sharedcomponent.SharedComponent).Unwrap().(*kubernetesReceiver)
assert.Equal(t, mc, kr.metricsConsumer)

lc := consumertest.NewNop()
lr, err := newLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, lc)
require.NoError(t, err)

// Verify that the log consumer is correct set.
kr = lr.(*sharedcomponent.SharedComponent).Unwrap().(*kubernetesReceiver)
assert.Equal(t, lc, kr.resourceWatcher.entityLogConsumer)

// Make sure only one receiver is created both for metrics and logs.
assert.Equal(t, mr, lr)
}
3 changes: 3 additions & 0 deletions receiver/k8sclusterreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.81.0
Expand Down Expand Up @@ -160,3 +161,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions receiver/k8sclusterreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ status:
class: receiver
stability:
beta: [metrics]
development: [logs]
distributions: [contrib, splunk, observiq, sumo]

73 changes: 58 additions & 15 deletions receiver/k8sclusterreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ var _ receiver.Metrics = (*kubernetesReceiver)(nil)
type kubernetesReceiver struct {
resourceWatcher *resourceWatcher

config *Config
settings receiver.CreateSettings
consumer consumer.Metrics
cancel context.CancelFunc
obsrecv *obsreport.Receiver
config *Config
settings receiver.CreateSettings
metricsConsumer consumer.Metrics
cancel context.CancelFunc
obsrecv *obsreport.Receiver
}

func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -97,33 +97,76 @@ func (kr *kubernetesReceiver) Shutdown(context.Context) error {
}

func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
if kr.metricsConsumer == nil {
// Metric collection is not enabled.
return
}

now := time.Now()
mds := kr.resourceWatcher.dataCollector.CollectMetricData(now)

c := kr.obsrecv.StartMetricsOp(ctx)

numPoints := mds.DataPointCount()
err := kr.consumer.ConsumeMetrics(c, mds)
err := kr.metricsConsumer.ConsumeMetrics(c, mds)
kr.obsrecv.EndMetricsOp(c, metadata.Type, numPoints, err)
}

// newReceiver creates the Kubernetes cluster receiver with the given configuration.
func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
rCfg := cfg.(*Config)
// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
func newMetricsReceiver(
ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics,
) (receiver.Metrics, error) {
var err error
r := receivers.GetOrAdd(
cfg, func() component.Component {
var rcv component.Component
rcv, err = newReceiver(ctx, set, cfg)
return rcv
},
)
if err != nil {
return nil, err
}
r.Unwrap().(*kubernetesReceiver).metricsConsumer = consumer
return r, nil
}

obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
})
// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
func newLogsReceiver(
ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Logs,
) (receiver.Logs, error) {
var err error
r := receivers.GetOrAdd(
cfg, func() component.Component {
var rcv component.Component
rcv, err = newReceiver(ctx, set, cfg)
return rcv
},
)
if err != nil {
return nil, err
}
r.Unwrap().(*kubernetesReceiver).resourceWatcher.entityLogConsumer = consumer
return r, nil
}

// newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
rCfg := cfg.(*Config)
obsrecv, err := obsreport.NewReceiver(
obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
},
)
if err != nil {
return nil, err
}
return &kubernetesReceiver{
resourceWatcher: newResourceWatcher(set, rCfg),
settings: set,
config: rCfg,
consumer: consumer,
obsrecv: obsrecv,
}, nil
}
Loading