Skip to content

Commit

Permalink
[receiver/k8sclusterreceiver] Implement conversion from Metadata to t…
Browse files Browse the repository at this point in the history
…he new entity model (#24396)

Resolves
#24394

This is part 2 of the work to move to entity events-as-log-records in
K8s cluster receiver:

#19741

This also adds the notion of entity type to the KubernetesMetadata
definition
since it is required in the new data model.

Overall design document: 

https://docs.google.com/document/d/1Tg18sIck3Nakxtd3TFFcIjrmRO_0GLMdHXylVqBQmJA/
  • Loading branch information
tigrannajaryan authored Jul 19, 2023
1 parent 806c303 commit 603d3a7
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 0 deletions.
11 changes: 11 additions & 0 deletions receiver/k8sclusterreceiver/internal/collection/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "test-pod-0-uid",
Metadata: commonPodMetadata,
},
experimentalmetricmetadata.ResourceID("container-id"): {
EntityType: "container",
ResourceIDKey: "container.id",
ResourceID: "container-id",
Metadata: map[string]string{
Expand All @@ -74,6 +76,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
}, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "test-pod-0-uid",
Metadata: allPodMetadata(map[string]string{
Expand Down Expand Up @@ -111,6 +114,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "test-pod-0-uid",
Metadata: allPodMetadata(map[string]string{
Expand All @@ -126,6 +130,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
resource: testutils.NewDaemonset("1"),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): {
EntityType: "k8s.daemonset",
ResourceIDKey: "k8s.daemonset.uid",
ResourceID: "test-daemonset-1-uid",
Metadata: map[string]string{
Expand All @@ -142,6 +147,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
resource: testutils.NewDeployment("1"),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): {
EntityType: "k8s.deployment",
ResourceIDKey: "k8s.deployment.uid",
ResourceID: "test-deployment-1-uid",
Metadata: map[string]string{
Expand All @@ -159,6 +165,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
resource: testutils.NewHPA("1"),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): {
EntityType: "k8s.hpa",
ResourceIDKey: "k8s.hpa.uid",
ResourceID: "test-hpa-1-uid",
Metadata: map[string]string{
Expand All @@ -175,6 +182,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
resource: testutils.NewJob("1"),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-job-1-uid"): {
EntityType: "k8s.job",
ResourceIDKey: "k8s.job.uid",
ResourceID: "test-job-1-uid",
Metadata: map[string]string{
Expand All @@ -193,6 +201,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
resource: testutils.NewNode("1"),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-node-1-uid"): {
EntityType: "k8s.node",
ResourceIDKey: "k8s.node.uid",
ResourceID: "test-node-1-uid",
Metadata: map[string]string{
Expand All @@ -210,6 +219,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
resource: testutils.NewReplicaSet("1"),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): {
EntityType: "k8s.replicaset",
ResourceIDKey: "k8s.replicaset.uid",
ResourceID: "test-replicaset-1-uid",
Metadata: map[string]string{
Expand All @@ -234,6 +244,7 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
},
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): {
EntityType: "k8s.replicationcontroller",
ResourceIDKey: "k8s.replicationcontroller.uid",
ResourceID: "test-replicationcontroller-1-uid",
Metadata: map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func GetMetadata(cs corev1.ContainerStatus) *metadata.KubernetesMetadata {
}

return &metadata.KubernetesMetadata{
EntityType: "container",
ResourceIDKey: conventions.AttributeContainerID,
ResourceID: metadataPkg.ResourceID(utils.StripContainerID(cs.ContainerID)),
Metadata: mdata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestCronJobMetadata(t *testing.T) {
// Assert metadata from Pod.
require.Equal(t,
metadata.KubernetesMetadata{
EntityType: "k8s.cronjob",
ResourceIDKey: "k8s.cronjob.uid",
ResourceID: "test-cronjob-1-uid",
Metadata: map[string]string{
Expand Down
37 changes: 37 additions & 0 deletions receiver/k8sclusterreceiver/internal/metadata/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"

import (
metadataPkg "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
)

// GetEntityEvents processes metadata updates and returns entity events that describe the metadata changes.
func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata) metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()

for id, oldObj := range old {
if _, ok := new[id]; !ok {
// An object was present, but no longer is. Create a "delete" event.
entityEvent := out.AppendEmpty()
entityEvent.ID().PutStr(oldObj.ResourceIDKey, string(oldObj.ResourceID))
entityEvent.SetEntityDelete()
}
}

// All "new" are current objects. Create "state" events. "old" state does not matter.
for _, newObj := range new {
entityEvent := out.AppendEmpty()
entityEvent.ID().PutStr(newObj.ResourceIDKey, string(newObj.ResourceID))
state := entityEvent.SetEntityState()
state.SetEntityType(newObj.EntityType)

attrs := state.Attributes()
for k, v := range newObj.Metadata {
attrs.PutStr(k, v)
}
}

return out
}
207 changes: 207 additions & 0 deletions receiver/k8sclusterreceiver/internal/metadata/entities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

metadataPkg "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
)

func Test_GetEntityEvents(t *testing.T) {
tests := []struct {
name string
old, new map[metadataPkg.ResourceID]*KubernetesMetadata
events metadataPkg.EntityEventsSlice
}{
{
name: "new entity",
new: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
},
},
},
events: func() metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()
event := out.AppendEmpty()
_ = event.ID().FromRaw(map[string]any{"k8s.pod.uid": "123"})
state := event.SetEntityState()
state.SetEntityType("k8s.pod")
_ = state.Attributes().FromRaw(map[string]any{"label1": "value1"})
return out
}(),
},
{
name: "deleted entity",
old: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
},
},
},
events: func() metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()
event := out.AppendEmpty()
_ = event.ID().FromRaw(map[string]any{"k8s.pod.uid": "123"})
event.SetEntityDelete()
return out
}(),
},
{
name: "changed entity",
old: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
},
},
},
new: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
"label2": "foo",
"new": "bar",
},
},
},
events: func() metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()
event := out.AppendEmpty()
_ = event.ID().FromRaw(map[string]any{"k8s.pod.uid": "123"})
state := event.SetEntityState()
state.SetEntityType("k8s.pod")
_ = state.Attributes().FromRaw(map[string]any{"label1": "value1", "label2": "foo", "new": "bar"})
return out
}(),
},
{
name: "unchanged entity",
old: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
},
},
},
new: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
},
},
},
events: func() metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()
event := out.AppendEmpty()
_ = event.ID().FromRaw(map[string]any{"k8s.pod.uid": "123"})
state := event.SetEntityState()
state.SetEntityType("k8s.pod")
_ = state.Attributes().FromRaw(
map[string]any{
"label1": "value1", "label2": "value2", "label3": "value3",
},
)
return out
}(),
},
{
name: "new and deleted entity",
old: map[metadataPkg.ResourceID]*KubernetesMetadata{
"123": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "123",
Metadata: map[string]string{
"label1": "value1",
},
},
},
new: map[metadataPkg.ResourceID]*KubernetesMetadata{
"234": {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: "234",
Metadata: map[string]string{
"label2": "value2",
},
},
},
events: func() metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()

event := out.AppendEmpty()
_ = event.ID().FromRaw(map[string]any{"k8s.pod.uid": "123"})
event.SetEntityDelete()

event = out.AppendEmpty()
_ = event.ID().FromRaw(map[string]any{"k8s.pod.uid": "234"})
state := event.SetEntityState()
state.SetEntityType("k8s.pod")
_ = state.Attributes().FromRaw(map[string]any{"label2": "value2"})
return out
}(),
},
}
for _, test := range tests {
tt := test
t.Run(
tt.name, func(t *testing.T) {
// Make sure test data is correct.
for k, v := range tt.old {
assert.EqualValues(t, k, v.ResourceID)
}
for k, v := range tt.new {
assert.EqualValues(t, k, v.ResourceID)
}

// Convert and test expected events.
events := GetEntityEvents(tt.old, tt.new)
require.Equal(t, tt.events.Len(), events.Len())
for i := 0; i < events.Len(); i++ {
actual := events.At(i)
expected := tt.events.At(i)
assert.EqualValues(t, expected.EventType(), actual.EventType())
assert.EqualValues(t, expected.ID().AsRaw(), actual.ID().AsRaw())
if expected.EventType() == metadataPkg.EventTypeState {
estate := expected.EntityStateDetails()
astate := actual.EntityStateDetails()
assert.EqualValues(t, estate.EntityType(), astate.EntityType())
assert.EqualValues(t, estate.Attributes().AsRaw(), astate.Attributes().AsRaw())
}
}
},
)
}
}
7 changes: 7 additions & 0 deletions receiver/k8sclusterreceiver/internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

// KubernetesMetadata associates a resource to a set of properties.
type KubernetesMetadata struct {
// The type of the entity, e.g. k8s.pod
EntityType string
// resourceIDKey is the label key of UID label for the resource.
ResourceIDKey string
// resourceID is the Kubernetes UID of the resource. In case of
Expand Down Expand Up @@ -62,6 +64,7 @@ func GetGenericMetadata(om *v1.ObjectMeta, resourceType string) *KubernetesMetad
}

return &KubernetesMetadata{
EntityType: getOTelEntityTypeFromKind(rType),
ResourceIDKey: GetOTelUIDFromKind(rType),
ResourceID: metadataPkg.ResourceID(om.UID),
Metadata: metadata,
Expand All @@ -76,6 +79,10 @@ func GetOTelNameFromKind(kind string) string {
return fmt.Sprintf("k8s.%s.name", kind)
}

func getOTelEntityTypeFromKind(kind string) string {
return fmt.Sprintf("k8s.%s", kind)
}

// mergeKubernetesMetadataMaps merges maps of string (resource id) to
// KubernetesMetadata into a single map.
func MergeKubernetesMetadataMaps(maps ...map[metadataPkg.ResourceID]*KubernetesMetadata) map[metadataPkg.ResourceID]*KubernetesMetadata {
Expand Down
1 change: 1 addition & 0 deletions receiver/k8sclusterreceiver/internal/node/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func GetMetadata(node *corev1.Node) map[experimentalmetricmetadata.ResourceID]*m
nodeID := experimentalmetricmetadata.ResourceID(node.UID)
return map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
nodeID: {
EntityType: "k8s.node",
ResourceIDKey: conventions.AttributeK8SNodeUID,
ResourceID: nodeID,
Metadata: meta,
Expand Down
1 change: 1 addition & 0 deletions receiver/k8sclusterreceiver/internal/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func GetMetadata(pod *corev1.Pod, mc *metadata.Store, logger *zap.Logger) map[ex
podID := experimentalmetricmetadata.ResourceID(pod.UID)
return metadata.MergeKubernetesMetadataMaps(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
podID: {
EntityType: "k8s.pod",
ResourceIDKey: conventions.AttributeK8SPodUID,
ResourceID: podID,
Metadata: meta,
Expand Down
1 change: 1 addition & 0 deletions receiver/k8sclusterreceiver/internal/pod/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func expectedKubernetesMetadata(to testCaseOptions) map[experimentalmetricmetada

out := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID(podUIDLabel): {
EntityType: "k8s.pod",
ResourceIDKey: "k8s.pod.uid",
ResourceID: experimentalmetricmetadata.ResourceID(podUIDLabel),
Metadata: map[string]string{
Expand Down
Loading

0 comments on commit 603d3a7

Please sign in to comment.