Skip to content

Commit

Permalink
[receiver/k8sobjects] Fix empty event.name attribute when using wat…
Browse files Browse the repository at this point in the history
…ch mode (#16543)
  • Loading branch information
hvaghani221 authored Dec 16, 2022
1 parent fdc8d20 commit 99da2a8
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 11 deletions.
15 changes: 15 additions & 0 deletions .chloggen/k8sobjects-fix-empty-event-name.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjects

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fix empty `event.name` attribute when using watch mode"

# One or more tracking issues related to the change
issues: [16542]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 2 additions & 2 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
if err != nil {
kr.setting.Logger.Error("error in pulling object", zap.String("resource", config.gvr.String()), zap.Error(err))
} else if len(objects.Items) > 0 {
logs := unstructuredListToLogData(objects)
logs := pullObjectsToLogData(objects, config)
obsCtx := kr.obsrecv.StartLogsOp(ctx)
err = kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, typeStr, logs.LogRecordCount(), err)
Expand Down Expand Up @@ -162,7 +162,7 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
kr.setting.Logger.Warn("Watch channel closed unexpectedly", zap.String("resource", config.gvr.String()))
return
}
logs := watchEventToLogData(&data)
logs := watchObjectsToLogData(&data, config)

obsCtx := kr.obsrecv.StartLogsOp(ctx)
err := kr.consumer.ConsumeLogs(obsCtx, logs)
Expand Down
29 changes: 23 additions & 6 deletions receiver/k8sobjectsreceiver/unstructured_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.9.0"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

func watchEventToLogData(event *watch.Event) plog.Logs {
type attrUpdaterFunc func(pcommon.Map)

func watchObjectsToLogData(event *watch.Event, config *K8sObjectsConfig) plog.Logs {
udata := event.Object.(*unstructured.Unstructured)
ul := unstructured.UnstructuredList{
Items: []unstructured.Unstructured{{
Expand All @@ -31,10 +34,22 @@ func watchEventToLogData(event *watch.Event) plog.Logs {
},
}},
}
return unstructuredListToLogData(&ul)

return unstructuredListToLogData(&ul, config, func(attrs pcommon.Map) {
objectMeta := udata.Object["metadata"].(map[string]interface{})
name := objectMeta["name"].(string)
if name != "" {
attrs.PutStr("event.domain", "k8s")
attrs.PutStr("event.name", name)
}
})
}

func unstructuredListToLogData(event *unstructured.UnstructuredList) plog.Logs {
func pullObjectsToLogData(event *unstructured.UnstructuredList, config *K8sObjectsConfig) plog.Logs {
return unstructuredListToLogData(event, config)
}

func unstructuredListToLogData(event *unstructured.UnstructuredList, config *K8sObjectsConfig, attrUpdaters ...attrUpdaterFunc) plog.Logs {
out := plog.NewLogs()
resourceLogs := out.ResourceLogs()
namespaceResourceMap := make(map[string]plog.LogRecordSlice)
Expand All @@ -54,9 +69,11 @@ func unstructuredListToLogData(event *unstructured.UnstructuredList) plog.Logs {
record := logSlice.AppendEmpty()

attrs := record.Attributes()
attrs.EnsureCapacity(2)
attrs.PutStr("event.domain", "k8s")
attrs.PutStr("event.name", e.GetKind())
attrs.PutStr("k8s.resource.name", config.gvr.Resource)

for _, attrUpdate := range attrUpdaters {
attrUpdate(attrs)
}

dest := record.Body()
destMap := dest.SetEmptyMap()
Expand Down
66 changes: 63 additions & 3 deletions receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.9.0"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
)

func TestUnstructuredListToLogData(t *testing.T) {
Expand All @@ -38,7 +41,15 @@ func TestUnstructuredListToLogData(t *testing.T) {
object.SetName(fmt.Sprintf("pod-%d", i))
objects.Items = append(objects.Items, object)
}
logs := unstructuredListToLogData(&objects)

config := &K8sObjectsConfig{
gvr: &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
},
}
logs := pullObjectsToLogData(&objects, config)

assert.Equal(t, logs.LogRecordCount(), 4)

Expand Down Expand Up @@ -67,18 +78,67 @@ func TestUnstructuredListToLogData(t *testing.T) {
objects.Items = append(objects.Items, object)
}

logs := unstructuredListToLogData(&objects)
config := &K8sObjectsConfig{
gvr: &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "nodes",
},
}

logs := pullObjectsToLogData(&objects, config)

assert.Equal(t, logs.LogRecordCount(), 3)

resourceLogs := logs.ResourceLogs()
assert.Equal(t, resourceLogs.Len(), 1)
rl := resourceLogs.At(0)
resourceAttributes := rl.Resource().Attributes()
logRecords := rl.ScopeLogs().At(0).LogRecords()
_, ok := resourceAttributes.Get(semconv.AttributeK8SNamespaceName)
assert.Equal(t, ok, false)
assert.Equal(t, rl.ScopeLogs().Len(), 1)
assert.Equal(t, rl.ScopeLogs().At(0).LogRecords().Len(), 3)
assert.Equal(t, logRecords.Len(), 3)

})

t.Run("Test event.name in watch events", func(t *testing.T) {
config := &K8sObjectsConfig{
gvr: &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "events",
},
}
event := &watch.Event{
Type: watch.Added,
Object: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "Event",
"apiVersion": "v1",
"metadata": map[string]interface{}{
"name": "generic-name",
},
},
},
}

logs := watchObjectsToLogData(event, config)

assert.Equal(t, logs.LogRecordCount(), 1)

resourceLogs := logs.ResourceLogs()
assert.Equal(t, resourceLogs.Len(), 1)
rl := resourceLogs.At(0)
logRecords := rl.ScopeLogs().At(0).LogRecords()
assert.Equal(t, rl.ScopeLogs().Len(), 1)
assert.Equal(t, logRecords.Len(), 1)

attrs := logRecords.At(0).Attributes()
eventName, ok := attrs.Get("event.name")
require.True(t, ok)
assert.EqualValues(t, "generic-name", eventName.AsRaw())

})

}

0 comments on commit 99da2a8

Please sign in to comment.