Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.11](backport #37203) Fix unintended skip in metric collection on Azure Monitor #37222

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add remaining dimensions for azure storage account to make them available for tsdb enablement. {pull}36331[36331]
- Fix memory leak on Windows {issue}37142[37142] {pull}37171[37171]
- Enhanced Azure Metrics metricset with refined grouping logic and resolved duplication issues for TSDB compatibility {pull}36823[36823]
- Fix unintended skip in metric collection on Azure Monitor {issue}37204[37204] {pull}37203[37203]

*Osquerybeat*

Expand Down
15 changes: 14 additions & 1 deletion x-pack/metricbeat/module/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure

import (
"fmt"
"time"

"github.com/elastic/beats/v7/metricbeat/mb"
)
Expand Down Expand Up @@ -87,6 +88,18 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
// It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Set the reference time for the current fetch.
//
// The reference time is used to calculate time intervals
// and compare with collection info in the metric
// registry to decide whether to collect metrics or not,
// depending on metric time grain (check `MetricRegistry`
// for more information).
//
// We truncate the reference time to the second to avoid millisecond
// variations in the collection period causing skipped collections.
referenceTime := time.Now().UTC().Truncate(time.Second)

// Initialize cloud resources and monitor metrics
// information.
//
Expand Down Expand Up @@ -116,7 +129,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {

for _, metricsDefinition := range metricsByResourceId {
// Fetch metric values for each resource.
metricValues := m.Client.GetMetricValues(metricsDefinition, report)
metricValues := m.Client.GetMetricValues(referenceTime, metricsDefinition, report)

// Turns metric values into events and sends them to Elasticsearch.
if err := mapToEvents(metricValues, m.Client, report); err != nil {
Expand Down
78 changes: 63 additions & 15 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry() *MetricRegistry {
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
}
}
Expand All @@ -29,6 +30,7 @@ func NewMetricRegistry() *MetricRegistry {
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
logger *logp.Logger
collectionsInfo map[string]MetricCollectionInfo
}

Expand All @@ -38,26 +40,70 @@ func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be updated.
func (m *MetricRegistry) NeedsUpdate(metric Metric) bool {
// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

if info, exists := m.collectionsInfo[metricKey]; exists {
duration := convertTimeGrainToDuration(info.timeGrain)
// Get the now time in UTC, only to be used for logging.
// It's interesting to see when the registry evaluate each
// metric in relation to the reference time.
now := time.Now().UTC()

if collection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain)

// Calculate the start time of the time grain in relation to
// the reference time.
timeGrainStartTime := referenceTime.Add(-timeGrainDuration)

// If the last collection time is after the start time of the time grain,
// it means that we already have a value for the given time grain.
//
// In this case, the metricset does not need to collect the metric
// values again.
if collection.timestamp.After(timeGrainStartTime) {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

// Check if the metric has been collected within a
// time period defined by the time grain.
if info.timestamp.After(time.Now().Add(duration * (-1))) {
return false
}

// The last collection time is before the start time of the time grain,
// it means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
)

return true
}

Expand Down Expand Up @@ -101,11 +147,13 @@ func NewClient(config Config) (*Client, error) {
return nil, err
}

logger := logp.NewLogger("azure monitor client")

client := &Client{
AzureMonitorService: azureMonitorService,
Config: config,
Log: logp.NewLogger("azure monitor client"),
MetricRegistry: NewMetricRegistry(),
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
}

client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval
Expand Down Expand Up @@ -176,11 +224,10 @@ func (client *Client) InitResources(fn mapResourceMetrics) error {
}

// GetMetricValues returns the metric values for the given cloud resources.
func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2) []Metric {
func (client *Client) GetMetricValues(referenceTime time.Time, metrics []Metric, reporter mb.ReporterV2) []Metric {
var result []Metric

// Same end time for all metrics in the same batch.
referenceTime := time.Now().UTC()
interval := client.Config.Period

// Fetch in the range [{-2 x INTERVAL},{-1 x INTERVAL}) with a delay of {INTERVAL}.
Expand Down Expand Up @@ -208,7 +255,7 @@ func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2)
// the time grain of the metric, we can determine if the metric needs
// to be collected again, or if we can skip it.
//
if !client.MetricRegistry.NeedsUpdate(metric) {
if !client.MetricRegistry.NeedsUpdate(referenceTime, metric) {
continue
}

Expand Down Expand Up @@ -413,11 +460,12 @@ func (client *Client) AddVmToResource(resourceId string, vm VmResource) {
// NewMockClient instantiates a new client with the mock azure service
func NewMockClient() *Client {
azureMockService := new(MockService)
logger := logp.NewLogger("test azure monitor")
client := &Client{
AzureMonitorService: azureMockService,
Config: Config{},
Log: logp.NewLogger("test azure monitor"),
MetricRegistry: NewMetricRegistry(),
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
}
return client
}
7 changes: 5 additions & 2 deletions x-pack/metricbeat/module/azure/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azure
import (
"errors"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
Expand Down Expand Up @@ -66,6 +67,7 @@ func TestGetMetricValues(t *testing.T) {
client.Config = resourceIDConfig

t.Run("return no error when no metric values are returned but log and send event", func(t *testing.T) {
referenceTime := time.Now().UTC().Truncate(time.Second)
client.ResourceConfigurations = ResourceConfiguration{
Metrics: []Metric{
{
Expand All @@ -82,12 +84,13 @@ func TestGetMetricValues(t *testing.T) {
client.AzureMonitorService = m
mr := MockReporterV2{}
mr.On("Error", mock.Anything).Return(true)
metrics := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr)
metrics := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr)
assert.Equal(t, len(metrics), 0)
assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0)
m.AssertExpectations(t)
})
t.Run("return metric values", func(t *testing.T) {
referenceTime := time.Now().UTC().Truncate(time.Second)
client.ResourceConfigurations = ResourceConfiguration{
Metrics: []Metric{
{
Expand All @@ -104,7 +107,7 @@ func TestGetMetricValues(t *testing.T) {
client.AzureMonitorService = m
mr := MockReporterV2{}
mr.On("Error", mock.Anything).Return(true)
metricValues := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr)
metricValues := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr)
assert.Equal(t, len(metricValues), 0)
assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0)
m.AssertExpectations(t)
Expand Down
Loading