From e8000a62157fd232c0a1297f2fb6973b66195298 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 8 Dec 2023 19:28:04 +0100 Subject: [PATCH 01/11] Replace Truncate() with Round() During a testing session on 8.11.2, we noticed some skipped collections on one of the testing agents. Debug information revealed the metricset skipped some collections due to a 1-second difference between the reference time in the current collection, and the reference time in the previous collection, making the collection period is 1 second shorter (299s instead of 300s). Collection skip may happen due to reference time rounding. For example, the timestamp 2023-12-08T10:58:32.999Z may become 2023-12-08T10:58:32.000Z due to the truncation. As of today, this problem is happening on one agent only, but the problem is real, and we should replace the truncate(1s) with a round(1s) to eliminate fluctuations. --- x-pack/metricbeat/module/azure/azure.go | 10 +- x-pack/metricbeat/module/azure/client.go | 104 -------------- .../module/azure/metric_registry.go | 135 ++++++++++++++++++ .../module/azure/metric_registry_test.go | 106 ++++++++++++++ 4 files changed, 248 insertions(+), 107 deletions(-) create mode 100644 x-pack/metricbeat/module/azure/metric_registry.go create mode 100644 x-pack/metricbeat/module/azure/metric_registry_test.go diff --git a/x-pack/metricbeat/module/azure/azure.go b/x-pack/metricbeat/module/azure/azure.go index 7812feed838c..0920c66023e7 100644 --- a/x-pack/metricbeat/module/azure/azure.go +++ b/x-pack/metricbeat/module/azure/azure.go @@ -96,9 +96,13 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // depending on metric time grain (check `MetricRegistry` // for more information). // - // We truncate the reference time to the second to avoid millisecond - // variations in the collection period causing skipped collections. - referenceTime := time.Now().UTC().Truncate(time.Second) + // We round the reference time to the nearest second to avoid + // millisecond variations in the collection period causing + // skipped collections. + // + // See "Round outer limits" and "Round inner limits" tests in + // the metric_registry_test.go for more information. + referenceTime := time.Now().UTC().Round(time.Second) // Initialize cloud resources and monitor metrics // information. diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go index ce9a6cb824fc..3b22a5713cd3 100644 --- a/x-pack/metricbeat/module/azure/client.go +++ b/x-pack/metricbeat/module/azure/client.go @@ -16,110 +16,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -// NewMetricRegistry instantiates a new metric registry. -func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { - return &MetricRegistry{ - logger: logger, - collectionsInfo: make(map[string]MetricCollectionInfo), - } -} - -// MetricRegistry keeps track of the last time a metric was collected and -// the time grain used. -// -// This is used to avoid collecting the same metric values over and over again -// when the time grain is larger than the collection interval. -type MetricRegistry struct { - logger *logp.Logger - collectionsInfo map[string]MetricCollectionInfo -} - -// Update updates the metric registry with the latest timestamp and -// time grain for the given metric. -func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { - m.collectionsInfo[m.buildMetricKey(metric)] = info -} - -// NeedsUpdate returns true if the metric needs to be collected again -// for the given `referenceTime`. -func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { - // Build a key to store the metric in the registry. - // The key is a combination of the namespace, - // resource ID and metric names. - metricKey := m.buildMetricKey(metric) - - // Get the now time in UTC, only to be used for logging. - // It's interesting to see when the registry evaluate each - // metric in relation to the reference time. - now := time.Now().UTC() - - if collection, exists := m.collectionsInfo[metricKey]; exists { - // Turn the time grain into a duration (for example, PT5M -> 5 minutes). - timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain) - - // Calculate the start time of the time grain in relation to - // the reference time. - timeGrainStartTime := referenceTime.Add(-timeGrainDuration) - - // If the last collection time is after the start time of the time grain, - // it means that we already have a value for the given time grain. - // - // In this case, the metricset does not need to collect the metric - // values again. - if collection.timestamp.After(timeGrainStartTime) { - m.logger.Debugw( - "MetricRegistry: Metric does not need an update", - "needs_update", false, - "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - ) - - return false - } - - // The last collection time is before the start time of the time grain, - // it means that the metricset needs to collect the metric values again. - m.logger.Debugw( - "MetricRegistry: Metric needs an update", - "needs_update", true, - "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - ) - - return true - } - - // If the metric is not in the registry, it means that it has never - // been collected before. - // - // In this case, we need to collect the metric. - m.logger.Debugw( - "MetricRegistry: Metric needs an update", - "needs_update", true, - "reference_time", referenceTime, - "now", now, - ) - - return true -} - -// buildMetricKey builds a key for the metric registry. -// -// The key is a combination of the namespace, resource ID and metric names. -func (m *MetricRegistry) buildMetricKey(metric Metric) string { - keyComponents := []string{ - metric.Namespace, - metric.ResourceId, - } - keyComponents = append(keyComponents, metric.Names...) - - return strings.Join(keyComponents, ",") -} - // MetricCollectionInfo contains information about the last time // a metric was collected and the time grain used. type MetricCollectionInfo struct { diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go new file mode 100644 index 000000000000..e105657561dc --- /dev/null +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -0,0 +1,135 @@ +package azure + +import ( + "github.com/elastic/elastic-agent-libs/logp" + "strings" + "time" +) + +// NewMetricRegistry instantiates a new metric registry. +func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { + return &MetricRegistry{ + logger: logger, + collectionsInfo: make(map[string]MetricCollectionInfo), + } +} + +// MetricRegistry keeps track of the last time a metric was collected and +// the time grain used. +// +// This is used to avoid collecting the same metric values over and over again +// when the time grain is larger than the collection interval. +type MetricRegistry struct { + logger *logp.Logger + collectionsInfo map[string]MetricCollectionInfo +} + +// Update updates the metric registry with the latest timestamp and +// time grain for the given metric. +func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { + m.collectionsInfo[m.buildMetricKey(metric)] = info +} + +// NeedsUpdate returns true if the metric needs to be collected again +// for the given `referenceTime`. +func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { + // Build a key to store the metric in the registry. + // The key is a combination of the namespace, + // resource ID and metric names. + metricKey := m.buildMetricKey(metric) + + // Get the now time in UTC, only to be used for logging. + // It's interesting to see when the registry evaluate each + // metric in relation to the reference time. + now := time.Now().UTC() + + if collection, exists := m.collectionsInfo[metricKey]; exists { + // Turn the time grain into a duration (for example, PT5M -> 5 minutes). + timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain) + + // Calculate the start time of the time grain in relation to + // the reference time. + timeGrainStartTime := referenceTime.Add(-timeGrainDuration) + + // The collection period can be jittered by a few seconds. + // We introduce a small jitter to avoid skipping collections + // when the collection period is close (1-2 seconds) to the + // time grain. + //jitter := 3 * time.Second + + // The time elapsed since the last collection, only to be + // used for logging. + elapsed := referenceTime.Sub(collection.timestamp) + + // If the last collection time is after the start time of the time grain, + // it means that we already have a value for the given time grain. + // + // In this case, the metricset does not need to collect the metric + // values again. + if collection.timestamp.After(timeGrainStartTime) { + m.logger.Debugw( + "MetricRegistry: Metric does not need an update", + "needs_update", false, + "reference_time", referenceTime, + "now", now, + "time_grain_start_time", timeGrainStartTime, + "last_collection_at", collection.timestamp, + "time_grain", metric.TimeGrain, + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "names", strings.Join(metric.Names, ","), + "elapsed", elapsed, + ) + + return false + } + + // The last collection time is before the start time of the time grain, + // it means that the metricset needs to collect the metric values again. + m.logger.Debugw( + "MetricRegistry: Metric needs an update", + "needs_update", true, + "reference_time", referenceTime, + "now", now, + "time_grain_start_time", timeGrainStartTime, + "last_collection_at", collection.timestamp, + "time_grain", metric.TimeGrain, + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "names", strings.Join(metric.Names, ","), + "elapsed", elapsed, + ) + + return true + } + + // If the metric is not in the registry, it means that it has never + // been collected before. + // + // In this case, we need to collect the metric. + m.logger.Debugw( + "MetricRegistry: Metric needs an update", + "needs_update", true, + "reference_time", referenceTime, + "now", now, + "time_grain", metric.TimeGrain, + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "names", strings.Join(metric.Names, ","), + ) + + return true +} + +// buildMetricKey builds a key for the metric registry. +// +// The key is a combination of the namespace, resource ID and metric names. +func (m *MetricRegistry) buildMetricKey(metric Metric) string { + keyComponents := []string{ + metric.Namespace, + metric.ResourceId, + } + keyComponents = append(keyComponents, metric.Names...) + + return strings.Join(keyComponents, ",") +} diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go new file mode 100644 index 000000000000..e8d438f5a978 --- /dev/null +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -0,0 +1,106 @@ +package azure + +import ( + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestNewMetricRegistry(t *testing.T) { + logger := logp.NewLogger("test azure monitor") + + t.Run("Collect metrics with a regular 5 minutes period", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z") + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:50.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should need update") + }) + + t.Run("Collect metrics using a period 3 seconds longer than previous", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z") + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:53.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should need update") + }) + + t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.False(t, needsUpdate, "metric should not need update") + }) + + // + // These tests document the limits of the time.Round function used + // to round the reference time. + // + + t.Run("Round outer limits", func(t *testing.T) { + referenceTime1, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:32.500Z") + referenceTime2, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.499Z") + + assert.Equal(t, referenceTime1.Round(time.Second), referenceTime2.Round(time.Second)) + }) + + t.Run("Round inner limits", func(t *testing.T) { + referenceTime1, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:32.999Z") + referenceTime2, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.001Z") + + assert.Equal(t, referenceTime1.Round(time.Second), referenceTime2.Round(time.Second)) + }) +} From 44da10121e2b27651193f79f7c2d72604acc1216 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 8 Dec 2023 19:34:00 +0100 Subject: [PATCH 02/11] Add missing license header --- x-pack/metricbeat/module/azure/metric_registry.go | 4 ++++ x-pack/metricbeat/module/azure/metric_registry_test.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go index e105657561dc..a94fa1fcf295 100644 --- a/x-pack/metricbeat/module/azure/metric_registry.go +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package azure import ( diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go index e8d438f5a978..b17b3d31acd9 100644 --- a/x-pack/metricbeat/module/azure/metric_registry_test.go +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package azure import ( From 142f7fbf6a257b82c5054834f34291cb44f2c8fe Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 8 Dec 2023 20:11:59 +0100 Subject: [PATCH 03/11] Fix linter objections --- x-pack/metricbeat/module/azure/metric_registry.go | 3 ++- x-pack/metricbeat/module/azure/metric_registry_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go index a94fa1fcf295..00a1513a627c 100644 --- a/x-pack/metricbeat/module/azure/metric_registry.go +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -5,9 +5,10 @@ package azure import ( - "github.com/elastic/elastic-agent-libs/logp" "strings" "time" + + "github.com/elastic/elastic-agent-libs/logp" ) // NewMetricRegistry instantiates a new metric registry. diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go index b17b3d31acd9..bf7abd47743b 100644 --- a/x-pack/metricbeat/module/azure/metric_registry_test.go +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -5,10 +5,12 @@ package azure import ( - "github.com/elastic/elastic-agent-libs/logp" - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" ) func TestNewMetricRegistry(t *testing.T) { From a86072aadd1b3a753a2dea74129099b99be86080 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 8 Dec 2023 20:55:15 +0100 Subject: [PATCH 04/11] Tests: look for precise nearest second value Not just equal, I want to check the value is the expected one. --- .../metricbeat/module/azure/metric_registry_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go index bf7abd47743b..657bdd98074f 100644 --- a/x-pack/metricbeat/module/azure/metric_registry_test.go +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -93,20 +93,26 @@ func TestNewMetricRegistry(t *testing.T) { // // These tests document the limits of the time.Round function used - // to round the reference time. + // to round the reference time to the nearest second. // t.Run("Round outer limits", func(t *testing.T) { referenceTime1, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:32.500Z") referenceTime2, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.499Z") - assert.Equal(t, referenceTime1.Round(time.Second), referenceTime2.Round(time.Second)) + expected, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") + + assert.Equal(t, expected, referenceTime1.Round(time.Second)) + assert.Equal(t, expected, referenceTime2.Round(time.Second)) }) t.Run("Round inner limits", func(t *testing.T) { referenceTime1, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:32.999Z") referenceTime2, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.001Z") - assert.Equal(t, referenceTime1.Round(time.Second), referenceTime2.Round(time.Second)) + expected, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") + + assert.Equal(t, expected, referenceTime1.Round(time.Second)) + assert.Equal(t, expected, referenceTime2.Round(time.Second)) }) } From b3402d9e4161aa89ec550d0c3819573910e054ae Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 12 Dec 2023 15:08:25 +0100 Subject: [PATCH 05/11] Switch from round to compare with jitter Instead of truncating or rounding `referenceTime` to a value, I am opting to keep the `referenceTime` value intact and using a jitter when comparing it with the last collected time. Pros: - avoid having the thresholds we have with truncating or rounding, where a 1ms difference can flip the final result to the next or previous second. - using a jitter gives us more flexibility (we can make it configurable) - keeping the `referenceTime` value intact helps with troubleshooting --- x-pack/metricbeat/module/azure/azure.go | 3 +- .../module/azure/metric_registry.go | 48 +++++++++++-------- .../module/azure/metric_registry_test.go | 27 ++++++++++- 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/x-pack/metricbeat/module/azure/azure.go b/x-pack/metricbeat/module/azure/azure.go index 0920c66023e7..dd7f121b2697 100644 --- a/x-pack/metricbeat/module/azure/azure.go +++ b/x-pack/metricbeat/module/azure/azure.go @@ -102,7 +102,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // // See "Round outer limits" and "Round inner limits" tests in // the metric_registry_test.go for more information. - referenceTime := time.Now().UTC().Round(time.Second) + //referenceTime := time.Now().UTC().Round(time.Second) + referenceTime := time.Now().UTC() // Initialize cloud resources and monitor metrics // information. diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go index 00a1513a627c..7f0ef2fdc518 100644 --- a/x-pack/metricbeat/module/azure/metric_registry.go +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -16,6 +16,7 @@ func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { return &MetricRegistry{ logger: logger, collectionsInfo: make(map[string]MetricCollectionInfo), + jitter: 1 * time.Second, } } @@ -27,6 +28,11 @@ func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { type MetricRegistry struct { logger *logp.Logger collectionsInfo map[string]MetricCollectionInfo + // The collection period can be jittered by a second. + // We introduce a small jitter to avoid skipping collections + // when the collection period is close (usually < 1s) to the + // time grain start time. + jitter time.Duration } // Update updates the metric registry with the latest timestamp and @@ -48,42 +54,43 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo // metric in relation to the reference time. now := time.Now().UTC() - if collection, exists := m.collectionsInfo[metricKey]; exists { + if lastCollection, exists := m.collectionsInfo[metricKey]; exists { // Turn the time grain into a duration (for example, PT5M -> 5 minutes). - timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain) + timeGrainDuration := convertTimeGrainToDuration(lastCollection.timeGrain) // Calculate the start time of the time grain in relation to // the reference time. timeGrainStartTime := referenceTime.Add(-timeGrainDuration) - // The collection period can be jittered by a few seconds. - // We introduce a small jitter to avoid skipping collections - // when the collection period is close (1-2 seconds) to the - // time grain. - //jitter := 3 * time.Second - - // The time elapsed since the last collection, only to be - // used for logging. - elapsed := referenceTime.Sub(collection.timestamp) + // Only to be used for logging. + // + // The time elapsed since the last collection, and the time + // distance between last collection and the start of time + // grain. + elapsed := referenceTime.Sub(lastCollection.timestamp) + distance := lastCollection.timestamp.Sub(timeGrainStartTime) // If the last collection time is after the start time of the time grain, // it means that we already have a value for the given time grain. // // In this case, the metricset does not need to collect the metric // values again. - if collection.timestamp.After(timeGrainStartTime) { + // + if lastCollection.timestamp.After(timeGrainStartTime.Add(m.jitter)) { m.logger.Debugw( "MetricRegistry: Metric does not need an update", "needs_update", false, "reference_time", referenceTime, "now", now, "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - "time_grain", metric.TimeGrain, + "last_collection_at", lastCollection.timestamp, + "time_grain", lastCollection.timeGrain, "resource_id", metric.ResourceId, "namespace", metric.Namespace, "names", strings.Join(metric.Names, ","), - "elapsed", elapsed, + "elapsed", elapsed.String(), + "jitter", m.jitter.String(), + "distance", distance.String(), ) return false @@ -97,12 +104,14 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo "reference_time", referenceTime, "now", now, "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - "time_grain", metric.TimeGrain, + "last_collection_at", lastCollection.timestamp, + "time_grain", lastCollection.timeGrain, "resource_id", metric.ResourceId, "namespace", metric.Namespace, "names", strings.Join(metric.Names, ","), - "elapsed", elapsed, + "elapsed", elapsed.String(), + "jitter", m.jitter.String(), + "distance", distance.String(), ) return true @@ -113,7 +122,7 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo // // In this case, we need to collect the metric. m.logger.Debugw( - "MetricRegistry: Metric needs an update", + "MetricRegistry: Metric needs an update (no collection info in the metric registry)", "needs_update", true, "reference_time", referenceTime, "now", now, @@ -121,6 +130,7 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo "resource_id", metric.ResourceId, "namespace", metric.Namespace, "names", strings.Join(metric.Names, ","), + "jitter", m.jitter.String(), ) return true diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go index 657bdd98074f..1c8fc56f1333 100644 --- a/x-pack/metricbeat/module/azure/metric_registry_test.go +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -88,9 +88,34 @@ func TestNewMetricRegistry(t *testing.T) { needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) - assert.False(t, needsUpdate, "metric should not need update") + assert.True(t, needsUpdate, "metric should not need update") }) + //t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) { + // metricRegistry := NewMetricRegistry(logger) + // + // // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + // referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") + // + // // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + // lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z") + // + // metric := Metric{ + // ResourceId: "test", + // Namespace: "test", + // } + // metricCollectionInfo := MetricCollectionInfo{ + // timeGrain: "PT5M", + // timestamp: lastCollectionAt, + // } + // + // metricRegistry.Update(metric, metricCollectionInfo) + // + // needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + // + // assert.False(t, needsUpdate, "metric should not need update") + //}) + // // These tests document the limits of the time.Round function used // to round the reference time to the nearest second. From b6e400734082360342fbb1f1f58f69c6d1307564 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 14 Dec 2023 08:08:42 +0100 Subject: [PATCH 06/11] Compare elapsed with timegrain duration --- x-pack/metricbeat/module/azure/client.go | 1 + .../module/azure/metric_registry.go | 70 +++++++++++-------- 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go index 3b22a5713cd3..a708866a9fd9 100644 --- a/x-pack/metricbeat/module/azure/client.go +++ b/x-pack/metricbeat/module/azure/client.go @@ -192,6 +192,7 @@ func (client *Client) GetMetricValues(referenceTime time.Time, metrics []Metric, client.MetricRegistry.Update(metric, MetricCollectionInfo{ timeGrain: timeGrain, timestamp: referenceTime, + //timestamp: time.Now().UTC(), }) for i, currentMetric := range client.ResourceConfigurations.Metrics { diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go index 7f0ef2fdc518..ae9ca9fa7e04 100644 --- a/x-pack/metricbeat/module/azure/metric_registry.go +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -49,26 +49,26 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo // resource ID and metric names. metricKey := m.buildMetricKey(metric) - // Get the now time in UTC, only to be used for logging. - // It's interesting to see when the registry evaluate each - // metric in relation to the reference time. - now := time.Now().UTC() + //// Get the now time in UTC, only to be used for logging. + //// It's interesting to see when the registry evaluate each + //// metric in relation to the reference time. + //now := time.Now().UTC() if lastCollection, exists := m.collectionsInfo[metricKey]; exists { // Turn the time grain into a duration (for example, PT5M -> 5 minutes). timeGrainDuration := convertTimeGrainToDuration(lastCollection.timeGrain) - // Calculate the start time of the time grain in relation to - // the reference time. - timeGrainStartTime := referenceTime.Add(-timeGrainDuration) + //// Calculate the start time of the time grain in relation to + //// the reference time. + //timeGrainStartTime := referenceTime.Add(-timeGrainDuration) - // Only to be used for logging. - // - // The time elapsed since the last collection, and the time - // distance between last collection and the start of time - // grain. - elapsed := referenceTime.Sub(lastCollection.timestamp) - distance := lastCollection.timestamp.Sub(timeGrainStartTime) + //// Only to be used for logging. + //// + //// The time elapsed since the last collection, and the time + //// distance between last collection and the start of time + //// grain. + //elapsed := referenceTime.Sub(lastCollection.timestamp) + //distance := lastCollection.timestamp.Sub(timeGrainStartTime) // If the last collection time is after the start time of the time grain, // it means that we already have a value for the given time grain. @@ -76,21 +76,29 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo // In this case, the metricset does not need to collect the metric // values again. // - if lastCollection.timestamp.After(timeGrainStartTime.Add(m.jitter)) { + // if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { + //if lastCollection.timestamp.After(timeGrainStartTime.Add(m.jitter)) { + lastCollectionSeconds := time.Since(lastCollection.timestamp).Seconds() + timeGrainSeconds := timeGrainDuration.Seconds() + + if time.Since(lastCollection.timestamp).Seconds() < timeGrainDuration.Seconds() { m.logger.Debugw( "MetricRegistry: Metric does not need an update", "needs_update", false, "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", lastCollection.timestamp, + //"now", now, + //"time_grain_start_time", timeGrainStartTime, + "last_collection_time", lastCollection.timestamp, "time_grain", lastCollection.timeGrain, "resource_id", metric.ResourceId, "namespace", metric.Namespace, + "aggregation", metric.Aggregations, "names", strings.Join(metric.Names, ","), - "elapsed", elapsed.String(), - "jitter", m.jitter.String(), - "distance", distance.String(), + //"elapsed", elapsed.String(), + //"jitter", m.jitter.String(), + //"distance", distance.String(), + "last_collection_seconds", lastCollectionSeconds, + "time_grain_seconds", timeGrainSeconds, ) return false @@ -102,16 +110,19 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo "MetricRegistry: Metric needs an update", "needs_update", true, "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", lastCollection.timestamp, + //"now", now, + //"time_grain_start_time", timeGrainStartTime, + "last_collection_time", lastCollection.timestamp, "time_grain", lastCollection.timeGrain, "resource_id", metric.ResourceId, "namespace", metric.Namespace, + "aggregation", metric.Aggregations, "names", strings.Join(metric.Names, ","), - "elapsed", elapsed.String(), - "jitter", m.jitter.String(), - "distance", distance.String(), + //"elapsed", elapsed.String(), + //"jitter", m.jitter.String(), + //"distance", distance.String(), + "last_collection_seconds", lastCollectionSeconds, + "time_grain_seconds", timeGrainSeconds, ) return true @@ -125,12 +136,13 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo "MetricRegistry: Metric needs an update (no collection info in the metric registry)", "needs_update", true, "reference_time", referenceTime, - "now", now, + //"now", now, "time_grain", metric.TimeGrain, "resource_id", metric.ResourceId, "namespace", metric.Namespace, + "aggregation", metric.Aggregations, "names", strings.Join(metric.Names, ","), - "jitter", m.jitter.String(), + //"jitter", m.jitter.String(), ) return true From a9f7104d7a66df52e076a42d54daa216cde051e5 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 28 Dec 2023 20:10:51 +0100 Subject: [PATCH 07/11] Cleanup --- .../metricbeat/module/azure/client_utils.go | 4 +- .../module/azure/metric_registry.go | 59 ++++--------------- 2 files changed, 13 insertions(+), 50 deletions(-) diff --git a/x-pack/metricbeat/module/azure/client_utils.go b/x-pack/metricbeat/module/azure/client_utils.go index 986125ba6b68..114ccd95baf2 100644 --- a/x-pack/metricbeat/module/azure/client_utils.go +++ b/x-pack/metricbeat/module/azure/client_utils.go @@ -135,14 +135,14 @@ func compareMetricValues(metVal *float64, metricVal *float64) bool { return false } -// convertTimeGrainToDuration converts the Azure time grain options to the equivalent +// asDuration converts the Azure time grain options to the equivalent // `time.Duration` value. // // For example, converts "PT1M" to `time.Minute`. // // See https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported#time-grain // for more information. -func convertTimeGrainToDuration(timeGrain string) time.Duration { +func asDuration(timeGrain string) time.Duration { var duration time.Duration switch timeGrain { case "PT1M": diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go index ae9ca9fa7e04..cdaa9496b5d6 100644 --- a/x-pack/metricbeat/module/azure/metric_registry.go +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -49,56 +49,27 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo // resource ID and metric names. metricKey := m.buildMetricKey(metric) - //// Get the now time in UTC, only to be used for logging. - //// It's interesting to see when the registry evaluate each - //// metric in relation to the reference time. - //now := time.Now().UTC() - if lastCollection, exists := m.collectionsInfo[metricKey]; exists { // Turn the time grain into a duration (for example, PT5M -> 5 minutes). - timeGrainDuration := convertTimeGrainToDuration(lastCollection.timeGrain) - - //// Calculate the start time of the time grain in relation to - //// the reference time. - //timeGrainStartTime := referenceTime.Add(-timeGrainDuration) - - //// Only to be used for logging. - //// - //// The time elapsed since the last collection, and the time - //// distance between last collection and the start of time - //// grain. - //elapsed := referenceTime.Sub(lastCollection.timestamp) - //distance := lastCollection.timestamp.Sub(timeGrainStartTime) - - // If the last collection time is after the start time of the time grain, - // it means that we already have a value for the given time grain. - // - // In this case, the metricset does not need to collect the metric - // values again. - // - // if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { - //if lastCollection.timestamp.After(timeGrainStartTime.Add(m.jitter)) { - lastCollectionSeconds := time.Since(lastCollection.timestamp).Seconds() - timeGrainSeconds := timeGrainDuration.Seconds() - - if time.Since(lastCollection.timestamp).Seconds() < timeGrainDuration.Seconds() { + timeGrainDuration := asDuration(lastCollection.timeGrain) + + // Adjust the last collection time by adding a small jitter to avoid + // skipping collections when the collection period is close (usually < 1s). + timeSinceLastCollection := time.Since(lastCollection.timestamp) + m.jitter + + if timeSinceLastCollection < timeGrainDuration { m.logger.Debugw( "MetricRegistry: Metric does not need an update", "needs_update", false, "reference_time", referenceTime, - //"now", now, - //"time_grain_start_time", timeGrainStartTime, "last_collection_time", lastCollection.timestamp, + "time_since_last_collection_seconds", timeSinceLastCollection.Seconds(), "time_grain", lastCollection.timeGrain, + "time_grain_duration_seconds", timeGrainDuration.Seconds(), "resource_id", metric.ResourceId, "namespace", metric.Namespace, "aggregation", metric.Aggregations, "names", strings.Join(metric.Names, ","), - //"elapsed", elapsed.String(), - //"jitter", m.jitter.String(), - //"distance", distance.String(), - "last_collection_seconds", lastCollectionSeconds, - "time_grain_seconds", timeGrainSeconds, ) return false @@ -110,19 +81,14 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo "MetricRegistry: Metric needs an update", "needs_update", true, "reference_time", referenceTime, - //"now", now, - //"time_grain_start_time", timeGrainStartTime, "last_collection_time", lastCollection.timestamp, + "time_since_last_collection_seconds", timeSinceLastCollection.Seconds(), "time_grain", lastCollection.timeGrain, + "time_grain_duration_seconds", timeGrainDuration.Seconds(), "resource_id", metric.ResourceId, "namespace", metric.Namespace, "aggregation", metric.Aggregations, "names", strings.Join(metric.Names, ","), - //"elapsed", elapsed.String(), - //"jitter", m.jitter.String(), - //"distance", distance.String(), - "last_collection_seconds", lastCollectionSeconds, - "time_grain_seconds", timeGrainSeconds, ) return true @@ -136,13 +102,10 @@ func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) boo "MetricRegistry: Metric needs an update (no collection info in the metric registry)", "needs_update", true, "reference_time", referenceTime, - //"now", now, - "time_grain", metric.TimeGrain, "resource_id", metric.ResourceId, "namespace", metric.Namespace, "aggregation", metric.Aggregations, "names", strings.Join(metric.Names, ","), - //"jitter", m.jitter.String(), ) return true From 1ad769307997df82f822610b763fcfe994ed6abf Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 4 Jan 2024 18:19:20 +0100 Subject: [PATCH 08/11] Cleanup Remove outdated tests --- .../module/azure/metric_registry_test.go | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go index 1c8fc56f1333..a0ecdc84b85d 100644 --- a/x-pack/metricbeat/module/azure/metric_registry_test.go +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -90,54 +90,4 @@ func TestNewMetricRegistry(t *testing.T) { assert.True(t, needsUpdate, "metric should not need update") }) - - //t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) { - // metricRegistry := NewMetricRegistry(logger) - // - // // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time - // referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") - // - // // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time - // lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z") - // - // metric := Metric{ - // ResourceId: "test", - // Namespace: "test", - // } - // metricCollectionInfo := MetricCollectionInfo{ - // timeGrain: "PT5M", - // timestamp: lastCollectionAt, - // } - // - // metricRegistry.Update(metric, metricCollectionInfo) - // - // needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) - // - // assert.False(t, needsUpdate, "metric should not need update") - //}) - - // - // These tests document the limits of the time.Round function used - // to round the reference time to the nearest second. - // - - t.Run("Round outer limits", func(t *testing.T) { - referenceTime1, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:32.500Z") - referenceTime2, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.499Z") - - expected, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") - - assert.Equal(t, expected, referenceTime1.Round(time.Second)) - assert.Equal(t, expected, referenceTime2.Round(time.Second)) - }) - - t.Run("Round inner limits", func(t *testing.T) { - referenceTime1, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:32.999Z") - referenceTime2, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.001Z") - - expected, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") - - assert.Equal(t, expected, referenceTime1.Round(time.Second)) - assert.Equal(t, expected, referenceTime2.Round(time.Second)) - }) } From 6b4f3ab5bd7a09f685c3d5cb2b9a79d0634194f1 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 4 Jan 2024 18:23:02 +0100 Subject: [PATCH 09/11] Clean up Drop another commented line of code. --- x-pack/metricbeat/module/azure/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go index a708866a9fd9..3b22a5713cd3 100644 --- a/x-pack/metricbeat/module/azure/client.go +++ b/x-pack/metricbeat/module/azure/client.go @@ -192,7 +192,6 @@ func (client *Client) GetMetricValues(referenceTime time.Time, metrics []Metric, client.MetricRegistry.Update(metric, MetricCollectionInfo{ timeGrain: timeGrain, timestamp: referenceTime, - //timestamp: time.Now().UTC(), }) for i, currentMetric := range client.ResourceConfigurations.Metrics { From 6277f0d316c7b06f77d03f914b4e67db9254e700 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 5 Jan 2024 16:10:36 +0100 Subject: [PATCH 10/11] Fix typo --- x-pack/metricbeat/module/azure/resources.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/azure/resources.go b/x-pack/metricbeat/module/azure/resources.go index 0a723c82bd5a..6a633663cb1c 100644 --- a/x-pack/metricbeat/module/azure/resources.go +++ b/x-pack/metricbeat/module/azure/resources.go @@ -38,7 +38,7 @@ type Metric struct { Values []MetricValue TimeGrain string ResourceId string - // ResourceSubId is used for the metric values api as namespaces can apply to sub resrouces ex. storage account: container, blob, vm scaleset: vms + // ResourceSubId is used for the metric values api as namespaces can apply to sub resources ex. storage account: container, blob, vm scaleset: vms ResourceSubId string } From cbf7167925c6c3b7d0695317cb99d19c2d586cea Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 5 Jan 2024 16:10:47 +0100 Subject: [PATCH 11/11] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 76540fbc48c3..6875c33bb879 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix the "api-version query parameter (?api-version=) is required for all requests" error in Azure Billing. {pull}37158[37158] - Add memory hard limit from container metadata and remove usage percentage in AWS Fargate. {pull}37194[37194] - Ignore parser errors from unsupported metrics types on Prometheus client and continue parsing until EOF is reached {pull}37383[37383] +- Fix the reference time rounding on Azure Metrics {issue}37204[37204] {pull}37365[37365] *Osquerybeat*