diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8a81fe40002d..a58ef27df86f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -120,6 +120,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix CPU and memory metrics collection from privileged process on Windows {issue}17314[17314]{pull}37027[37027] - Enhanced Azure Metrics metricset with refined grouping logic and resolved duplication issues for TSDB compatibility {pull}36823[36823] - Fix memory leak on Windows {issue}37142[37142] {pull}37171[37171] +- Fix unintended skip in metric collection on Azure Monitor {issue}37204[37204] {pull}37203[37203] *Osquerybeat* diff --git a/x-pack/metricbeat/module/azure/azure.go b/x-pack/metricbeat/module/azure/azure.go index 8425e442912b..7812feed838c 100644 --- a/x-pack/metricbeat/module/azure/azure.go +++ b/x-pack/metricbeat/module/azure/azure.go @@ -6,6 +6,7 @@ package azure import ( "fmt" + "time" "github.com/elastic/beats/v7/metricbeat/mb" ) @@ -87,6 +88,18 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { // It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(report mb.ReporterV2) error { + // Set the reference time for the current fetch. + // + // The reference time is used to calculate time intervals + // and compare with collection info in the metric + // registry to decide whether to collect metrics or not, + // depending on metric time grain (check `MetricRegistry` + // for more information). + // + // We truncate the reference time to the second to avoid millisecond + // variations in the collection period causing skipped collections. + referenceTime := time.Now().UTC().Truncate(time.Second) + // Initialize cloud resources and monitor metrics // information. // @@ -116,7 +129,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { for _, metricsDefinition := range metricsByResourceId { // Fetch metric values for each resource. - metricValues := m.Client.GetMetricValues(metricsDefinition, report) + metricValues := m.Client.GetMetricValues(referenceTime, metricsDefinition, report) // Turns metric values into events and sends them to Elasticsearch. if err := mapToEvents(metricValues, m.Client, report); err != nil { diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go index 13e2d65ec5b6..ce9a6cb824fc 100644 --- a/x-pack/metricbeat/module/azure/client.go +++ b/x-pack/metricbeat/module/azure/client.go @@ -17,8 +17,9 @@ import ( ) // NewMetricRegistry instantiates a new metric registry. -func NewMetricRegistry() *MetricRegistry { +func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { return &MetricRegistry{ + logger: logger, collectionsInfo: make(map[string]MetricCollectionInfo), } } @@ -29,6 +30,7 @@ func NewMetricRegistry() *MetricRegistry { // This is used to avoid collecting the same metric values over and over again // when the time grain is larger than the collection interval. type MetricRegistry struct { + logger *logp.Logger collectionsInfo map[string]MetricCollectionInfo } @@ -38,26 +40,70 @@ func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { m.collectionsInfo[m.buildMetricKey(metric)] = info } -// NeedsUpdate returns true if the metric needs to be updated. -func (m *MetricRegistry) NeedsUpdate(metric Metric) bool { +// NeedsUpdate returns true if the metric needs to be collected again +// for the given `referenceTime`. +func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { + // Build a key to store the metric in the registry. // The key is a combination of the namespace, // resource ID and metric names. metricKey := m.buildMetricKey(metric) - if info, exists := m.collectionsInfo[metricKey]; exists { - duration := convertTimeGrainToDuration(info.timeGrain) + // Get the now time in UTC, only to be used for logging. + // It's interesting to see when the registry evaluate each + // metric in relation to the reference time. + now := time.Now().UTC() + + if collection, exists := m.collectionsInfo[metricKey]; exists { + // Turn the time grain into a duration (for example, PT5M -> 5 minutes). + timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain) + + // Calculate the start time of the time grain in relation to + // the reference time. + timeGrainStartTime := referenceTime.Add(-timeGrainDuration) + + // If the last collection time is after the start time of the time grain, + // it means that we already have a value for the given time grain. + // + // In this case, the metricset does not need to collect the metric + // values again. + if collection.timestamp.After(timeGrainStartTime) { + m.logger.Debugw( + "MetricRegistry: Metric does not need an update", + "needs_update", false, + "reference_time", referenceTime, + "now", now, + "time_grain_start_time", timeGrainStartTime, + "last_collection_at", collection.timestamp, + ) - // Check if the metric has been collected within a - // time period defined by the time grain. - if info.timestamp.After(time.Now().Add(duration * (-1))) { return false } + + // The last collection time is before the start time of the time grain, + // it means that the metricset needs to collect the metric values again. + m.logger.Debugw( + "MetricRegistry: Metric needs an update", + "needs_update", true, + "reference_time", referenceTime, + "now", now, + "time_grain_start_time", timeGrainStartTime, + "last_collection_at", collection.timestamp, + ) + + return true } // If the metric is not in the registry, it means that it has never // been collected before. // // In this case, we need to collect the metric. + m.logger.Debugw( + "MetricRegistry: Metric needs an update", + "needs_update", true, + "reference_time", referenceTime, + "now", now, + ) + return true } @@ -101,11 +147,13 @@ func NewClient(config Config) (*Client, error) { return nil, err } + logger := logp.NewLogger("azure monitor client") + client := &Client{ AzureMonitorService: azureMonitorService, Config: config, - Log: logp.NewLogger("azure monitor client"), - MetricRegistry: NewMetricRegistry(), + Log: logger, + MetricRegistry: NewMetricRegistry(logger), } client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval @@ -176,11 +224,10 @@ func (client *Client) InitResources(fn mapResourceMetrics) error { } // GetMetricValues returns the metric values for the given cloud resources. -func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2) []Metric { +func (client *Client) GetMetricValues(referenceTime time.Time, metrics []Metric, reporter mb.ReporterV2) []Metric { var result []Metric // Same end time for all metrics in the same batch. - referenceTime := time.Now().UTC() interval := client.Config.Period // Fetch in the range [{-2 x INTERVAL},{-1 x INTERVAL}) with a delay of {INTERVAL}. @@ -208,7 +255,7 @@ func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2) // the time grain of the metric, we can determine if the metric needs // to be collected again, or if we can skip it. // - if !client.MetricRegistry.NeedsUpdate(metric) { + if !client.MetricRegistry.NeedsUpdate(referenceTime, metric) { continue } @@ -413,11 +460,12 @@ func (client *Client) AddVmToResource(resourceId string, vm VmResource) { // NewMockClient instantiates a new client with the mock azure service func NewMockClient() *Client { azureMockService := new(MockService) + logger := logp.NewLogger("test azure monitor") client := &Client{ AzureMonitorService: azureMockService, Config: Config{}, - Log: logp.NewLogger("test azure monitor"), - MetricRegistry: NewMetricRegistry(), + Log: logger, + MetricRegistry: NewMetricRegistry(logger), } return client } diff --git a/x-pack/metricbeat/module/azure/client_test.go b/x-pack/metricbeat/module/azure/client_test.go index 1f95604e3b89..79b1742ded0f 100644 --- a/x-pack/metricbeat/module/azure/client_test.go +++ b/x-pack/metricbeat/module/azure/client_test.go @@ -7,6 +7,7 @@ package azure import ( "errors" "testing" + "time" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" @@ -66,6 +67,7 @@ func TestGetMetricValues(t *testing.T) { client.Config = resourceIDConfig t.Run("return no error when no metric values are returned but log and send event", func(t *testing.T) { + referenceTime := time.Now().UTC().Truncate(time.Second) client.ResourceConfigurations = ResourceConfiguration{ Metrics: []Metric{ { @@ -82,12 +84,13 @@ func TestGetMetricValues(t *testing.T) { client.AzureMonitorService = m mr := MockReporterV2{} mr.On("Error", mock.Anything).Return(true) - metrics := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr) + metrics := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr) assert.Equal(t, len(metrics), 0) assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0) m.AssertExpectations(t) }) t.Run("return metric values", func(t *testing.T) { + referenceTime := time.Now().UTC().Truncate(time.Second) client.ResourceConfigurations = ResourceConfiguration{ Metrics: []Metric{ { @@ -104,7 +107,7 @@ func TestGetMetricValues(t *testing.T) { client.AzureMonitorService = m mr := MockReporterV2{} mr.On("Error", mock.Anything).Return(true) - metricValues := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr) + metricValues := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr) assert.Equal(t, len(metricValues), 0) assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0) m.AssertExpectations(t)