Skip to content

Commit

Permalink
Merge branch 'main' into zmoog/url-encode-next-link-in-azure-billing
Browse files Browse the repository at this point in the history
  • Loading branch information
zmoog authored Nov 29, 2023
2 parents 298de5a + 110cc31 commit 8dbccb7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Enhanced Azure Metrics metricset with refined grouping logic and resolved duplication issues for TSDB compatibility {pull}36823[36823]
- Fix memory leak on Windows {issue}37142[37142] {pull}37171[37171]
- Fix the "api-version query parameter (?api-version=) is required for all requests" error in Azure Billing. {pull}37158[37158]
- Fix unintended skip in metric collection on Azure Monitor {issue}37204[37204] {pull}37203[37203]

*Osquerybeat*

Expand Down
6 changes: 3 additions & 3 deletions testing/environments/snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
version: '2.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0-7521d760-SNAPSHOT
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0-d6c4164b-SNAPSHOT
# When extend is used it merges healthcheck.tests, see:
# https://github.com/docker/compose/issues/8962
# healthcheck:
Expand Down Expand Up @@ -31,7 +31,7 @@ services:
- "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles"

logstash:
image: docker.elastic.co/logstash/logstash:8.12.0-7521d760-SNAPSHOT
image: docker.elastic.co/logstash/logstash:8.12.0-d6c4164b-SNAPSHOT
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"]
retries: 600
Expand All @@ -44,7 +44,7 @@ services:
- 5055:5055

kibana:
image: docker.elastic.co/kibana/kibana:8.12.0-7521d760-SNAPSHOT
image: docker.elastic.co/kibana/kibana:8.12.0-d6c4164b-SNAPSHOT
environment:
- "ELASTICSEARCH_USERNAME=kibana_system_user"
- "ELASTICSEARCH_PASSWORD=testing"
Expand Down
15 changes: 14 additions & 1 deletion x-pack/metricbeat/module/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure

import (
"fmt"
"time"

"github.com/elastic/beats/v7/metricbeat/mb"
)
Expand Down Expand Up @@ -87,6 +88,18 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
// It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Set the reference time for the current fetch.
//
// The reference time is used to calculate time intervals
// and compare with collection info in the metric
// registry to decide whether to collect metrics or not,
// depending on metric time grain (check `MetricRegistry`
// for more information).
//
// We truncate the reference time to the second to avoid millisecond
// variations in the collection period causing skipped collections.
referenceTime := time.Now().UTC().Truncate(time.Second)

// Initialize cloud resources and monitor metrics
// information.
//
Expand Down Expand Up @@ -116,7 +129,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {

for _, metricsDefinition := range metricsByResourceId {
// Fetch metric values for each resource.
metricValues := m.Client.GetMetricValues(metricsDefinition, report)
metricValues := m.Client.GetMetricValues(referenceTime, metricsDefinition, report)

// Turns metric values into events and sends them to Elasticsearch.
if err := mapToEvents(metricValues, m.Client, report); err != nil {
Expand Down
78 changes: 63 additions & 15 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry() *MetricRegistry {
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
}
}
Expand All @@ -29,6 +30,7 @@ func NewMetricRegistry() *MetricRegistry {
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
// logger is used by NeedsUpdate to emit debug messages describing
// each collection decision.
logger *logp.Logger
// collectionsInfo holds the most recently stored collection info for
// each metric, keyed by the string returned by buildMetricKey.
collectionsInfo map[string]MetricCollectionInfo
}

Expand All @@ -38,26 +40,70 @@ func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be updated.
func (m *MetricRegistry) NeedsUpdate(metric Metric) bool {
// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

if info, exists := m.collectionsInfo[metricKey]; exists {
duration := convertTimeGrainToDuration(info.timeGrain)
// Get the current time in UTC, to be used for logging only.
// It's interesting to see when the registry evaluates each
// metric in relation to the reference time.
now := time.Now().UTC()

if collection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain)

// Calculate the start time of the time grain in relation to
// the reference time.
timeGrainStartTime := referenceTime.Add(-timeGrainDuration)

// If the last collection time is after the start time of the time grain,
// it means that we already have a value for the given time grain.
//
// In this case, the metricset does not need to collect the metric
// values again.
if collection.timestamp.After(timeGrainStartTime) {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

// Check if the metric has been collected within a
// time period defined by the time grain.
if info.timestamp.After(time.Now().Add(duration * (-1))) {
return false
}

// The last collection time is not after the start time of the time grain,
// which means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
)

return true
}

Expand Down Expand Up @@ -101,11 +147,13 @@ func NewClient(config Config) (*Client, error) {
return nil, err
}

logger := logp.NewLogger("azure monitor client")

client := &Client{
AzureMonitorService: azureMonitorService,
Config: config,
Log: logp.NewLogger("azure monitor client"),
MetricRegistry: NewMetricRegistry(),
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
}

client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval
Expand Down Expand Up @@ -176,11 +224,10 @@ func (client *Client) InitResources(fn mapResourceMetrics) error {
}

// GetMetricValues returns the metric values for the given cloud resources.
func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2) []Metric {
func (client *Client) GetMetricValues(referenceTime time.Time, metrics []Metric, reporter mb.ReporterV2) []Metric {
var result []Metric

// Same end time for all metrics in the same batch.
referenceTime := time.Now().UTC()
interval := client.Config.Period

// Fetch in the range [{-2 x INTERVAL},{-1 x INTERVAL}) with a delay of {INTERVAL}.
Expand Down Expand Up @@ -208,7 +255,7 @@ func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2)
// the time grain of the metric, we can determine if the metric needs
// to be collected again, or if we can skip it.
//
if !client.MetricRegistry.NeedsUpdate(metric) {
if !client.MetricRegistry.NeedsUpdate(referenceTime, metric) {
continue
}

Expand Down Expand Up @@ -413,11 +460,12 @@ func (client *Client) AddVmToResource(resourceId string, vm VmResource) {
// NewMockClient instantiates a new client with the mock azure service
func NewMockClient() *Client {
azureMockService := new(MockService)
logger := logp.NewLogger("test azure monitor")
client := &Client{
AzureMonitorService: azureMockService,
Config: Config{},
Log: logp.NewLogger("test azure monitor"),
MetricRegistry: NewMetricRegistry(),
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
}
return client
}
7 changes: 5 additions & 2 deletions x-pack/metricbeat/module/azure/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azure
import (
"errors"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
Expand Down Expand Up @@ -66,6 +67,7 @@ func TestGetMetricValues(t *testing.T) {
client.Config = resourceIDConfig

t.Run("return no error when no metric values are returned but log and send event", func(t *testing.T) {
referenceTime := time.Now().UTC().Truncate(time.Second)
client.ResourceConfigurations = ResourceConfiguration{
Metrics: []Metric{
{
Expand All @@ -82,12 +84,13 @@ func TestGetMetricValues(t *testing.T) {
client.AzureMonitorService = m
mr := MockReporterV2{}
mr.On("Error", mock.Anything).Return(true)
metrics := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr)
metrics := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr)
assert.Equal(t, len(metrics), 0)
assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0)
m.AssertExpectations(t)
})
t.Run("return metric values", func(t *testing.T) {
referenceTime := time.Now().UTC().Truncate(time.Second)
client.ResourceConfigurations = ResourceConfiguration{
Metrics: []Metric{
{
Expand All @@ -104,7 +107,7 @@ func TestGetMetricValues(t *testing.T) {
client.AzureMonitorService = m
mr := MockReporterV2{}
mr.On("Error", mock.Anything).Return(true)
metricValues := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr)
metricValues := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr)
assert.Equal(t, len(metricValues), 0)
assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0)
m.AssertExpectations(t)
Expand Down

0 comments on commit 8dbccb7

Please sign in to comment.