Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the reference time rounding on Azure Metrics #37365

Merged
merged 11 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix the "api-version query parameter (?api-version=) is required for all requests" error in Azure Billing. {pull}37158[37158]
- Add memory hard limit from container metadata and remove usage percentage in AWS Fargate. {pull}37194[37194]
- Ignore parser errors from unsupported metrics types on Prometheus client and continue parsing until EOF is reached {pull}37383[37383]
- Fix the reference time rounding on Azure Metrics {issue}37204[37204] {pull}37365[37365]

*Osquerybeat*

Expand Down
11 changes: 8 additions & 3 deletions x-pack/metricbeat/module/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// depending on metric time grain (check `MetricRegistry`
// for more information).
//
// We truncate the reference time to the second to avoid millisecond
// variations in the collection period causing skipped collections.
referenceTime := time.Now().UTC().Truncate(time.Second)
// We round the reference time to the nearest second to avoid
// millisecond variations in the collection period causing
// skipped collections.
//
// See "Round outer limits" and "Round inner limits" tests in
// the metric_registry_test.go for more information.
//referenceTime := time.Now().UTC().Round(time.Second)
referenceTime := time.Now().UTC()

// Initialize cloud resources and monitor metrics
// information.
Expand Down
104 changes: 0 additions & 104 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,110 +16,6 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
}
}

// MetricRegistry keeps track of the last time a metric was collected and
// the time grain used.
//
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
logger *logp.Logger
collectionsInfo map[string]MetricCollectionInfo
}

// Update updates the metric registry with the latest timestamp and
// time grain for the given metric.
func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

// Get the now time in UTC, only to be used for logging.
// It's interesting to see when the registry evaluate each
// metric in relation to the reference time.
now := time.Now().UTC()

if collection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain)

// Calculate the start time of the time grain in relation to
// the reference time.
timeGrainStartTime := referenceTime.Add(-timeGrainDuration)

// If the last collection time is after the start time of the time grain,
// it means that we already have a value for the given time grain.
//
// In this case, the metricset does not need to collect the metric
// values again.
if collection.timestamp.After(timeGrainStartTime) {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return false
}

// The last collection time is before the start time of the time grain,
// it means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
)

return true
}

// buildMetricKey builds a key for the metric registry.
//
// The key is a combination of the namespace, resource ID and metric names.
func (m *MetricRegistry) buildMetricKey(metric Metric) string {
keyComponents := []string{
metric.Namespace,
metric.ResourceId,
}
keyComponents = append(keyComponents, metric.Names...)

return strings.Join(keyComponents, ",")
}

// MetricCollectionInfo contains information about the last time
// a metric was collected and the time grain used.
type MetricCollectionInfo struct {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/azure/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func compareMetricValues(metVal *float64, metricVal *float64) bool {
return false
}

// convertTimeGrainToDuration converts the Azure time grain options to the equivalent
// asDuration converts the Azure time grain options to the equivalent
// `time.Duration` value.
//
// For example, converts "PT1M" to `time.Minute`.
//
// See https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported#time-grain
// for more information.
func convertTimeGrainToDuration(timeGrain string) time.Duration {
func asDuration(timeGrain string) time.Duration {
var duration time.Duration
switch timeGrain {
case "PT1M":
Expand Down
125 changes: 125 additions & 0 deletions x-pack/metricbeat/module/azure/metric_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a clean copy-paste?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied and pasted the license header from another file; let me check if I picked a bad one 👀


import (
"strings"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
jitter: 1 * time.Second,
}
}

// MetricRegistry keeps track of the last time a metric was collected and
// the time grain used.
//
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
logger *logp.Logger
collectionsInfo map[string]MetricCollectionInfo
// The collection period can be jittered by a second.
// We introduce a small jitter to avoid skipping collections
// when the collection period is close (usually < 1s) to the
// time grain start time.
jitter time.Duration
}

// Update updates the metric registry with the latest timestamp and
// time grain for the given metric.
func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

if lastCollection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := asDuration(lastCollection.timeGrain)

// Adjust the last collection time by adding a small jitter to avoid
// skipping collections when the collection period is close (usually < 1s).
timeSinceLastCollection := time.Since(lastCollection.timestamp) + m.jitter

if timeSinceLastCollection < timeGrainDuration {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"last_collection_time", lastCollection.timestamp,
"time_since_last_collection_seconds", timeSinceLastCollection.Seconds(),
"time_grain", lastCollection.timeGrain,
"time_grain_duration_seconds", timeGrainDuration.Seconds(),
"resource_id", metric.ResourceId,
"namespace", metric.Namespace,
"aggregation", metric.Aggregations,
"names", strings.Join(metric.Names, ","),
)

return false
}

// The last collection time is before the start time of the time grain,
// it means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"last_collection_time", lastCollection.timestamp,
"time_since_last_collection_seconds", timeSinceLastCollection.Seconds(),
"time_grain", lastCollection.timeGrain,
"time_grain_duration_seconds", timeGrainDuration.Seconds(),
"resource_id", metric.ResourceId,
"namespace", metric.Namespace,
"aggregation", metric.Aggregations,
"names", strings.Join(metric.Names, ","),
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update (no collection info in the metric registry)",
"needs_update", true,
"reference_time", referenceTime,
"resource_id", metric.ResourceId,
"namespace", metric.Namespace,
"aggregation", metric.Aggregations,
"names", strings.Join(metric.Names, ","),
)

return true
}

// buildMetricKey builds a key for the metric registry.
//
// The key is a combination of the namespace, resource ID and metric names.
func (m *MetricRegistry) buildMetricKey(metric Metric) string {
keyComponents := []string{
metric.Namespace,
metric.ResourceId,
}
keyComponents = append(keyComponents, metric.Names...)

return strings.Join(keyComponents, ",")
}
93 changes: 93 additions & 0 deletions x-pack/metricbeat/module/azure/metric_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azure

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestNewMetricRegistry(t *testing.T) {
logger := logp.NewLogger("test azure monitor")

t.Run("Collect metrics with a regular 5 minutes period", func(t *testing.T) {
metricRegistry := NewMetricRegistry(logger)

// Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time
lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z")

// Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time
referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:50.000Z")

metric := Metric{
ResourceId: "test",
Namespace: "test",
}
metricCollectionInfo := MetricCollectionInfo{
timeGrain: "PT5M",
timestamp: lastCollectionAt,
}

metricRegistry.Update(metric, metricCollectionInfo)

needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric)

assert.True(t, needsUpdate, "metric should need update")
})

t.Run("Collect metrics using a period 3 seconds longer than previous", func(t *testing.T) {
metricRegistry := NewMetricRegistry(logger)

// Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time
lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z")

// Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time
referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:53.000Z")

metric := Metric{
ResourceId: "test",
Namespace: "test",
}
metricCollectionInfo := MetricCollectionInfo{
timeGrain: "PT5M",
timestamp: lastCollectionAt,
}

metricRegistry.Update(metric, metricCollectionInfo)

needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric)

assert.True(t, needsUpdate, "metric should need update")
})

t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) {
metricRegistry := NewMetricRegistry(logger)

// Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time
referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z")

// Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time
lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z")

metric := Metric{
ResourceId: "test",
Namespace: "test",
}
metricCollectionInfo := MetricCollectionInfo{
timeGrain: "PT5M",
timestamp: lastCollectionAt,
}

metricRegistry.Update(metric, metricCollectionInfo)

needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric)

assert.True(t, needsUpdate, "metric should not need update")
})
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/azure/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Metric struct {
Values []MetricValue
TimeGrain string
ResourceId string
// ResourceSubId is used for the metric values api as namespaces can apply to sub resrouces ex. storage account: container, blob, vm scaleset: vms
// ResourceSubId is used for the metric values api as namespaces can apply to sub resources ex. storage account: container, blob, vm scaleset: vms
ResourceSubId string
}

Expand Down
Loading