Skip to content

Commit

Permalink
[processor/k8sattributes] Add support for k8s.node.uid metadata (#31637)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

This PR adds support for including the `k8s.node.uid` as part of the
metadata added by the processor.

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

[`TestAddNodeUID`](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31637/files#diff-87c4a82beb768dc32991725c2cb8430e9daed511e97acecfaa6843bd1970cb2fR883)

**Documentation:** <Describe the documentation added.>
Updated
[`README.md`](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31637/files#diff-57b0d6448d516c0c80b326a7f7f95c3256b2b774e496870dc1dd35dcf8bfc256R162)

Signed-off-by: ChrsMark <[email protected]>
  • Loading branch information
ChrsMark authored Mar 13, 2024
1 parent 77a9540 commit 4700e4b
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_node_uid_k8s_attrib.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for `k8s.node.uid` metadata

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31637]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ k8sattributes/2:
## Cluster-scoped RBAC
If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When extracting metadatas from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.
If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.

Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods, nodes, and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed):

Expand Down
17 changes: 11 additions & 6 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,17 @@ func (cfg *Config) Validate() error {
for _, field := range cfg.Extract.Metadata {
switch field {
case conventions.AttributeK8SNamespaceName, conventions.AttributeK8SPodName, conventions.AttributeK8SPodUID,
specPodHostName, metadataPodStartTime, conventions.AttributeK8SDeploymentName, conventions.AttributeK8SDeploymentUID,
conventions.AttributeK8SReplicaSetName, conventions.AttributeK8SReplicaSetUID, conventions.AttributeK8SDaemonSetName,
conventions.AttributeK8SDaemonSetUID, conventions.AttributeK8SStatefulSetName, conventions.AttributeK8SStatefulSetUID,
conventions.AttributeK8SContainerName, conventions.AttributeK8SJobName, conventions.AttributeK8SJobUID,
conventions.AttributeK8SCronJobName, conventions.AttributeK8SNodeName, conventions.AttributeContainerID,
conventions.AttributeContainerImageName, conventions.AttributeContainerImageTag, clusterUID:
specPodHostName, metadataPodStartTime,
conventions.AttributeK8SDeploymentName, conventions.AttributeK8SDeploymentUID,
conventions.AttributeK8SReplicaSetName, conventions.AttributeK8SReplicaSetUID,
conventions.AttributeK8SDaemonSetName, conventions.AttributeK8SDaemonSetUID,
conventions.AttributeK8SStatefulSetName, conventions.AttributeK8SStatefulSetUID,
conventions.AttributeK8SJobName, conventions.AttributeK8SJobUID,
conventions.AttributeK8SCronJobName,
conventions.AttributeK8SNodeName, conventions.AttributeK8SNodeUID,
conventions.AttributeK8SContainerName, conventions.AttributeContainerID,
conventions.AttributeContainerImageName, conventions.AttributeContainerImageTag,
clusterUID:
default:
return fmt.Errorf("\"%s\" is not a supported metadata field", field)
}
Expand Down
6 changes: 5 additions & 1 deletion processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
}
}

if c.extractNodeLabelsAnnotations() {
if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
c.nodeInformer = newNodeSharedInformer(c.kc, c.Filters.Node)
}

Expand Down Expand Up @@ -930,6 +930,10 @@ func (c *WatchClient) extractNodeLabelsAnnotations() bool {
return false
}

func (c *WatchClient) extractNodeUID() bool {
return c.Rules.NodeUID
}

func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) {
newNode := &Node{
Name: node.Name,
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ type ExtractionRules struct {
StatefulSetUID bool
StatefulSetName bool
Node bool
NodeUID bool
StartTime bool
ContainerName bool
ContainerID bool
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ all_set:
enabled: true
k8s.node.name:
enabled: true
k8s.node.uid:
enabled: true
k8s.pod.hostname:
enabled: true
k8s.pod.name:
Expand Down Expand Up @@ -75,6 +77,8 @@ none_set:
enabled: false
k8s.node.name:
enabled: false
k8s.node.uid:
enabled: false
k8s.pod.hostname:
enabled: false
k8s.pod.name:
Expand Down
4 changes: 4 additions & 0 deletions processor/k8sattributesprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ resource_attributes:
description: The name of the Node.
type: string
enabled: true
k8s.node.uid:
description: The UID of the Node.
type: string
enabled: false
container.id:
description: Container ID. Usually a UUID, as for example used to identify Docker containers. The UUID might be abbreviated. Requires k8s.container.restart_count.
type: string
Expand Down
5 changes: 5 additions & 0 deletions processor/k8sattributesprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func enabledAttributes() (attributes []string) {
if defaultConfig.K8sNodeName.Enabled {
attributes = append(attributes, conventions.AttributeK8SNodeName)
}
if defaultConfig.K8sNodeUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SNodeUID)
}
if defaultConfig.K8sPodHostname.Enabled {
attributes = append(attributes, specPodHostName)
}
Expand Down Expand Up @@ -163,6 +166,8 @@ func withExtractMetadata(fields ...string) option {
p.rules.CronJobName = true
case conventions.AttributeK8SNodeName:
p.rules.Node = true
case conventions.AttributeK8SNodeUID:
p.rules.NodeUID = true
case conventions.AttributeContainerID:
p.rules.ContainerID = true
case conventions.AttributeContainerImageName:
Expand Down
14 changes: 14 additions & 0 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
resource.Attributes().PutStr(key, val)
}
}
nodeUID := kp.getUIDForPodsNode(nodeName)
if nodeUID != "" {
if _, found := resource.Attributes().Get(conventions.AttributeK8SNodeUID); !found {
resource.Attributes().PutStr(conventions.AttributeK8SNodeUID, nodeUID)
}
}
}
}

Expand Down Expand Up @@ -263,6 +269,14 @@ func (kp *kubernetesprocessor) getAttributesForPodsNode(nodeName string) map[str
return node.Attributes
}

func (kp *kubernetesprocessor) getUIDForPodsNode(nodeName string) string {
node, ok := kp.kc.GetNode(nodeName)
if !ok {
return ""
}
return node.NodeUID
}

// intFromAttribute extracts int value from an attribute stored as string or int
func intFromAttribute(val pcommon.Value) (int, error) {
switch val.Type() {
Expand Down
66 changes: 66 additions & 0 deletions processor/k8sattributesprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,72 @@ func TestAddNodeLabels(t *testing.T) {
})
}

func TestAddNodeUID(t *testing.T) {
nodeUID := "asdfasdf-asdfasdf-asdf"
m := newMultiTest(
t,
func() component.Config {
cfg := createDefaultConfig().(*Config)
cfg.Extract.Metadata = []string{"k8s.node.uid"}
cfg.Extract.Labels = []FieldExtractConfig{}
return cfg
}(),
nil,
)

podIP := "1.1.1.1"
nodes := map[string]map[string]string{
"node-1": {
"nodelabel": "1",
},
}
m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
kp.podAssociations = []kube.Association{
{
Sources: []kube.AssociationSource{
{
From: "connection",
},
},
},
}
})

m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
pi := kube.PodIdentifier{
kube.PodIdentifierAttributeFromConnection(podIP),
}
kp.kc.(*fakeClient).Pods[pi] = &kube.Pod{Name: "test-2323", NodeName: "node-1"}
kp.kc.(*fakeClient).Nodes = make(map[string]*kube.Node)
for ns, labels := range nodes {
kp.kc.(*fakeClient).Nodes[ns] = &kube.Node{Attributes: labels, NodeUID: nodeUID}
}
})

ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{
IP: net.ParseIP(podIP),
},
})
m.testConsume(
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})

m.assertBatchesLen(1)
m.assertResourceObjectLen(0)
m.assertResource(0, func(res pcommon.Resource) {
assert.Equal(t, 3, res.Attributes().Len())
assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP)
assertResourceHasStringAttribute(t, res, "k8s.node.uid", nodeUID)
assertResourceHasStringAttribute(t, res, "nodelabel", "1")
})
}

func TestProcessorAddContainerAttributes(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 4700e4b

Please sign in to comment.