From 978850e799b06906b39fcf5e47e309df6efa6da2 Mon Sep 17 00:00:00 2001 From: Gunnar <628831+gunnaraasen@users.noreply.github.com> Date: Wed, 5 Sep 2018 14:50:32 -0700 Subject: [PATCH] Add Azure Monitor output plugin (#4089) --- Gopkg.lock | 22 +- Gopkg.toml | 4 + docs/LICENSE_OF_DEPENDENCIES.md | 2 + internal/models/running_output.go | 13 + output.go | 6 + plugins/outputs/all/all.go | 1 + plugins/outputs/azure_monitor/README.md | 139 ++++ .../outputs/azure_monitor/azure_monitor.go | 615 ++++++++++++++++++ .../azure_monitor/azure_monitor_test.go | 361 ++++++++++ plugins/parsers/logfmt/parser_test.go | 119 ++-- testutil/metric.go | 87 ++- testutil/metric_test.go | 37 +- 12 files changed, 1318 insertions(+), 88 deletions(-) create mode 100644 plugins/outputs/azure_monitor/README.md create mode 100644 plugins/outputs/azure_monitor/azure_monitor.go create mode 100644 plugins/outputs/azure_monitor/azure_monitor_test.go diff --git a/Gopkg.lock b/Gopkg.lock index ef76419baae11..0f29ab59abb62 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -21,6 +21,18 @@ revision = "2ce144541b8903101fb8f1483cc0497a68798122" version = "v0.3.0" +[[projects]] + name = "github.com/Azure/go-autorest" + packages = [ + "autorest", + "autorest/adal", + "autorest/azure", + "autorest/azure/auth", + "autorest/date" + ] + revision = "1f7cd6cfe0adea687ad44a512dfe76140f804318" + version = "v10.12.0" + [[projects]] branch = "master" digest = "1:298712a3ee36b59c3ca91f4183bd75d174d5eaa8b4aed5072831f126e2e752f6" @@ -224,6 +236,12 @@ revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" version = "v3.2.0" +[[projects]] + branch = "master" + name = "github.com/dimchansky/utfbom" + packages = ["."] + revision = "6c6132ff69f0f6c088739067407b5d32c52e1d0f" + [[projects]] digest = "1:522eff2a1f014a64fb403db60fc0110653e4dc5b59779894d208e697b0708ddc" name = "github.com/docker/distribution" @@ -975,7 +993,9 @@ "ed25519/internal/edwards25519", "md4", "pbkdf2", - "ssh/terminal", + "pkcs12", + "pkcs12/internal/rc2", + "ssh/terminal" ] pruneopts = "" revision = "a2144134853fc9a27a7b1e3eb4f19f1a76df13c9" diff --git a/Gopkg.toml b/Gopkg.toml index d282e1ebd7e6b..f942f340116b3 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -221,3 +221,7 @@ [[override]] source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz" name = "gopkg.in/fsnotify.v1" + +[[constraint]] + name = "github.com/Azure/go-autorest" + version = "10.12.0" diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 2d215984b69e3..36f0389941445 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -8,6 +8,7 @@ following works: - github.com/aerospike/aerospike-client-go [APACHE](https://github.com/aerospike/aerospike-client-go/blob/master/LICENSE) - github.com/amir/raidman [PUBLIC DOMAIN](https://github.com/amir/raidman/blob/master/UNLICENSE) - github.com/armon/go-metrics [MIT](https://github.com/armon/go-metrics/blob/master/LICENSE) +- github.com/Azure/go-autorest [APACHE](https://github.com/Azure/go-autorest/blob/master/LICENSE) - github.com/aws/aws-sdk-go [APACHE](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) - github.com/beorn7/perks [MIT](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE) @@ -19,6 +20,7 @@ following works: - github.com/couchbase/goutils [MIT](https://github.com/couchbase/go-couchbase/blob/master/LICENSE) - github.com/dancannon/gorethink [APACHE](https://github.com/dancannon/gorethink/blob/master/LICENSE) - github.com/davecgh/go-spew [ISC](https://github.com/davecgh/go-spew/blob/master/LICENSE) +- github.com/dimchansky/utfbom [APACHE](https://github.com/dimchansky/utfbom/blob/master/LICENSE) - github.com/docker/docker [APACHE](https://github.com/docker/docker/blob/master/LICENSE) - github.com/docker/cli [APACHE](https://github.com/docker/cli/blob/master/LICENSE) - github.com/eapache/go-resiliency [MIT](https://github.com/eapache/go-resiliency/blob/master/LICENSE) diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 25576d745c36b..c926917d60b13 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -114,6 +114,13 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { m, _ = metric.New(name, tags, fields, t, tp) } + if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { + ro.Lock() + defer ro.Unlock() + output.Add(m) + return + } + ro.metrics.Add(m) if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) @@ -127,6 +134,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { + if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { + metrics := output.Push() + ro.metrics.Add(metrics...) + output.Reset() + } + nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len() ro.BufferSize.Set(int64(nFails + nMetrics)) log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ", diff --git a/output.go b/output.go index d66ea4556e4f5..39b371ac471a4 100644 --- a/output.go +++ b/output.go @@ -13,6 +13,12 @@ type Output interface { Write(metrics []Metric) error } +type AggregatingOutput interface { + Add(in Metric) + Push() []Metric + Reset() +} + type ServiceOutput interface { // Connect to the Output Connect() error diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 037807c22575c..4d49c0c6e1f27 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/amon" _ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/application_insights" + _ "github.com/influxdata/telegraf/plugins/outputs/azure_monitor" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" diff --git a/plugins/outputs/azure_monitor/README.md b/plugins/outputs/azure_monitor/README.md new file mode 100644 index 0000000000000..cd13861369779 --- /dev/null +++ b/plugins/outputs/azure_monitor/README.md @@ -0,0 +1,139 @@ +# Azure Monitor + +This plugin will send custom metrics to Azure Monitor. Azure Monitor has a +metric resolution of one minute. To handle this in Telegraf, the Azure Monitor +output plugin will automatically aggregates metrics into one minute buckets, +which are then sent to Azure Monitor on every flush interval. + +The metrics from each input plugin will be written to a separate Azure Monitor +namespace, prefixed with `Telegraf/` by default. The field name for each +metric is written as the Azure Monitor metric name. All field values are +written as a summarized set that includes: min, max, sum, count. Tags are +written as a dimension on each Azure Monitor metric. + +Since Azure Monitor only accepts numeric values, string-typed fields are +dropped by default. There is a configuration option (`strings_as_dimensions`) +to retain fields that contain strings as extra dimensions. Azure Monitor +allows a maximum of 10 dimensions per metric so any dimensions over that +amount will be deterministically dropped. + +### Configuration: + +```toml +[[outputs.azure_monitor]] + ## Timeout for HTTP writes. + # timeout = "20s" + + ## Set the namespace prefix, defaults to "Telegraf/". + # namespace_prefix = "Telegraf/" + + ## Azure Monitor doesn't have a string value type, so convert string + ## fields to dimensions (a.k.a. tags) if enabled. Azure Monitor allows + ## a maximum of 10 dimensions so Telegraf will only send the first 10 + ## alphanumeric dimensions. + # strings_as_dimensions = false + + ## Both region and resource_id must be set or be available via the + ## Instance Metadata service on Azure Virtual Machines. + # + ## Azure Region to publish metrics against. + ## ex: region = "southcentralus" + # region = "" + # + ## The Azure Resource ID against which metric will be logged, e.g. + ## ex: resource_id = "/subscriptions//resourceGroups//providers/Microsoft.Compute/virtualMachines/" + # resource_id = "" +``` + +### Setup + +1. [Register the `microsoft.insights` resource provider in your Azure subscription][resource provider]. +2. If using Managed Service Identities to authenticate an Azure VM, + [enable system-assigned managed identity][enable msi]. +2. Use a region that supports Azure Monitor Custom Metrics, + For regions with Custom Metrics support, an endpoint will be available with + the format `https://.monitoring.azure.com`. The following regions + are currently known to be supported: + - East US (eastus) + - West US 2 (westus2) + - South Central US (southcentralus) + - West Central US (westcentralus) + - North Europe (northeurope) + - West Europe (westeurope) + - Southeast Asia (southeastasia) + +[resource provider]: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-supported-services +[enable msi]: https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/qs-configure-portal-windows-vm + +### Region and Resource ID + +The plugin will attempt to discover the region and resource ID using the Azure +VM Instance Metadata service. If Telegraf is not running on a virtual machine +or the VM Instance Metadata service is not available, the following variables +are required for the output to function. + +* region +* resource_id + +### Authentication + +This plugin uses one of several different types of authenticate methods. The +preferred authentication methods are different from the *order* in which each +authentication is checked. Here are the preferred authentication methods: + +1. Managed Service Identity (MSI) token + - This is the prefered authentication method. Telegraf will automatically + authenticate using this method when running on Azure VMs. +2. AAD Application Tokens (Service Principals) + - Primarily useful if Telegraf is writing metrics for other resources. + [More information][principal]. + - A Service Principal or User Principal needs to be assigned the `Monitoring + Contributor` roles. +3. AAD User Tokens (User Principals) + - Allows Telegraf to authenticate like a user. It is best to use this method + for development. + +[principal]: https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects + +The plugin will authenticate using the first available of the +following configurations: + +1. **Client Credentials**: Azure AD Application ID and Secret. + + Set the following Telegraf configuration variables: + + - `azure_tenant_id`: Specifies the Tenant to which to authenticate. + - `azure_client_id`: Specifies the app client ID to use. + - `azure_client_secret`: Specifies the app secret to use. + + Or set the following environment variables: + + - `AZURE_TENANT_ID`: Specifies the Tenant to which to authenticate. + - `AZURE_CLIENT_ID`: Specifies the app client ID to use. + - `AZURE_CLIENT_SECRET`: Specifies the app secret to use. + +2. **Client Certificate**: Azure AD Application ID and X.509 Certificate. + + - `AZURE_TENANT_ID`: Specifies the Tenant to which to authenticate. + - `AZURE_CLIENT_ID`: Specifies the app client ID to use. + - `AZURE_CERTIFICATE_PATH`: Specifies the certificate Path to use. + - `AZURE_CERTIFICATE_PASSWORD`: Specifies the certificate password to use. + +3. **Resource Owner Password**: Azure AD User and Password. This grant type is + *not recommended*, use device login instead if you need interactive login. + + - `AZURE_TENANT_ID`: Specifies the Tenant to which to authenticate. + - `AZURE_CLIENT_ID`: Specifies the app client ID to use. + - `AZURE_USERNAME`: Specifies the username to use. + - `AZURE_PASSWORD`: Specifies the password to use. + +4. **Azure Managed Service Identity**: Delegate credential management to the + platform. Requires that code is running in Azure, e.g. on a VM. All + configuration is handled by Azure. See [Azure Managed Service Identity][msi] + for more details. Only available when using the [Azure Resource Manager][arm]. + +[msi]: https://docs.microsoft.com/en-us/azure/active-directory/msi-overview +[arm]: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-overview + +**Note: As shown above, the last option (#4) is the preferred way to +authenticate when running Telegraf on Azure VMs. diff --git a/plugins/outputs/azure_monitor/azure_monitor.go b/plugins/outputs/azure_monitor/azure_monitor.go new file mode 100644 index 0000000000000..afc3a20edd945 --- /dev/null +++ b/plugins/outputs/azure_monitor/azure_monitor.go @@ -0,0 +1,615 @@ +package azure_monitor + +import ( + "bytes" + "compress/gzip" + "encoding/binary" + "encoding/json" + "fmt" + "hash/fnv" + "io/ioutil" + "log" + "net/http" + "regexp" + "strings" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/Azure/go-autorest/autorest/azure/auth" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/selfstat" +) + +// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics +// service +type AzureMonitor struct { + Timeout internal.Duration + NamespacePrefix string `toml:"namespace_prefix"` + StringsAsDimensions bool `toml:"strings_as_dimensions"` + Region string + ResourceID string `toml:"resource_id"` + + url string + auth autorest.Authorizer + client *http.Client + + cache map[time.Time]map[uint64]*aggregate + timeFunc func() time.Time + + MetricOutsideWindow selfstat.Stat +} + +type dimension struct { + name string + value string +} + +type aggregate struct { + name string + min float64 + max float64 + sum float64 + count int64 + dimensions []dimension + updated bool +} + +const ( + defaultRequestTimeout = time.Second * 5 + defaultNamespacePrefix = "Telegraf/" + defaultAuthResource = "https://monitoring.azure.com/" + + vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01" + resourceIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s" + urlTemplate = "https://%s.monitoring.azure.com%s/metrics" + maxRequestBodySize = 4000000 +) + +var sampleConfig = ` + ## Timeout for HTTP writes. + # timeout = "20s" + + ## Set the namespace prefix, defaults to "Telegraf/". + # namespace_prefix = "Telegraf/" + + ## Azure Monitor doesn't have a string value type, so convert string + ## fields to dimensions (a.k.a. tags) if enabled. Azure Monitor allows + ## a maximum of 10 dimensions so Telegraf will only send the first 10 + ## alphanumeric dimensions. + # strings_as_dimensions = false + + ## Both region and resource_id must be set or be available via the + ## Instance Metadata service on Azure Virtual Machines. + # + ## Azure Region to publish metrics against. + ## ex: region = "southcentralus" + # region = "" + # + ## The Azure Resource ID against which metric will be logged, e.g. + ## ex: resource_id = "/subscriptions//resourceGroups//providers/Microsoft.Compute/virtualMachines/" + # resource_id = "" +` + +// Description provides a description of the plugin +func (a *AzureMonitor) Description() string { + return "Send aggregate metrics to Azure Monitor" +} + +// SampleConfig provides a sample configuration for the plugin +func (a *AzureMonitor) SampleConfig() string { + return sampleConfig +} + +// Connect initializes the plugin and validates connectivity +func (a *AzureMonitor) Connect() error { + a.cache = make(map[time.Time]map[uint64]*aggregate, 36) + + if a.Timeout.Duration == 0 { + a.Timeout.Duration = defaultRequestTimeout + } + + a.client = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }, + Timeout: a.Timeout.Duration, + } + + if a.NamespacePrefix == "" { + a.NamespacePrefix = defaultNamespacePrefix + } + + var err error + var region string + var resourceID string + if a.Region == "" || a.ResourceID == "" { + // Pull region and resource identifier + region, resourceID, err = vmInstanceMetadata(a.client) + if err != nil { + return err + } + } + if a.Region != "" { + region = a.Region + } + if a.ResourceID != "" { + resourceID = a.ResourceID + } + + if resourceID == "" { + return fmt.Errorf("no resource ID configured or available via VM instance metadata") + } else if region == "" { + return fmt.Errorf("no region configured or available via VM instance metadata") + } + a.url = fmt.Sprintf(urlTemplate, region, resourceID) + + log.Printf("D! Writing to Azure Monitor URL: %s", a.url) + + a.auth, err = auth.NewAuthorizerFromEnvironmentWithResource(defaultAuthResource) + if err != nil { + return nil + } + + a.Reset() + + tags := map[string]string{ + "region": region, + "resource_id": resourceID, + } + a.MetricOutsideWindow = selfstat.Register("azure_monitor", "metric_outside_window", tags) + + return nil +} + +// vmMetadata retrieves metadata about the current Azure VM +func vmInstanceMetadata(c *http.Client) (string, string, error) { + req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil) + if err != nil { + return "", "", fmt.Errorf("error creating request: %v", err) + } + req.Header.Set("Metadata", "true") + + resp, err := c.Do(req) + if err != nil { + return "", "", err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", "", err + } + if resp.StatusCode >= 300 || resp.StatusCode < 200 { + return "", "", fmt.Errorf("unable to fetch instance metadata: [%v] %s", resp.StatusCode, body) + } + + // VirtualMachineMetadata contains information about a VM from the metadata service + type VirtualMachineMetadata struct { + Compute struct { + Location string `json:"location"` + Name string `json:"name"` + ResourceGroupName string `json:"resourceGroupName"` + SubscriptionID string `json:"subscriptionId"` + } `json:"compute"` + } + + var metadata VirtualMachineMetadata + if err := json.Unmarshal(body, &metadata); err != nil { + return "", "", err + } + + region := metadata.Compute.Location + resourceID := fmt.Sprintf( + resourceIDTemplate, + metadata.Compute.SubscriptionID, + metadata.Compute.ResourceGroupName, + metadata.Compute.Name, + ) + + return region, resourceID, nil +} + +// Close shuts down an any active connections +func (a *AzureMonitor) Close() error { + a.client = nil + return nil +} + +type azureMonitorMetric struct { + Time time.Time `json:"time"` + Data *azureMonitorData `json:"data"` +} + +type azureMonitorData struct { + BaseData *azureMonitorBaseData `json:"baseData"` +} + +type azureMonitorBaseData struct { + Metric string `json:"metric"` + Namespace string `json:"namespace"` + DimensionNames []string `json:"dimNames"` + Series []*azureMonitorSeries `json:"series"` +} + +type azureMonitorSeries struct { + DimensionValues []string `json:"dimValues"` + Min float64 `json:"min"` + Max float64 `json:"max"` + Sum float64 `json:"sum"` + Count int64 `json:"count"` +} + +// Write writes metrics to the remote endpoint +func (a *AzureMonitor) Write(metrics []telegraf.Metric) error { + azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics)) + for _, m := range metrics { + id := hashIDWithTagKeysOnly(m) + if azm, ok := azmetrics[id]; !ok { + amm, err := translate(m, a.NamespacePrefix) + if err != nil { + log.Printf("E! [outputs.azure_monitor]: could not create azure metric for %q; discarding point", m.Name()) + continue + } + azmetrics[id] = amm + } else { + amm, err := translate(m, a.NamespacePrefix) + if err != nil { + log.Printf("E! [outputs.azure_monitor]: could not create azure metric for %q; discarding point", m.Name()) + continue + } + + azmetrics[id].Data.BaseData.Series = append( + azm.Data.BaseData.Series, + amm.Data.BaseData.Series..., + ) + } + } + + if len(azmetrics) == 0 { + return nil + } + + var body []byte + for _, m := range azmetrics { + // Azure Monitor accepts new batches of points in new-line delimited + // JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec). + jsonBytes, err := json.Marshal(&m) + if err != nil { + return err + } + // Azure Monitor's maximum request body size of 4MB. Send batches that + // exceed this size via separate write requests. + if (len(body) + len(jsonBytes) + 1) > maxRequestBodySize { + err := a.send(body) + if err != nil { + return err + } + body = nil + } + body = append(body, jsonBytes...) + body = append(body, '\n') + } + + return a.send(body) +} + +func (a *AzureMonitor) send(body []byte) error { + var buf bytes.Buffer + g := gzip.NewWriter(&buf) + if _, err := g.Write(body); err != nil { + return err + } + if err := g.Close(); err != nil { + return err + } + + req, err := http.NewRequest("POST", a.url, &buf) + if err != nil { + return err + } + + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("Content-Type", "application/x-ndjson") + + // Add the authorization header. WithAuthorization will automatically + // refresh the token if needed. + req, err = autorest.CreatePreparer(a.auth.WithAuthorization()).Prepare(req) + if err != nil { + return fmt.Errorf("unable to fetch authentication credentials: %v", err) + } + + resp, err := a.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + _, err = ioutil.ReadAll(resp.Body) + if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 { + return fmt.Errorf("failed to write batch: [%v] %s", resp.StatusCode, resp.Status) + } + + return nil +} + +func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 { + h := fnv.New64a() + h.Write([]byte(m.Name())) + h.Write([]byte("\n")) + for _, tag := range m.TagList() { + h.Write([]byte(tag.Key)) + h.Write([]byte("\n")) + } + b := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(b, uint64(m.Time().UnixNano())) + h.Write(b[:n]) + h.Write([]byte("\n")) + return h.Sum64() +} + +func translate(m telegraf.Metric, prefix string) (*azureMonitorMetric, error) { + var dimensionNames []string + var dimensionValues []string + for _, tag := range m.TagList() { + // Azure custom metrics service supports up to 10 dimensions + if len(dimensionNames) > 10 { + continue + } + + if tag.Key == "" || tag.Value == "" { + continue + } + + dimensionNames = append(dimensionNames, tag.Key) + dimensionValues = append(dimensionValues, tag.Value) + } + + min, err := getFloatField(m, "min") + if err != nil { + return nil, err + } + max, err := getFloatField(m, "max") + if err != nil { + return nil, err + } + sum, err := getFloatField(m, "sum") + if err != nil { + return nil, err + } + count, err := getIntField(m, "count") + if err != nil { + return nil, err + } + + mn, ns := "Missing", "Missing" + names := strings.SplitN(m.Name(), "-", 2) + if len(names) > 1 { + mn = names[1] + } + if len(names) > 0 { + ns = names[0] + } + ns = prefix + ns + + return &azureMonitorMetric{ + Time: m.Time(), + Data: &azureMonitorData{ + BaseData: &azureMonitorBaseData{ + Metric: mn, + Namespace: ns, + DimensionNames: dimensionNames, + Series: []*azureMonitorSeries{ + &azureMonitorSeries{ + DimensionValues: dimensionValues, + Min: min, + Max: max, + Sum: sum, + Count: count, + }, + }, + }, + }, + }, nil +} + +func getFloatField(m telegraf.Metric, key string) (float64, error) { + fv, ok := m.GetField(key) + if !ok { + return 0, fmt.Errorf("missing field: %s", key) + } + + if value, ok := fv.(float64); ok { + return value, nil + } + return 0, fmt.Errorf("unexpected type: %s: %T", key, fv) +} + +func getIntField(m telegraf.Metric, key string) (int64, error) { + fv, ok := m.GetField(key) + if !ok { + return 0, fmt.Errorf("missing field: %s", key) + } + + if value, ok := fv.(int64); ok { + return value, nil + } + return 0, fmt.Errorf("unexpected type: %s: %T", key, fv) +} + +// Add will append a metric to the output aggregate +func (a *AzureMonitor) Add(m telegraf.Metric) { + // Azure Monitor only supports aggregates 30 minutes into the past and 4 + // minutes into the future. Future metrics are dropped when pushed. + t := m.Time() + tbucket := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location()) + if tbucket.Before(a.timeFunc().Add(-time.Minute * 30)) { + a.MetricOutsideWindow.Incr(1) + return + } + + // Azure Monitor doesn't have a string value type, so convert string fields + // to dimensions (a.k.a. tags) if enabled. + if a.StringsAsDimensions { + for _, f := range m.FieldList() { + if v, ok := f.Value.(string); ok { + m.AddTag(f.Key, v) + } + } + } + + for _, f := range m.FieldList() { + fv, ok := convert(f.Value) + if !ok { + continue + } + + // Azure Monitor does not support fields so the field name is appended + // to the metric name. + name := m.Name() + "-" + sanitize(f.Key) + id := hashIDWithField(m.HashID(), f.Key) + + _, ok = a.cache[tbucket] + if !ok { + // Time bucket does not exist and needs to be created. + a.cache[tbucket] = make(map[uint64]*aggregate) + } + + // Fetch existing aggregate + var agg *aggregate + agg, ok = a.cache[tbucket][id] + if !ok { + agg := &aggregate{ + name: name, + min: fv, + max: fv, + sum: fv, + count: 1, + } + for _, tag := range m.TagList() { + dim := dimension{ + name: tag.Key, + value: tag.Value, + } + agg.dimensions = append(agg.dimensions, dim) + } + agg.updated = true + a.cache[tbucket][id] = agg + continue + } + + if fv < agg.min { + agg.min = fv + } + if fv > agg.max { + agg.max = fv + } + agg.sum += fv + agg.count++ + agg.updated = true + } +} + +func convert(in interface{}) (float64, bool) { + switch v := in.(type) { + case int64: + return float64(v), true + case uint64: + return float64(v), true + case float64: + return v, true + case bool: + if v { + return 1, true + } + return 0, true + default: + return 0, false + } +} + +var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) + +func sanitize(value string) string { + return invalidNameCharRE.ReplaceAllString(value, "_") +} + +func hashIDWithField(id uint64, fk string) uint64 { + h := fnv.New64a() + b := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(b, id) + h.Write(b[:n]) + h.Write([]byte("\n")) + h.Write([]byte(fk)) + h.Write([]byte("\n")) + return h.Sum64() +} + +// Push sends metrics to the output metric buffer +func (a *AzureMonitor) Push() []telegraf.Metric { + var metrics []telegraf.Metric + for tbucket, aggs := range a.cache { + // Do not send metrics early + if tbucket.After(a.timeFunc().Add(-time.Minute)) { + continue + } + for _, agg := range aggs { + // Only send aggregates that have had an update since the last push. + if !agg.updated { + continue + } + + tags := make(map[string]string, len(agg.dimensions)) + for _, tag := range agg.dimensions { + tags[tag.name] = tag.value + } + + m, err := metric.New(agg.name, + tags, + map[string]interface{}{ + "min": agg.min, + "max": agg.max, + "sum": agg.sum, + "count": agg.count, + }, + tbucket, + ) + + if err != nil { + log.Printf("E! [outputs.azure_monitor]: could not create metric for aggregation %q; discarding point", agg.name) + } + + metrics = append(metrics, m) + } + } + return metrics +} + +// Reset clears the cache of aggregate metrics +func (a *AzureMonitor) Reset() { + for tbucket := range a.cache { + // Remove aggregates older than 30 minutes + if tbucket.Before(a.timeFunc().Add(-time.Minute * 30)) { + delete(a.cache, tbucket) + continue + } + // Metrics updated within the latest 1m have not been pushed and should + // not be cleared. + if tbucket.After(a.timeFunc().Add(-time.Minute)) { + continue + } + for id := range a.cache[tbucket] { + a.cache[tbucket][id].updated = false + } + } +} + +func init() { + outputs.Add("azure_monitor", func() telegraf.Output { + return &AzureMonitor{ + timeFunc: time.Now, + } + }) +} diff --git a/plugins/outputs/azure_monitor/azure_monitor_test.go b/plugins/outputs/azure_monitor/azure_monitor_test.go new file mode 100644 index 0000000000000..6fb40805ecd3e --- /dev/null +++ b/plugins/outputs/azure_monitor/azure_monitor_test.go @@ -0,0 +1,361 @@ +package azure_monitor + +import ( + "bufio" + "compress/gzip" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestAggregate(t *testing.T) { + tests := []struct { + name string + plugin *AzureMonitor + metrics []telegraf.Metric + addTime time.Time + pushTime time.Time + check func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) + }{ + { + name: "add metric outside window is dropped", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + addTime: time.Unix(3600, 0), + pushTime: time.Unix(3600, 0), + check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) { + require.Equal(t, int64(1), plugin.MetricOutsideWindow.Get()) + require.Len(t, metrics, 0) + }, + }, + { + name: "metric not sent until period expires", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + addTime: time.Unix(0, 0), + pushTime: time.Unix(0, 0), + check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) { + require.Len(t, metrics, 0) + }, + }, + { + name: "add strings as dimensions", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + StringsAsDimensions: true, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "host": "localhost", + }, + map[string]interface{}{ + "value": 42, + "message": "howdy", + }, + time.Unix(0, 0), + ), + }, + addTime: time.Unix(0, 0), + pushTime: time.Unix(3600, 0), + check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{ + "host": "localhost", + "message": "howdy", + }, + map[string]interface{}{ + "min": 42.0, + "max": 42.0, + "sum": 42.0, + "count": 1, + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, metrics) + }, + }, + { + name: "add metric to cache and push", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + cache: make(map[time.Time]map[uint64]*aggregate, 36), + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + addTime: time.Unix(0, 0), + pushTime: time.Unix(3600, 0), + check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": 42.0, + "max": 42.0, + "sum": 42.0, + "count": 1, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, metrics) + }, + }, + { + name: "added metric are aggregated", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + cache: make(map[time.Time]map[uint64]*aggregate, 36), + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 84, + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 2, + }, + time.Unix(0, 0), + ), + }, + addTime: time.Unix(0, 0), + pushTime: time.Unix(3600, 0), + check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": 2.0, + "max": 84.0, + "sum": 128.0, + "count": 3, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, metrics) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.plugin.Connect() + require.NoError(t, err) + + // Reset globals + tt.plugin.MetricOutsideWindow.Set(0) + + tt.plugin.timeFunc = func() time.Time { return tt.addTime } + for _, m := range tt.metrics { + tt.plugin.Add(m) + } + + tt.plugin.timeFunc = func() time.Time { return tt.pushTime } + metrics := tt.plugin.Push() + tt.plugin.Reset() + + tt.check(t, tt.plugin, metrics) + }) + } +} + +func TestWrite(t *testing.T) { + readBody := func(r *http.Request) ([]*azureMonitorMetric, error) { + gz, err := gzip.NewReader(r.Body) + if err != nil { + return nil, err + } + scanner := bufio.NewScanner(gz) + + azmetrics := make([]*azureMonitorMetric, 0) + for scanner.Scan() { + line := scanner.Text() + var amm azureMonitorMetric + err = json.Unmarshal([]byte(line), &amm) + if err != nil { + return nil, err + } + azmetrics = append(azmetrics, &amm) + } + + return azmetrics, nil + } + + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + url := "http://" + ts.Listener.Addr().String() + "/metrics" + + tests := []struct { + name string + plugin *AzureMonitor + metrics []telegraf.Metric + handler func(t *testing.T, w http.ResponseWriter, r *http.Request) + }{ + { + name: "if not an azure metric nothing is sent", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + t.Fatal("should not call") + }, + }, + { + name: "single azure metric", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + time.Unix(0, 0), + ), + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + azmetrics, err := readBody(r) + require.NoError(t, err) + require.Len(t, azmetrics, 1) + w.WriteHeader(http.StatusOK) + }, + }, + { + name: "multiple azure metric", + plugin: &AzureMonitor{ + Region: "test", + ResourceID: "/test", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + time.Unix(60, 0), + ), + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + azmetrics, err := readBody(r) + require.NoError(t, err) + require.Len(t, azmetrics, 2) + w.WriteHeader(http.StatusOK) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tt.handler(t, w, r) + }) + + err := tt.plugin.Connect() + require.NoError(t, err) + + // override real authorizer and write url + tt.plugin.auth = autorest.NullAuthorizer{} + tt.plugin.url = url + + err = tt.plugin.Write(tt.metrics) + require.NoError(t, err) + }) + } +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index c9096468467dc..dfacd8c8fae0d 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { @@ -25,28 +24,28 @@ func TestParse(t *testing.T) { measurement string now func() time.Time bytes []byte - want []testutil.Metric + want []telegraf.Metric wantErr bool }{ { name: "no bytes returns no metrics", now: func() time.Time { return time.Unix(0, 0) }, - want: []testutil.Metric{}, + want: []telegraf.Metric{}, }, { name: "test without trailing end", bytes: []byte("foo=\"bar\""), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - want: []testutil.Metric{ - testutil.Metric{ - Measurement: "testlog", - Tags: map[string]string{}, - Fields: map[string]interface{}{ + want: []telegraf.Metric{ + testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "foo": "bar", }, - Time: time.Unix(0, 0), - }, + time.Unix(0, 0), + ), }, }, { @@ -54,15 +53,15 @@ func TestParse(t *testing.T) { bytes: []byte("foo=\"bar\"\n"), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - want: []testutil.Metric{ - testutil.Metric{ - Measurement: "testlog", - Tags: map[string]string{}, - Fields: map[string]interface{}{ + want: []telegraf.Metric{ + testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "foo": "bar", }, - Time: time.Unix(0, 0), - }, + time.Unix(0, 0), + ), }, }, { @@ -70,18 +69,18 @@ func TestParse(t *testing.T) { bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - want: []testutil.Metric{ - testutil.Metric{ - Measurement: "testlog", - Tags: map[string]string{}, - Fields: map[string]interface{}{ + want: []telegraf.Metric{ + testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "lvl": "info", "msg": "http request", "method": "POST", "ts": "2018-07-24T19:43:40.275Z", }, - Time: time.Unix(0, 0), - }, + time.Unix(0, 0), + ), }, }, { @@ -89,42 +88,42 @@ func TestParse(t *testing.T) { bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - want: []testutil.Metric{ - testutil.Metric{ - Measurement: "testlog", - Tags: map[string]string{}, - Fields: map[string]interface{}{ + want: []telegraf.Metric{ + testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "lvl": "info", "msg": "http request", "method": "POST", "ts": "2018-07-24T19:43:40.275Z", }, - Time: time.Unix(0, 0), - }, - testutil.Metric{ - Measurement: "testlog", - Tags: map[string]string{}, - Fields: map[string]interface{}{ + time.Unix(0, 0), + ), + testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "parent_id": "088876RL000", "duration": 7.45, "log_id": "09R4e4Rl000", }, - Time: time.Unix(0, 0), - }, + time.Unix(0, 0), + ), }, }, { name: "keys without = or values are ignored", now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`i am no data.`), - want: []testutil.Metric{}, + want: []telegraf.Metric{}, wantErr: false, }, { name: "keys without values are ignored", now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`foo="" bar=`), - want: []testutil.Metric{}, + want: []telegraf.Metric{}, wantErr: false, }, { @@ -132,7 +131,7 @@ func TestParse(t *testing.T) { now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", bytes: []byte(`bar=baz foo="bar`), - want: []testutil.Metric{}, + want: []telegraf.Metric{}, wantErr: true, }, { @@ -140,7 +139,7 @@ func TestParse(t *testing.T) { now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", bytes: []byte(`"foo=" bar=baz`), - want: []testutil.Metric{}, + want: []telegraf.Metric{}, wantErr: true, }, } @@ -155,10 +154,8 @@ func TestParse(t *testing.T) { t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, len(tt.want), len(got)) - for i, m := range got { - testutil.MustEqual(t, m, tt.want[i]) - } + + testutil.RequireMetricsEqual(t, tt.want, got) }) } } @@ -169,13 +166,13 @@ func TestParseLine(t *testing.T) { s string measurement string now func() time.Time - want testutil.Metric + want telegraf.Metric wantErr bool }{ { name: "No Metric In line", now: func() time.Time { return time.Unix(0, 0) }, - want: testutil.Metric{}, + want: nil, wantErr: true, }, { @@ -183,34 +180,34 @@ func TestParseLine(t *testing.T) { now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, - want: testutil.Metric{ - Measurement: "testlog", - Fields: map[string]interface{}{ + want: testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "ts": "2018-07-24T19:43:35.207268Z", "lvl": int64(5), "msg": "Write failed", "log_id": "09R4e4Rl000", }, - Tags: map[string]string{}, - Time: time.Unix(0, 0), - }, + time.Unix(0, 0), + ), }, { name: "ParseLine only returns metrics from first string", now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", - want: testutil.Metric{ - Measurement: "testlog", - Fields: map[string]interface{}{ + want: testutil.MustMetric( + "testlog", + map[string]string{}, + map[string]interface{}{ "ts": "2018-07-24T19:43:35.207268Z", "lvl": int64(5), "msg": "Write failed", "log_id": "09R4e4Rl000", }, - Tags: map[string]string{}, - Time: time.Unix(0, 0), - }, + time.Unix(0, 0), + ), }, } for _, tt := range tests { @@ -223,9 +220,7 @@ func TestParseLine(t *testing.T) { if (err != nil) != tt.wantErr { t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) } - if got != nil { - testutil.MustEqual(t, got, tt.want) - } + testutil.RequireMetricEqual(t, tt.want, got) }) } } diff --git a/testutil/metric.go b/testutil/metric.go index 9620fea15aa7a..56debd09399cf 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -1,16 +1,89 @@ package testutil import ( + "sort" "testing" + "time" + "github.com/google/go-cmp/cmp" "github.com/influxdata/telegraf" - "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/metric" ) -// MustEqual requires a and b to be identical. -func MustEqual(t *testing.T, got telegraf.Metric, want Metric) { - require.Equal(t, want.Measurement, got.Name()) - require.Equal(t, want.Fields, got.Fields()) - require.Equal(t, want.Tags, got.Tags()) - require.Equal(t, want.Time, got.Time()) +type metricDiff struct { + Measurement string + Tags []*telegraf.Tag + Fields []*telegraf.Field + Type telegraf.ValueType + Time time.Time +} + +func newMetricDiff(metric telegraf.Metric) *metricDiff { + m := &metricDiff{} + m.Measurement = metric.Name() + + for _, tag := range metric.TagList() { + m.Tags = append(m.Tags, tag) + } + sort.Slice(m.Tags, func(i, j int) bool { + return m.Tags[i].Key < m.Tags[j].Key + }) + + for _, field := range metric.FieldList() { + m.Fields = append(m.Fields, field) + } + sort.Slice(m.Fields, func(i, j int) bool { + return m.Fields[i].Key < m.Fields[j].Key + }) + + m.Type = metric.Type() + m.Time = metric.Time() + return m +} + +func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) { + t.Helper() + + var lhs, rhs *metricDiff + if expected != nil { + lhs = newMetricDiff(expected) + } + if actual != nil { + rhs = newMetricDiff(actual) + } + + if diff := cmp.Diff(lhs, rhs); diff != "" { + t.Fatalf("telegraf.Metric\n--- expected\n+++ actual\n%s", diff) + } +} + +func RequireMetricsEqual(t *testing.T, expected, actual []telegraf.Metric) { + t.Helper() + + lhs := make([]*metricDiff, len(expected)) + for _, m := range expected { + lhs = append(lhs, newMetricDiff(m)) + } + rhs := make([]*metricDiff, len(actual)) + for _, m := range actual { + rhs = append(rhs, newMetricDiff(m)) + } + if diff := cmp.Diff(lhs, rhs); diff != "" { + t.Fatalf("[]telegraf.Metric\n--- expected\n+++ actual\n%s", diff) + } +} + +// Metric creates a new metric or panics on error. +func MustMetric( + name string, + tags map[string]string, + fields map[string]interface{}, + tm time.Time, + tp ...telegraf.ValueType, +) telegraf.Metric { + m, err := metric.New(name, tags, fields, tm, tp...) + if err != nil { + panic("MustMetric") + } + return m } diff --git a/testutil/metric_test.go b/testutil/metric_test.go index 7295227ce3c78..5b5ef01f470e7 100644 --- a/testutil/metric_test.go +++ b/testutil/metric_test.go @@ -8,13 +8,11 @@ import ( "github.com/influxdata/telegraf/metric" ) -func TestMustEqual(t *testing.T) { - type args struct { - } +func TestRequireMetricsEqual(t *testing.T) { tests := []struct { name string got telegraf.Metric - want Metric + want telegraf.Metric }{ { name: "telegraf and testutil metrics should be equal", @@ -34,24 +32,27 @@ func TestMustEqual(t *testing.T) { ) return m }(), - want: Metric{ - Measurement: "test", - Tags: map[string]string{ - "t1": "v1", - "t2": "v2", - }, - Fields: map[string]interface{}{ - "f1": int64(1), - "f2": 3.14, - "f3": "v3", - }, - Time: time.Unix(0, 0), - }, + want: func() telegraf.Metric { + m, _ := metric.New( + "test", + map[string]string{ + "t1": "v1", + "t2": "v2", + }, + map[string]interface{}{ + "f1": int64(1), + "f2": 3.14, + "f3": "v3", + }, + time.Unix(0, 0), + ) + return m + }(), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - MustEqual(t, tt.got, tt.want) + RequireMetricEqual(t, tt.want, tt.got) }) } }