diff --git a/README.md b/README.md index acaf9c1818cba..ca969132d5eb2 100644 --- a/README.md +++ b/README.md @@ -447,3 +447,4 @@ For documentation on the latest development code see the [documentation index][d * [warp10](./plugins/outputs/warp10) * [wavefront](./plugins/outputs/wavefront) * [sumologic](./plugins/outputs/sumologic) +* [yandex_cloud_monitoring](./plugins/outputs/yandex_cloud_monitoring) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 9d89976dd6cca..a5f8438670093 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -41,4 +41,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/timestream" _ "github.com/influxdata/telegraf/plugins/outputs/warp10" _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" + _ "github.com/influxdata/telegraf/plugins/outputs/yandex_cloud_monitoring" ) diff --git a/plugins/outputs/yandex_cloud_monitoring/README.md b/plugins/outputs/yandex_cloud_monitoring/README.md new file mode 100644 index 0000000000000..3bace22b4adb2 --- /dev/null +++ b/plugins/outputs/yandex_cloud_monitoring/README.md @@ -0,0 +1,26 @@ +# Yandex Cloud Monitoring + +This plugin will send custom metrics to Yandex Cloud Monitoring. +https://cloud.yandex.com/services/monitoring + +### Configuration: + +```toml +[[outputs.yandex_cloud_monitoring]] + ## Timeout for HTTP writes. + # timeout = "20s" + + ## Yandex.Cloud monitoring API endpoint. Normally should not be changed + # endpoint_url = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write" + + ## All user metrics should be sent with "custom" service specified. Normally should not be changed + # service = "custom" +``` + +### Authentication + +This plugin currently supports only YC.Compute metadata based authentication. + +When the plugin is running inside a YC.Compute instance it will take the IAM token and Folder ID from instance metadata. + +Other authentication methods will be added later. 
diff --git a/plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring.go b/plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring.go
new file mode 100644
index 0000000000000..36fd4ab0bef9f
--- /dev/null
+++ b/plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring.go
@@ -0,0 +1,327 @@
+package yandex_cloud_monitoring
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/outputs"
+	"github.com/influxdata/telegraf/selfstat"
+)
+
+// YandexCloudMonitoring allows publishing of metrics to the Yandex Cloud Monitoring custom metrics
+// service
+type YandexCloudMonitoring struct {
+	Timeout     internal.Duration `toml:"timeout"`
+	EndpointUrl string            `toml:"endpoint_url"`
+	Service     string            `toml:"service"`
+
+	Log telegraf.Logger
+
+	// Metadata endpoints and the credentials obtained from them. Connect
+	// fills in default URLs when these are left empty.
+	MetadataTokenURL       string
+	MetadataFolderURL      string
+	FolderID               string
+	IAMToken               string
+	IamTokenExpirationTime time.Time
+
+	client *http.Client
+
+	// timeFunc returns the current time; Connect defaults it to time.Now.
+	timeFunc func() time.Time
+
+	MetricOutsideWindow selfstat.Stat
+}
+
+// yandexCloudMonitoringMessage is the JSON body of a write request.
+type yandexCloudMonitoringMessage struct {
+	TS      string                        `json:"ts,omitempty"`
+	Labels  map[string]string             `json:"labels,omitempty"`
+	Metrics []yandexCloudMonitoringMetric `json:"metrics"`
+}
+
+// yandexCloudMonitoringMetric is a single data point of a write request.
+type yandexCloudMonitoringMetric struct {
+	Name       string            `json:"name"`
+	Labels     map[string]string `json:"labels"`
+	MetricType string            `json:"type,omitempty"` // DGAUGE|IGAUGE|COUNTER|RATE. Default: DGAUGE
+	TS         string            `json:"ts,omitempty"`
+	Value      float64           `json:"value"`
+}
+
+// MetadataIamToken is the token document decoded from the metadata token URL.
+type MetadataIamToken struct {
+	AccessToken string `json:"access_token"`
+	ExpiresIn   int64  `json:"expires_in"`
+	TokenType   string `json:"token_type"`
+}
+
+const (
+	defaultRequestTimeout    = time.Second * 20
+	defaultEndpointUrl       = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"
+	defaultMetadataTokenUrl  = "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"
+	defaultMetadataFolderUrl = "http://169.254.169.254/computeMetadata/v1/instance/attributes/folder-id"
+)
+
+var sampleConfig = `
+  ## Timeout for HTTP writes.
+  # timeout = "20s"
+
+  ## Yandex.Cloud monitoring API endpoint. Normally should not be changed
+  # endpoint_url = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"
+
+  ## All user metrics should be sent with "custom" service specified. Normally should not be changed
+  # service = "custom"
+`
+
+// Description provides a description of the plugin
+func (a *YandexCloudMonitoring) Description() string {
+	return "Send aggregated metrics to Yandex.Cloud Monitoring"
+}
+
+// SampleConfig provides a sample configuration for the plugin
+func (a *YandexCloudMonitoring) SampleConfig() string {
+	return sampleConfig
+}
+
+// Connect applies defaults, builds the HTTP client and fetches the folder ID
+// from the metadata service. It fails if the folder ID cannot be obtained.
+func (a *YandexCloudMonitoring) Connect() error {
+	if a.Timeout.Duration <= 0 {
+		a.Timeout.Duration = defaultRequestTimeout
+	}
+	if a.EndpointUrl == "" {
+		a.EndpointUrl = defaultEndpointUrl
+	}
+	if a.Service == "" {
+		a.Service = "custom"
+	}
+	if a.MetadataTokenURL == "" {
+		a.MetadataTokenURL = defaultMetadataTokenUrl
+	}
+	if a.MetadataFolderURL == "" {
+		a.MetadataFolderURL = defaultMetadataFolderUrl
+	}
+	if a.timeFunc == nil {
+		a.timeFunc = time.Now
+	}
+
+	a.client = &http.Client{
+		Transport: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+		},
+		Timeout: a.Timeout.Duration,
+	}
+
+	var err error
+	a.FolderID, err = a.getFolderIDFromMetadata()
+	if err != nil {
+		return err
+	}
+
+	a.Log.Infof("Writing to Yandex.Cloud Monitoring URL: %s", a.EndpointUrl)
+
+	tags := map[string]string{}
+	a.MetricOutsideWindow = selfstat.Register("yandex_cloud_monitoring", "metric_outside_window", tags)
+
+	return nil
+}
+
+// Close drops the HTTP client; Connect must be called again before Write.
+func (a *YandexCloudMonitoring) Close() error {
+	a.client = nil
+	return nil
+}
+
+// Write converts every numeric or boolean field of the given metrics into a
+// data point and sends them in a single request. Fields whose value cannot
+// be represented as a finite number are skipped and logged at debug level.
+func (a *YandexCloudMonitoring) Write(metrics []telegraf.Metric) error {
+	var yandexCloudMonitoringMetrics []yandexCloudMonitoringMetric
+	for _, m := range metrics {
+		ts := m.Time().Format(time.RFC3339)
+		for _, field := range m.FieldList() {
+			value, ok := toFloat64(field.Value)
+			if !ok {
+				a.Log.Debugf("skipping field %q of metric %q: unsupported value %v (%T)",
+					field.Key, m.Name(), field.Value, field.Value)
+				continue
+			}
+			yandexCloudMonitoringMetrics = append(
+				yandexCloudMonitoringMetrics,
+				yandexCloudMonitoringMetric{
+					Name:   field.Key,
+					Labels: m.Tags(),
+					TS:     ts,
+					Value:  value,
+				},
+			)
+		}
+	}
+	// Nothing convertible was found, so there is nothing to send.
+	if len(yandexCloudMonitoringMetrics) == 0 {
+		return nil
+	}
+
+	body, err := json.Marshal(
+		yandexCloudMonitoringMessage{
+			Metrics: yandexCloudMonitoringMetrics,
+		},
+	)
+	if err != nil {
+		return err
+	}
+	body = append(body, '\n')
+	return a.send(body)
+}
+
+// toFloat64 converts a field value to float64. Booleans become 1 or 0. The
+// second result is false for values with no numeric form (such as strings)
+// and for NaN or infinite floats, which encoding/json refuses to marshal.
+func toFloat64(value interface{}) (float64, bool) {
+	var f float64
+	switch v := value.(type) {
+	case float64:
+		f = v
+	case float32:
+		f = float64(v)
+	case int:
+		f = float64(v)
+	case int32:
+		f = float64(v)
+	case int64:
+		f = float64(v)
+	case uint:
+		f = float64(v)
+	case uint32:
+		f = float64(v)
+	case uint64:
+		f = float64(v)
+	case bool:
+		if v {
+			f = 1
+		}
+	default:
+		return 0, false
+	}
+	if math.IsNaN(f) || math.IsInf(f, 0) {
+		return 0, false
+	}
+	return f, true
+}
+
+// getResponseFromMetadata performs a GET against the metadata URL and
+// returns the response body. Any non-2xx status is reported as an error.
+func getResponseFromMetadata(c *http.Client, metadataUrl string) ([]byte, error) {
+	req, err := http.NewRequest("GET", metadataUrl, nil)
+	if err != nil {
+		return nil, fmt.Errorf("error creating request: %v", err)
+	}
+	req.Header.Set("Metadata-Flavor", "Google")
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	if resp.StatusCode >= 300 || resp.StatusCode < 200 {
+		return nil, fmt.Errorf("unable to fetch instance metadata: [%s] %d",
+			metadataUrl, resp.StatusCode)
+	}
+	return body, nil
+}
+
+// getFolderIDFromMetadata reads the folder ID from MetadataFolderURL. An
+// empty response is treated as an error.
+func (a *YandexCloudMonitoring) getFolderIDFromMetadata() (string, error) {
+	a.Log.Infof("getting folder ID in %s", a.MetadataFolderURL)
+	body, err := getResponseFromMetadata(a.client, a.MetadataFolderURL)
+	if err != nil {
+		return "", err
+	}
+	folderID := string(body)
+	if folderID == "" {
+		return "", fmt.Errorf("unable to fetch folder id from URL %s: empty response", a.MetadataFolderURL)
+	}
+	return folderID, nil
+}
+
+// getIAMTokenFromMetadata reads an IAM token from MetadataTokenURL and
+// returns the token together with its lifetime in seconds.
+func (a *YandexCloudMonitoring) getIAMTokenFromMetadata() (string, int, error) {
+	a.Log.Debugf("getting new IAM token in %s", a.MetadataTokenURL)
+	body, err := getResponseFromMetadata(a.client, a.MetadataTokenURL)
+	if err != nil {
+		return "", 0, err
+	}
+	var metadata MetadataIamToken
+	if err := json.Unmarshal(body, &metadata); err != nil {
+		return "", 0, err
+	}
+	if metadata.AccessToken == "" || metadata.ExpiresIn == 0 {
+		return "", 0, fmt.Errorf("unable to fetch authentication credentials %s: token or expiration missing in response", a.MetadataTokenURL)
+	}
+	return metadata.AccessToken, int(metadata.ExpiresIn), nil
+}
+
+// send posts the body to the endpoint, refreshing the cached IAM token
+// first when it is missing or expired.
+func (a *YandexCloudMonitoring) send(body []byte) error {
+	req, err := http.NewRequest("POST", a.EndpointUrl, bytes.NewBuffer(body))
+	if err != nil {
+		return err
+	}
+	q := req.URL.Query()
+	q.Add("folderId", a.FolderID)
+	q.Add("service", a.Service)
+	req.URL.RawQuery = q.Encode()
+
+	req.Header.Set("Content-Type", "application/json")
+	now := a.timeFunc()
+	isTokenExpired := !a.IamTokenExpirationTime.After(now)
+	if a.IAMToken == "" || isTokenExpired {
+		token, expiresIn, err := a.getIAMTokenFromMetadata()
+		if err != nil {
+			return err
+		}
+		a.IamTokenExpirationTime = now.Add(time.Duration(expiresIn) * time.Second)
+		a.IAMToken = token
+	}
+	req.Header.Set("Authorization", "Bearer "+a.IAMToken)
+
+	a.Log.Debugf("sending metrics to %s", req.URL.String())
+	resp, err := a.client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Read the body fully so the connection can be reused.
+	if _, err := ioutil.ReadAll(resp.Body); err != nil {
+		return fmt.Errorf("error reading response from %s: %v", a.EndpointUrl, err)
+	}
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		return fmt.Errorf("failed to write batch: [%v] %s", resp.StatusCode, resp.Status)
+	}
+
+	return nil
+}
+
+func init() {
+	outputs.Add("yandex_cloud_monitoring", func() telegraf.Output {
+		return &YandexCloudMonitoring{
+			timeFunc: time.Now,
+		}
+	})
+}
diff --git a/plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring_test.go b/plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring_test.go
new file mode 100644
index 0000000000000..edd2960bf0cff
--- /dev/null
+++ b/plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring_test.go
@@ -0,0 +1,123 @@
+package yandex_cloud_monitoring
+
+import (
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestWrite(t *testing.T) {
+	readBody := func(r *http.Request) (yandexCloudMonitoringMessage, error) {
+		var message yandexCloudMonitoringMessage
+		err := json.NewDecoder(r.Body).Decode(&message)
+		return message, err
+	}
+
+	testMetadataHttpServer := httptest.NewServer(
+		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			// The status defaults to 200 on the first write, so no explicit
+			// WriteHeader call is needed after the body has been written.
+			switch {
+			case strings.HasSuffix(r.URL.Path, "/token"):
+				token := MetadataIamToken{
+					AccessToken: "token1",
+					ExpiresIn:   123,
+				}
+				w.Header().Set("Content-Type", "application/json; charset=utf-8")
+				err := json.NewEncoder(w).Encode(token)
+				require.NoError(t, err)
+			case strings.HasSuffix(r.URL.Path, "/folder"):
+				_, err := io.WriteString(w, "folder1")
+				require.NoError(t, err)
+			default:
+				http.NotFound(w, r)
+			}
+		}),
+	)
+	defer testMetadataHttpServer.Close()
+	metadataTokenUrl := "http://" + testMetadataHttpServer.Listener.Addr().String() + "/token"
+	metadataFolderUrl := "http://" + testMetadataHttpServer.Listener.Addr().String() + "/folder"
+
+	ts := httptest.NewServer(http.NotFoundHandler())
+	defer ts.Close()
+	url := "http://" + ts.Listener.Addr().String() + "/metrics"
+
+	tests := []struct {
+		name    string
+		plugin  *YandexCloudMonitoring
+		metrics []telegraf.Metric
+		handler func(t *testing.T, w http.ResponseWriter, r *http.Request)
+	}{
+		{
+			name:   "metric is converted to json value",
+			plugin: &YandexCloudMonitoring{},
+			metrics: []telegraf.Metric{
+				testutil.MustMetric(
+					"cluster",
+					map[string]string{},
+					map[string]interface{}{
+						"cpu": 42.0,
+					},
+					time.Unix(0, 0),
+				),
+			},
+			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				message, err := readBody(r)
+				require.NoError(t, err)
+				require.Len(t, message.Metrics, 1)
+				require.Equal(t, "cpu", message.Metrics[0].Name)
+				require.Equal(t, 42.0, message.Metrics[0].Value)
+				require.Equal(t, "Bearer token1", r.Header.Get("Authorization"))
+				require.Equal(t, "folder1", r.URL.Query().Get("folderId"))
+				w.WriteHeader(http.StatusOK)
+			},
+		},
+		{
+			name:   "integer field is converted to float",
+			plugin: &YandexCloudMonitoring{},
+			metrics: []telegraf.Metric{
+				testutil.MustMetric(
+					"cluster",
+					map[string]string{},
+					map[string]interface{}{
+						"requests": int64(7),
+					},
+					time.Unix(0, 0),
+				),
+			},
+			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				message, err := readBody(r)
+				require.NoError(t, err)
+				require.Len(t, message.Metrics, 1)
+				require.Equal(t, "requests", message.Metrics[0].Name)
+				require.Equal(t, 7.0, message.Metrics[0].Value)
+				w.WriteHeader(http.StatusOK)
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				tt.handler(t, w, r)
+			})
+			tt.plugin.Log = testutil.Logger{}
+			tt.plugin.EndpointUrl = url
+			tt.plugin.MetadataTokenURL = metadataTokenUrl
+			tt.plugin.MetadataFolderURL = metadataFolderUrl
+			err := tt.plugin.Connect()
+			require.NoError(t, err)
+
+			err = tt.plugin.Write(tt.metrics)
+
+			require.NoError(t, err)
+		})
+	}
+}