Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kubeletstatsreceiver] Emit network metrics without a direction attribute #12588

Merged
merged 12 commits into from
Aug 12, 2022
45 changes: 45 additions & 0 deletions receiver/kubeletstatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,50 @@ The following parameters can also be specified:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).

## Metrics

Details about the metrics produced by this receiver can be found in [metadata.yaml](./metadata.yaml) with further documentation in [documentation.md](./documentation.md)

### Feature gate configurations

#### Transition from metrics with "direction" attribute

Some kubeletstats metrics reported are transitioning from being reported with a `direction` attribute to being reported with the
direction included in the metric name to adhere to the OpenTelemetry specification
(https://github.com/open-telemetry/opentelemetry-specification/pull/2617):

- `k8s.node.network.io` will become:
- `k8s.node.network.io.transmit`
- `k8s.node.network.io.receive`
- `k8s.node.network.errors` will become:
- `k8s.node.network.errors.transmit`
- `k8s.node.network.errors.receive`

The following feature gates control the transition process:

- **receiver.kubeletstatsreceiver.emitMetricsWithoutDirectionAttribute**: controls if the new metrics without `direction` attribute are emitted by the receiver.
- **receiver.kubeletstatsreceiver.emitMetricsWithDirectionAttribute**: controls if the deprecated metrics with `direction` attribute are emitted by the receiver.

##### Transition schedule:

1. v0.58.0, Aug 2022:

- The new metrics are available for all scrapers, but disabled by default, they can be enabled with the feature gates.
- The old metrics with `direction` attribute are deprecated with a warning.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning log still needs to be added

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added warnings. In doing so, I noticed that some were removed in this PR: #12105. They were supposed to be added back in before the v0.56.0 release, but it doesn't appear that happened. I can make an issue and add them back if that's something we'd like to do. I used the phrasing from the original warnings. If we'd like to make any changes, suggest them here and I can apply them when adding the warnings elsewhere.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @mwear, sorry for the delayed response. As mentioned #11815, we would like to avoid showing the warnings for now. Can you please remove them.

Sorry for raising the contradicting comments.

- `receiver.kubeletstatsreceiver.emitMetricsWithDirectionAttribute` is enabled by default.
- `receiver.kubeletstatsreceiver.emitMetricsWithoutDirectionAttribute` is disabled by default.

2. v0.59.0, August 2022:

- The new metrics are enabled by default, deprecated metrics disabled, they can be enabled with the feature gates.
- `receiver.kubeletstatsreceiver.emitMetricsWithDirectionAttribute` is disabled by default.
- `receiver.kubeletstatsreceiver.emitMetricsWithoutDirectionAttribute` is enabled by default.

3. v0.60.0, September 2022:

- The feature gates are removed.
- The new metrics without `direction` attribute are always emitted.
- The deprecated metrics with `direction` attribute are no longer available.

dmitryax marked this conversation as resolved.
Show resolved Hide resolved
[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
8 changes: 8 additions & 0 deletions receiver/kubeletstatsreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ These are the metrics available for this scraper.
| **k8s.node.memory.usage** | Node memory usage | By | Gauge(Int) | <ul> </ul> |
| **k8s.node.memory.working_set** | Node memory working_set | By | Gauge(Int) | <ul> </ul> |
| **k8s.node.network.errors** | Node network errors | 1 | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.node.network.errors.receive** | Node network receive errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.node.network.errors.transmit** | Node network transmission errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.node.network.io** | Node network IO | By | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.node.network.io.receive** | Node network IO received | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.node.network.io.transmit** | Node network IO transmitted | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.cpu.time** | Pod CPU time | s | Sum(Double) | <ul> </ul> |
| **k8s.pod.cpu.utilization** | Pod CPU utilization | 1 | Gauge(Double) | <ul> </ul> |
| **k8s.pod.filesystem.available** | Pod filesystem available | By | Gauge(Int) | <ul> </ul> |
Expand All @@ -44,7 +48,11 @@ These are the metrics available for this scraper.
| **k8s.pod.memory.usage** | Pod memory usage | By | Gauge(Int) | <ul> </ul> |
| **k8s.pod.memory.working_set** | Pod memory working_set | By | Gauge(Int) | <ul> </ul> |
| **k8s.pod.network.errors** | Pod network errors | 1 | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.pod.network.errors.receive** | Pod network receive errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.network.errors.transmit** | Pod network transmission errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.network.io** | Pod network IO | By | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.pod.network.io.receive** | Pod network IO received | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.network.io.transmit** | Pod network IO transmitted | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.volume.available** | The number of available bytes in the volume. | By | Gauge(Int) | <ul> </ul> |
| **k8s.volume.capacity** | The total capacity in bytes of the volume. | By | Gauge(Int) | <ul> </ul> |
| **k8s.volume.inodes** | The total inodes in the filesystem. | 1 | Gauge(Int) | <ul> </ul> |
Expand Down
28 changes: 20 additions & 8 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ var ValidMetricGroups = map[MetricGroup]bool{
}

type metricDataAccumulator struct {
m []pmetric.Metrics
metadata Metadata
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
time time.Time
mbs *metadata.MetricsBuilders
m []pmetric.Metrics
metadata Metadata
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
time time.Time
mbs *metadata.MetricsBuilders
emitMetricsWithDirectionAttribute bool
emitMetricsWithoutDirectionAttribute bool
}

func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
Expand All @@ -61,7 +63,12 @@ func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
addCPUMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeFilesystemMetrics, s.Fs, currentTime)
addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime)
if a.emitMetricsWithDirectionAttribute {
addNetworkMetricsWithDirection(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetricsWithDirection, s.Network, currentTime)
}
if a.emitMetricsWithoutDirectionAttribute {
addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime)
}
// todo s.Runtime.ImageFs

a.m = append(a.m, a.mbs.NodeMetricsBuilder.Emit(
Expand All @@ -79,7 +86,12 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) {
addCPUMetrics(a.mbs.PodMetricsBuilder, metadata.PodCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.PodMetricsBuilder, metadata.PodMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.PodMetricsBuilder, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)
if a.emitMetricsWithDirectionAttribute {
addNetworkMetricsWithDirection(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetricsWithDirection, s.Network, currentTime)
}
if a.emitMetricsWithoutDirectionAttribute {
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)
}

a.m = append(a.m, a.mbs.PodMetricsBuilder.Emit(
metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
Expand Down
16 changes: 10 additions & 6 deletions receiver/kubeletstatsreceiver/internal/kubelet/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ func MetricsData(
logger *zap.Logger, summary *stats.Summary,
metadata Metadata,
metricGroupsToCollect map[MetricGroup]bool,
mbs *metadata.MetricsBuilders) []pmetric.Metrics {
mbs *metadata.MetricsBuilders,
emitMetricsWithDirectionAttribute,
emitMetricsWithoutDirectionAttribute bool) []pmetric.Metrics {
acc := &metricDataAccumulator{
metadata: metadata,
logger: logger,
metricGroupsToCollect: metricGroupsToCollect,
time: time.Now(),
mbs: mbs,
metadata: metadata,
logger: logger,
metricGroupsToCollect: metricGroupsToCollect,
time: time.Now(),
mbs: mbs,
emitMetricsWithDirectionAttribute: emitMetricsWithDirectionAttribute,
emitMetricsWithoutDirectionAttribute: emitMetricsWithoutDirectionAttribute,
}
acc.nodeStats(summary.Node)
for _, podStats := range summary.Pods {
Expand Down
60 changes: 51 additions & 9 deletions receiver/kubeletstatsreceiver/internal/kubelet/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func TestMetricAccumulator(t *testing.T) {
ContainerMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
OtherMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
}
requireMetricsOk(t, MetricsData(zap.NewNop(), summary, k8sMetadata, ValidMetricGroups, mbs))
requireMetricsOk(t, MetricsData(zap.NewNop(), summary, k8sMetadata, ValidMetricGroups, mbs, true, false))
// Disable all groups
mbs.NodeMetricsBuilder.Reset()
mbs.PodMetricsBuilder.Reset()
mbs.OtherMetricsBuilder.Reset()
require.Equal(t, 0, len(MetricsData(zap.NewNop(), summary, k8sMetadata, map[MetricGroup]bool{}, mbs)))
require.Equal(t, 0, len(MetricsData(zap.NewNop(), summary, k8sMetadata, map[MetricGroup]bool{}, mbs, true, false)))
}

func requireMetricsOk(t *testing.T, mds []pmetric.Metrics) {
Expand Down Expand Up @@ -112,7 +112,7 @@ func requireResourceOk(t *testing.T, resource pcommon.Resource) {
}

func TestWorkingSetMem(t *testing.T) {
metrics := indexedFakeMetrics()
metrics := indexedFakeMetrics(true, false)
requireContains(t, metrics, "k8s.pod.memory.working_set")
requireContains(t, metrics, "container.memory.working_set")

Expand All @@ -122,7 +122,7 @@ func TestWorkingSetMem(t *testing.T) {
}

func TestPageFaults(t *testing.T) {
metrics := indexedFakeMetrics()
metrics := indexedFakeMetrics(true, false)
requireContains(t, metrics, "k8s.pod.memory.page_faults")
requireContains(t, metrics, "container.memory.page_faults")

Expand All @@ -132,7 +132,7 @@ func TestPageFaults(t *testing.T) {
}

func TestMajorPageFaults(t *testing.T) {
metrics := indexedFakeMetrics()
metrics := indexedFakeMetrics(true, false)
requireContains(t, metrics, "k8s.pod.memory.major_page_faults")
requireContains(t, metrics, "container.memory.major_page_faults")

Expand All @@ -141,13 +141,55 @@ func TestMajorPageFaults(t *testing.T) {
require.Equal(t, int64(12), value)
}

func TestEmitMetricsWithDirectionAttribute(t *testing.T) {
metrics := indexedFakeMetrics(true, false)
metricNamesWithDirectionAttr := []string{
"k8s.node.network.io",
"k8s.node.network.errors",
"k8s.pod.network.io",
"k8s.pod.network.errors",
}
for _, name := range metricNamesWithDirectionAttr {
requireContains(t, metrics, name)
metric := metrics[name][0]
for i := 0; i < metric.Sum().DataPoints().Len(); i++ {
dp := metric.Sum().DataPoints().At(i)
_, found := dp.Attributes().Get("direction")
require.True(t, found, "expected direction attribute")
}
}
}

func TestEmitMetricsWithoutDirectionAttribute(t *testing.T) {
metrics := indexedFakeMetrics(false, true)
metricNamesWithoutDirectionAttr := []string{
"k8s.node.network.io.receive",
"k8s.node.network.io.transmit",
"k8s.node.network.errors.receive",
"k8s.node.network.errors.transmit",
"k8s.pod.network.io.receive",
"k8s.pod.network.io.transmit",
"k8s.pod.network.errors.receive",
"k8s.pod.network.errors.transmit",
}
for _, name := range metricNamesWithoutDirectionAttr {
requireContains(t, metrics, name)
metric := metrics[name][0]
for i := 0; i < metric.Sum().DataPoints().Len(); i++ {
dp := metric.Sum().DataPoints().At(i)
_, found := dp.Attributes().Get("direction")
require.False(t, found, "unexpected direction attribute")
}
}
}

func requireContains(t *testing.T, metrics map[string][]pmetric.Metric, metricName string) {
_, found := metrics[metricName]
require.True(t, found)
}

func indexedFakeMetrics() map[string][]pmetric.Metric {
mds := fakeMetrics()
func indexedFakeMetrics(emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute bool) map[string][]pmetric.Metric {
mds := fakeMetrics(emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute)
metrics := make(map[string][]pmetric.Metric)
for _, md := range mds {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
Expand All @@ -167,7 +209,7 @@ func indexedFakeMetrics() map[string][]pmetric.Metric {
return metrics
}

func fakeMetrics() []pmetric.Metrics {
func fakeMetrics(emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute bool) []pmetric.Metrics {
rc := &fakeRestClient{}
statsProvider := NewStatsProvider(rc)
summary, _ := statsProvider.StatsSummary()
Expand All @@ -182,5 +224,5 @@ func fakeMetrics() []pmetric.Metrics {
ContainerMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
OtherMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
}
return MetricsData(zap.NewNop(), summary, Metadata{}, mgs, mbs)
return MetricsData(zap.NewNop(), summary, Metadata{}, mgs, mbs, emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute)
}
20 changes: 19 additions & 1 deletion receiver/kubeletstatsreceiver/internal/kubelet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,25 @@ func addNetworkMetrics(mb *metadata.MetricsBuilder, networkMetrics metadata.Netw
recordNetworkDataPoint(mb, networkMetrics.Errors, s, currentTime)
}

func recordNetworkDataPoint(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordIntDataPointWithDirectionFunc, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
func recordNetworkDataPoint(mb *metadata.MetricsBuilder, r metadata.NetworkMetricsRecorder, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
if s.RxBytes == nil && s.TxBytes == nil {
return
}

r.RecordReceiveDataPoint(mb, currentTime, int64(*s.RxBytes), s.Name)
r.RecordTransmitDataPoint(mb, currentTime, int64(*s.TxBytes), s.Name)
}

func addNetworkMetricsWithDirection(mb *metadata.MetricsBuilder, networkMetrics metadata.NetworkMetricsWithDirection, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
if s == nil {
return
}

recordNetworkDataPointWithDirection(mb, networkMetrics.IO, s, currentTime)
recordNetworkDataPointWithDirection(mb, networkMetrics.Errors, s, currentTime)
}

func recordNetworkDataPointWithDirection(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordIntDataPointWithDirectionFunc, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
if s.RxBytes == nil && s.TxBytes == nil {
return
}
Expand Down
Loading