Skip to content

Commit

Permalink
Reverting Neuron changes for filtering out redundant attributes (#1283)
Browse files Browse the repository at this point in the history
  • Loading branch information
mitali-salvi authored Aug 8, 2024
1 parent bcecbc1 commit dbf51d8
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 694 deletions.
1 change: 0 additions & 1 deletion RELEASE_NOTES
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Bug Fixes:
* [Logs/Windows Event] Add windows event log service restart detection and resubscribe
* [Metrics/JMX, Metrics/Net, Metrics/DiskIO] Change cumulative to delta conversion to drop initial value
* [Metrics/JMX] Suppress sessions unit warning
* [ContainerInsights] Filtering out redundant attributes for Neuron metrics

Enhancements:
* [Metrics/JMX] Add cumulative to delta conversion for JMX metrics
Expand Down
1 change: 0 additions & 1 deletion internal/containerinsightscommon/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const (
PodOwnersKey = "PodOwners"
HostKey = "host"
K8sKey = "kubernetes"
K8sLabelsKey = "labels"

RunningPodCount = "number_of_running_pods"
RunningContainerCount = "number_of_running_containers"
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal
import (
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

Expand Down Expand Up @@ -45,7 +46,6 @@ const (
Kubernetes = "kubernetes"
Region = "region"
SubnetId = "subnet_id"
RuntimeTagOverride = "DEFAULT"
NeuronExecutionErrorsAggregatedMetric = containerinsightscommon.NeuronExecutionErrors + "_total"
NeuronDeviceHardwareEccEventsAggregatedMetric = containerinsightscommon.NeuronDeviceHardwareEccEvents + "_total"
)
Expand Down Expand Up @@ -99,6 +99,26 @@ var (
"sram_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric},
}

// MetricAttributesToKeep is the set of datapoint attribute keys that filterLabels
// preserves; any attribute whose key is absent from this map is removed.
// The empty-struct values carry no data, only key membership is consulted.
MetricAttributesToKeep = map[string]struct{}{
ClusterName: {},
ContainerName: {},
FullPodName: {},
InstanceId: {},
InstanceType: {},
K8sPodName: {},
Namespace: {},
NeuronDevice: {},
NodeName: {},
PodName: {},
Service: {},
AvailabilityZone: {},
Kubernetes: {},
Region: {},
RuntimeTag: {},
SubnetId: {},
NeuronCore: {},
}
)

func NewMetricModifier(logger *zap.Logger) *AwsNeuronMetricModifier {
Expand All @@ -122,7 +142,7 @@ func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, m
}
// Neuron metrics sent by the neuron monitor don't have any units so we add them in the agent.
addUnit(originalMetric)
updateCoreDeviceRuntimeLabels(originalMetric)
prefixCoreAndDeviceLabels(originalMetric)
resetStaleDatapoints(originalMetric)

originalMetricName := originalMetric.Name()
Expand All @@ -136,6 +156,7 @@ func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, m
}

modifiedMetricSlice := md.extractDatapointsAsMetricsAndAggregate(originalMetric)
filterLabels(modifiedMetricSlice, originalMetricName)
md.duplicateMetrics(modifiedMetricSlice, originalMetricName, originalMetric.Sum().DataPoints(), metrics)
}

Expand Down Expand Up @@ -230,6 +251,7 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin

// Creating body for the aggregated metric and add it to the new newMetricSlice for each runtime
for aggregatedMetricMetadata, value := range aggregatedValuesPerRuntimeTag {
// Aggregated metric for neuron device ecc events is not required
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), aggregatedMetricMetadata.aggregatedMetricName, originalMetric.Unit())

originalMetricDatapoints.At(0).CopyTo(aggregatedMetric.SetEmptySum().DataPoints().AppendEmpty())
Expand All @@ -247,9 +269,33 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin
return newMetricSlice
}

// filterLabels strips, from the Sum datapoints of every metric in slice, each
// attribute whose key is not listed in MetricAttributesToKeep. Dropping the
// unneeded keys is necessary so that the metrics are grouped together.
// When originalMetricName has no entry in metricModificationsMap the slice is
// left untouched.
func filterLabels(slice pmetric.MetricSlice, originalMetricName string) {
	if _, known := metricModificationsMap[originalMetricName]; !known {
		return
	}

	// Predicate for RemoveIf: report true (remove) for any key outside the allow-list.
	dropUnlisted := func(key string, _ pcommon.Value) bool {
		_, keep := MetricAttributesToKeep[key]
		return !keep
	}

	for metricIdx := 0; metricIdx < slice.Len(); metricIdx++ {
		datapoints := slice.At(metricIdx).Sum().DataPoints()
		for dpIdx := 0; dpIdx < datapoints.Len(); dpIdx++ {
			datapoints.At(dpIdx).Attributes().RemoveIf(dropUnlisted)
		}
	}
}

// This method prefixes NeuronCore and NeuronDevice values with `core` and `device` respectively
// to make the attribute values more verbose
func updateCoreDeviceRuntimeLabels(originalMetric pmetric.Metric) {
func prefixCoreAndDeviceLabels(originalMetric pmetric.Metric) {
dps := originalMetric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
Expand All @@ -258,7 +304,6 @@ func updateCoreDeviceRuntimeLabels(originalMetric pmetric.Metric) {
dp.Attributes().PutStr(attributeKey, attributeValuePrefix+value.Str())
}
}
dp.Attributes().PutStr(RuntimeTag, RuntimeTagOverride)
}
}

Expand Down Expand Up @@ -315,7 +360,7 @@ func resetStaleDatapoints(originalMetric pmetric.Metric) {
dp := dps.At(i)
if dp.ValueType() == pmetric.NumberDataPointValueTypeEmpty || dp.Flags().NoRecordedValue() {
dp.SetDoubleValue(dp.DoubleValue())
dp.Attributes().PutStr(RuntimeTag, RuntimeTagOverride)
dp.Attributes().PutStr(RuntimeTag, "default")
dp.SetFlags(dp.Flags().WithNoRecordedValue(false))
}
}
Expand Down
Loading

0 comments on commit dbf51d8

Please sign in to comment.