Skip to content

Commit

Permalink
Revert "Reverting Neuron changes for filtering out redundant attribut…
Browse files Browse the repository at this point in the history
…es (#1283)"

This reverts commit dbf51d8.
  • Loading branch information
sky333999 committed Aug 14, 2024
1 parent 246c262 commit 7cf4fea
Show file tree
Hide file tree
Showing 9 changed files with 694 additions and 221 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Bug Fixes:
* [Logs/Windows Event] Add windows event log service restart detection and resubscribe
* [Metrics/JMX, Metrics/Net, Metrics/DiskIO] Change cumulative to delta conversion to drop initial value
* [Metrics/JMX] Suppress sessions unit warning
* [ContainerInsights] Filter out redundant attributes for Neuron metrics

Enhancements:
* [Metrics/JMX] Add cumulative to delta conversion for JMX metrics
Expand Down
1 change: 1 addition & 0 deletions internal/containerinsightscommon/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
PodOwnersKey = "PodOwners"
HostKey = "host"
K8sKey = "kubernetes"
K8sLabelsKey = "labels"

RunningPodCount = "number_of_running_pods"
RunningContainerCount = "number_of_running_containers"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package internal

import (
"regexp"
)

const (
	// PROCESSED_NEURON_METRIC_PATTERN is the regular expression used by
	// IsProcessedNeuronMetric. It matches names that start with
	// "container_", "node_" or "pod_" followed by "neuroncore_" or
	// "neurondevice_", as well as any name that starts with "node_neuron_".
	// The non-idiomatic ALL_CAPS name is kept so existing references still compile.
	PROCESSED_NEURON_METRIC_PATTERN = "^(container|node|pod)_(neuroncore_|neurondevice_).*|^node_neuron_.*"
)

// processedNeuronMetricRegexp is PROCESSED_NEURON_METRIC_PATTERN compiled once
// at package initialization, so that matching a metric name does not have to
// re-parse the pattern on every call.
var processedNeuronMetricRegexp = regexp.MustCompile(PROCESSED_NEURON_METRIC_PATTERN)

// AwsNeuronMetricChecker checks metric names against
// PROCESSED_NEURON_METRIC_PATTERN. It holds no state.
type AwsNeuronMetricChecker struct {
}

// NewAwsNeuronMetricChecker returns a new AwsNeuronMetricChecker.
func NewAwsNeuronMetricChecker() *AwsNeuronMetricChecker {
	return &AwsNeuronMetricChecker{}
}

// IsProcessedNeuronMetric reports whether name matches
// PROCESSED_NEURON_METRIC_PATTERN.
func (md *AwsNeuronMetricChecker) IsProcessedNeuronMetric(name string) bool {
	return processedNeuronMetricRegexp.MatchString(name)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package internal

import (
"testing"
)

// TestAwsNeuronMetricModifier_IsProcessedNeuronMetric runs IsProcessedNeuronMetric
// against a table of metric names and verifies the reported match result for
// each of them in its own subtest.
func TestAwsNeuronMetricModifier_IsProcessedNeuronMetric(t *testing.T) {
	type testCase struct {
		name     string
		input    string
		expected bool
	}

	cases := []testCase{
		// container/pod/node prefixes followed by neuroncore_ are expected to match.
		{"container_neuroncore_prefix", "container_neuroncore_metric", true},
		{"pod_neuroncore_prefix", "pod_neuroncore_metric", true},
		{"node_neuroncore_prefix", "node_neuroncore_metric", true},
		// container/pod/node prefixes followed by neurondevice_ are expected to match.
		{"container_neurondevice_prefix", "container_neurondevice_metric", true},
		{"pod_neurondevice_prefix", "pod_neurondevice_metric", true},
		{"node_neurondevice_prefix", "node_neurondevice_metric", true},
		// The bare neuron_ form is expected to match only with the node_ prefix.
		{"node_neuron_prefix", "node_neuron_metric", true},
		{"container_neuron_prefix", "container_neuron_metric", false},
		// Unrelated names are expected not to match.
		{"other_prefix", "other_metric", false},
	}

	checker := NewAwsNeuronMetricChecker()

	for idx := range cases {
		tc := cases[idx]
		t.Run(tc.name, func(t *testing.T) {
			got := checker.IsProcessedNeuronMetric(tc.input)
			if got == tc.expected {
				return
			}
			t.Errorf("IsProcessedNeuronMetric(%q) = %v, expected %v", tc.input, got, tc.expected)
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package internal
import (
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

Expand Down Expand Up @@ -46,6 +45,7 @@ const (
Kubernetes = "kubernetes"
Region = "region"
SubnetId = "subnet_id"
RuntimeTagOverride = "DEFAULT"
NeuronExecutionErrorsAggregatedMetric = containerinsightscommon.NeuronExecutionErrors + "_total"
NeuronDeviceHardwareEccEventsAggregatedMetric = containerinsightscommon.NeuronDeviceHardwareEccEvents + "_total"
)
Expand Down Expand Up @@ -99,26 +99,6 @@ var (
"sram_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric},
}

MetricAttributesToKeep = map[string]struct{}{
ClusterName: {},
ContainerName: {},
FullPodName: {},
InstanceId: {},
InstanceType: {},
K8sPodName: {},
Namespace: {},
NeuronDevice: {},
NodeName: {},
PodName: {},
Service: {},
AvailabilityZone: {},
Kubernetes: {},
Region: {},
RuntimeTag: {},
SubnetId: {},
NeuronCore: {},
}
)

func NewMetricModifier(logger *zap.Logger) *AwsNeuronMetricModifier {
Expand All @@ -142,7 +122,7 @@ func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, m
}
// Neuron metrics sent by the neuron monitor don't have any units so we add them in the agent.
addUnit(originalMetric)
prefixCoreAndDeviceLabels(originalMetric)
updateCoreDeviceRuntimeLabels(originalMetric)
resetStaleDatapoints(originalMetric)

originalMetricName := originalMetric.Name()
Expand All @@ -156,7 +136,6 @@ func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, m
}

modifiedMetricSlice := md.extractDatapointsAsMetricsAndAggregate(originalMetric)
filterLabels(modifiedMetricSlice, originalMetricName)
md.duplicateMetrics(modifiedMetricSlice, originalMetricName, originalMetric.Sum().DataPoints(), metrics)
}

Expand Down Expand Up @@ -251,7 +230,6 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin

// Creating body for the aggregated metric and add it to the new newMetricSlice for each runtime
for aggregatedMetricMetadata, value := range aggregatedValuesPerRuntimeTag {
// Aggregated metric for neuron device ecc events is not required
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), aggregatedMetricMetadata.aggregatedMetricName, originalMetric.Unit())

originalMetricDatapoints.At(0).CopyTo(aggregatedMetric.SetEmptySum().DataPoints().AppendEmpty())
Expand All @@ -269,33 +247,9 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin
return newMetricSlice
}

// filterLabels strips, from every Sum datapoint of every metric in slice, the
// attributes whose key is not present in MetricAttributesToKeep. The removal is
// necessary so that the metrics are grouped together.
// Nothing is changed when originalMetricName has no entry in metricModificationsMap.
func filterLabels(slice pmetric.MetricSlice, originalMetricName string) {
	if _, tracked := metricModificationsMap[originalMetricName]; !tracked {
		return
	}

	// dropUnlisted is the RemoveIf predicate: it returns true (remove) for any
	// attribute key that is absent from MetricAttributesToKeep.
	dropUnlisted := func(label string, value pcommon.Value) bool {
		_, keep := MetricAttributesToKeep[label]
		return !keep
	}

	for metricIdx := 0; metricIdx < slice.Len(); metricIdx++ {
		datapoints := slice.At(metricIdx).Sum().DataPoints()
		for dpIdx := 0; dpIdx < datapoints.Len(); dpIdx++ {
			datapoints.At(dpIdx).Attributes().RemoveIf(dropUnlisted)
		}
	}
}

// This method prefixes NeuronCore and NeuronDevice values with `core` and `device` respectively
// to make the attribute values more verbose, and overrides the RuntimeTag attribute of every
// datapoint with RuntimeTagOverride
func prefixCoreAndDeviceLabels(originalMetric pmetric.Metric) {
func updateCoreDeviceRuntimeLabels(originalMetric pmetric.Metric) {
dps := originalMetric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
Expand All @@ -304,6 +258,7 @@ func prefixCoreAndDeviceLabels(originalMetric pmetric.Metric) {
dp.Attributes().PutStr(attributeKey, attributeValuePrefix+value.Str())
}
}
dp.Attributes().PutStr(RuntimeTag, RuntimeTagOverride)
}
}

Expand Down Expand Up @@ -360,7 +315,7 @@ func resetStaleDatapoints(originalMetric pmetric.Metric) {
dp := dps.At(i)
if dp.ValueType() == pmetric.NumberDataPointValueTypeEmpty || dp.Flags().NoRecordedValue() {
dp.SetDoubleValue(dp.DoubleValue())
dp.Attributes().PutStr(RuntimeTag, "default")
dp.Attributes().PutStr(RuntimeTag, RuntimeTagOverride)
dp.SetFlags(dp.Flags().WithNoRecordedValue(false))
}
}
Expand Down
Loading

0 comments on commit 7cf4fea

Please sign in to comment.