Skip to content

Commit

Permalink
#8295 Initial Yandex.Cloud monitoring (#8296)
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-volkov authored Nov 2, 2020
1 parent 68a4f18 commit 38796f0
Show file tree
Hide file tree
Showing 5 changed files with 383 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,4 @@ For documentation on the latest development code see the [documentation index][d
* [warp10](./plugins/outputs/warp10)
* [wavefront](./plugins/outputs/wavefront)
* [sumologic](./plugins/outputs/sumologic)
* [yandex_cloud_monitoring](./plugins/outputs/yandex_cloud_monitoring)
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/timestream"
_ "github.com/influxdata/telegraf/plugins/outputs/warp10"
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
_ "github.com/influxdata/telegraf/plugins/outputs/yandex_cloud_monitoring"
)
26 changes: 26 additions & 0 deletions plugins/outputs/yandex_cloud_monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Yandex Cloud Monitoring

This plugin will send custom metrics to Yandex Cloud Monitoring.
https://cloud.yandex.com/services/monitoring

### Configuration:

```toml
[[outputs.yandex_cloud_monitoring]]
## Timeout for HTTP writes.
# timeout = "20s"

## Yandex.Cloud monitoring API endpoint. Normally should not be changed
# endpoint_url = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"

## All user metrics should be sent with "custom" service specified. Normally should not be changed
# service = "custom"
```

### Authentication

This plugin currently supports only YC.Compute metadata-based authentication.

When the plugin runs inside a YC.Compute instance, it takes the IAM token and the folder ID from the instance metadata.

Other authentication methods will be added later.
259 changes: 259 additions & 0 deletions plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package yandex_cloud_monitoring

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/selfstat"
)

// YandexCloudMonitoring allows publishing of metrics to the Yandex Cloud Monitoring custom metrics
// service. Connect must be called before Write: it fills in defaults, builds the HTTP client
// and resolves the folder ID.
type YandexCloudMonitoring struct {
// Timeout is the HTTP client timeout; Connect replaces non-positive values with 20s.
Timeout internal.Duration `toml:"timeout"`
// EndpointUrl is the URL metrics are POSTed to; Connect sets the default when empty.
EndpointUrl string `toml:"endpoint_url"`
// Service is sent as the "service" query parameter; Connect defaults it to "custom".
Service string `toml:"service"`

// Log is the logger used for info and debug messages.
Log telegraf.Logger

// MetadataTokenURL is the metadata URL the IAM token is fetched from.
MetadataTokenURL string
// MetadataFolderURL is the metadata URL the folder ID is fetched from.
MetadataFolderURL string
// FolderID is overwritten by Connect with the value read from MetadataFolderURL and
// sent as the "folderId" query parameter.
FolderID string
// IAMToken is the cached bearer token sent in the Authorization header.
IAMToken string
// IamTokenExpirationTime is the instant after which IAMToken is fetched again.
IamTokenExpirationTime time.Time

// client is created by Connect and set to nil by Close.
client *http.Client

// timeFunc is set to time.Now by the plugin factory; the visible code never calls it.
timeFunc func() time.Time

// MetricOutsideWindow is registered with selfstat in Connect; the visible code never updates it.
MetricOutsideWindow selfstat.Stat
}

// yandexCloudMonitoringMessage is the JSON document marshalled by Write as the request body.
// Write populates only Metrics; TS and Labels are omitted from the JSON when empty.
type yandexCloudMonitoringMessage struct {
TS string `json:"ts,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Metrics []yandexCloudMonitoringMetric `json:"metrics"`
}

// yandexCloudMonitoringMetric is one sample inside a yandexCloudMonitoringMessage.
// Write builds one per metric field: Name is the field key, Labels are the metric tags,
// TS is the metric time formatted as RFC 3339. MetricType is never set by Write.
type yandexCloudMonitoringMetric struct {
Name string `json:"name"`
Labels map[string]string `json:"labels"`
MetricType string `json:"type,omitempty"` // DGAUGE|IGAUGE|COUNTER|RATE. Default: DGAUGE
TS string `json:"ts,omitempty"`
Value float64 `json:"value"`
}

// MetadataIamToken is the JSON document decoded from the response of the metadata token URL.
type MetadataIamToken struct {
// AccessToken is the value sent as the bearer token.
AccessToken string `json:"access_token"`
// ExpiresIn is interpreted by send as a lifetime in seconds.
ExpiresIn int64 `json:"expires_in"`
// TokenType is decoded but not used by the visible code.
TokenType string `json:"token_type"`
}

// Defaults applied by Connect to options that were left unset.
const (
// defaultRequestTimeout is used when the configured timeout is not positive.
defaultRequestTimeout = time.Second * 20
// defaultEndpointUrl is the write endpoint metrics are POSTed to.
defaultEndpointUrl = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"
// defaultMetadataTokenUrl is queried for the IAM token.
defaultMetadataTokenUrl = "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"
// defaultMetadataFolderUrl is queried for the folder ID.
defaultMetadataFolderUrl = "http://169.254.169.254/computeMetadata/v1/instance/attributes/folder-id"
)

// sampleConfig is the TOML snippet returned verbatim by SampleConfig.
var sampleConfig = `
## Timeout for HTTP writes.
# timeout = "20s"
## Yandex.Cloud monitoring API endpoint. Normally should not be changed
# endpoint_url = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"
## All user metrics should be sent with "custom" service specified. Normally should not be changed
# service = "custom"
`

// Description returns a fixed one-line summary of the plugin.
func (a *YandexCloudMonitoring) Description() string {
return "Send aggregated metrics to Yandex.Cloud Monitoring"
}

// SampleConfig returns the package-level sampleConfig TOML snippet unchanged.
func (a *YandexCloudMonitoring) SampleConfig() string {
return sampleConfig
}

// Connect prepares the plugin for writing: it fills in defaults for every
// unset option, builds the HTTP client, reads the folder ID from the instance
// metadata and registers the plugin's self-stat.
//
// The error from the folder ID lookup is returned unchanged; in that case
// nothing is logged and the self-stat is not registered.
func (a *YandexCloudMonitoring) Connect() error {
	if a.Timeout.Duration <= 0 {
		a.Timeout.Duration = defaultRequestTimeout
	}

	// Every string option falls back to its built-in default when empty.
	stringDefaults := []struct {
		target   *string
		fallback string
	}{
		{&a.EndpointUrl, defaultEndpointUrl},
		{&a.Service, "custom"},
		{&a.MetadataTokenURL, defaultMetadataTokenUrl},
		{&a.MetadataFolderURL, defaultMetadataFolderUrl},
	}
	for _, opt := range stringDefaults {
		if *opt.target == "" {
			*opt.target = opt.fallback
		}
	}

	transport := &http.Transport{Proxy: http.ProxyFromEnvironment}
	a.client = &http.Client{Transport: transport, Timeout: a.Timeout.Duration}

	// FolderID is assigned even when the lookup fails, exactly as the
	// lookup returned it.
	folderID, lookupErr := a.getFolderIDFromMetadata()
	a.FolderID = folderID
	if lookupErr != nil {
		return lookupErr
	}

	a.Log.Infof("Writing to Yandex.Cloud Monitoring URL: %s", a.EndpointUrl)

	a.MetricOutsideWindow = selfstat.Register(
		"yandex_cloud_monitoring",
		"metric_outside_window",
		map[string]string{},
	)
	return nil
}

// Close shuts down any active connections.
//
// The transport built by Connect has no idle-connection timeout, so idle
// keep-alive connections are closed explicitly before the client is dropped;
// otherwise they would stay open until the remote side closes them. Close is
// safe to call when Connect was never called and always returns nil.
func (a *YandexCloudMonitoring) Close() error {
	if a.client != nil {
		a.client.CloseIdleConnections()
	}
	a.client = nil
	return nil
}

// Write converts every field of every metric into one sample and sends the
// whole batch to the remote endpoint as a single newline-terminated JSON
// document.
//
// Each sample is named after the field key, labelled with the metric's tags
// and stamped with the metric's time in RFC 3339 format. Numeric fields are
// converted to float64 and booleans to 1 or 0. Fields of any other type (for
// example strings) have no numeric representation; they are skipped with a
// debug log instead of aborting the whole batch. When no sample remains,
// nothing is sent and nil is returned.
//
// It returns the JSON encoding error or the error reported by send.
func (a *YandexCloudMonitoring) Write(metrics []telegraf.Metric) error {
	var yandexCloudMonitoringMetrics []yandexCloudMonitoringMetric
	for _, m := range metrics {
		// Time and tags are the same for every field of one metric.
		ts := m.Time().Format(time.RFC3339)
		tags := m.Tags()
		for _, field := range m.FieldList() {
			value, ok := fieldToFloat64(field.Value)
			if !ok {
				a.Log.Debugf("skipping field %q of metric %q: unsupported value type %T",
					field.Key, m.Name(), field.Value)
				continue
			}
			yandexCloudMonitoringMetrics = append(
				yandexCloudMonitoringMetrics,
				yandexCloudMonitoringMetric{
					Name:   field.Key,
					Labels: tags,
					TS:     ts,
					Value:  value,
				},
			)
		}
	}

	// An empty batch would be encoded as "metrics":null; there is nothing
	// useful to send in that case.
	if len(yandexCloudMonitoringMetrics) == 0 {
		return nil
	}

	jsonBytes, err := json.Marshal(
		yandexCloudMonitoringMessage{
			Metrics: yandexCloudMonitoringMetrics,
		},
	)
	if err != nil {
		return err
	}
	return a.send(append(jsonBytes, '\n'))
}

// fieldToFloat64 converts a metric field value to float64. The second result
// is false when the value's type has no numeric representation.
func fieldToFloat64(v interface{}) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case float32:
		return float64(n), true
	case int:
		return float64(n), true
	case int8:
		return float64(n), true
	case int16:
		return float64(n), true
	case int32:
		return float64(n), true
	case int64:
		return float64(n), true
	case uint:
		return float64(n), true
	case uint8:
		return float64(n), true
	case uint16:
		return float64(n), true
	case uint32:
		return float64(n), true
	case uint64:
		return float64(n), true
	case bool:
		if n {
			return 1, true
		}
		return 0, true
	default:
		return 0, false
	}
}

// getResponseFromMetadata performs a GET request against metadataUrl using c,
// with the "Metadata-Flavor: Google" header set, and returns the response
// body.
//
// An error is returned when the request cannot be built or executed, when the
// body cannot be read, or when the status code is outside the 2xx range.
func getResponseFromMetadata(c *http.Client, metadataUrl string) ([]byte, error) {
	request, reqErr := http.NewRequest(http.MethodGet, metadataUrl, nil)
	if reqErr != nil {
		return nil, fmt.Errorf("error creating request: %v", reqErr)
	}
	request.Header.Set("Metadata-Flavor", "Google")

	response, doErr := c.Do(request)
	if doErr != nil {
		return nil, doErr
	}
	defer response.Body.Close()

	// The body is read before the status is inspected, so a read failure
	// takes precedence over a bad status code.
	payload, readErr := ioutil.ReadAll(response.Body)
	if readErr != nil {
		return nil, readErr
	}

	if code := response.StatusCode; code < 200 || code >= 300 {
		return nil, fmt.Errorf("unable to fetch instance metadata: [%s] %d",
			metadataUrl, code)
	}
	return payload, nil
}

// getFolderIDFromMetadata reads the folder ID from MetadataFolderURL.
//
// Surrounding whitespace in the response is stripped so that a trailing
// newline does not end up in the "folderId" query parameter. An error is
// returned when the metadata request fails or when the response contains no
// folder ID.
func (a *YandexCloudMonitoring) getFolderIDFromMetadata() (string, error) {
	a.Log.Infof("getting folder ID in %s", a.MetadataFolderURL)
	body, err := getResponseFromMetadata(a.client, a.MetadataFolderURL)
	if err != nil {
		return "", err
	}
	folderID := string(bytes.TrimSpace(body))
	if folderID == "" {
		// err is nil on this path, so the message describes the cause itself.
		return "", fmt.Errorf("unable to fetch folder id from URL %s: empty response", a.MetadataFolderURL)
	}
	return folderID, nil
}

// getIAMTokenFromMetadata fetches a fresh IAM token from MetadataTokenURL.
//
// It returns the access token and its lifetime in seconds. An error is
// returned when the metadata request fails, when the response is not valid
// JSON, or when the response has no access token or no positive lifetime.
func (a *YandexCloudMonitoring) getIAMTokenFromMetadata() (string, int, error) {
	a.Log.Debugf("getting new IAM token in %s", a.MetadataTokenURL)
	body, err := getResponseFromMetadata(a.client, a.MetadataTokenURL)
	if err != nil {
		return "", 0, err
	}
	var metadata MetadataIamToken
	if err := json.Unmarshal(body, &metadata); err != nil {
		return "", 0, err
	}
	// A non-positive lifetime would make the cached token expire immediately.
	if metadata.AccessToken == "" || metadata.ExpiresIn <= 0 {
		return "", 0, fmt.Errorf(
			"unable to fetch authentication credentials %s: response has no access token or no positive expiration",
			a.MetadataTokenURL)
	}
	return metadata.AccessToken, int(metadata.ExpiresIn), nil
}

// send POSTs body to EndpointUrl with the "folderId" and "service" query
// parameters and a bearer IAM token.
//
// The token is cached on the plugin and fetched again from the metadata
// service when it is missing or its expiration time has passed. An error is
// returned when the request cannot be built or executed, when the token
// cannot be refreshed, when the response body cannot be read, or when the
// endpoint answers with a non-2xx status.
func (a *YandexCloudMonitoring) send(body []byte) error {
	req, err := http.NewRequest("POST", a.EndpointUrl, bytes.NewBuffer(body))
	if err != nil {
		return err
	}
	q := req.URL.Query()
	q.Add("folderId", a.FolderID)
	q.Add("service", a.Service)
	req.URL.RawQuery = q.Encode()

	req.Header.Set("Content-Type", "application/json")
	isTokenExpired := !a.IamTokenExpirationTime.After(time.Now())
	if a.IAMToken == "" || isTokenExpired {
		token, expiresIn, err := a.getIAMTokenFromMetadata()
		if err != nil {
			return err
		}
		a.IamTokenExpirationTime = time.Now().Add(time.Duration(expiresIn) * time.Second)
		a.IAMToken = token
	}
	req.Header.Set("Authorization", "Bearer "+a.IAMToken)

	a.Log.Debugf("sending metrics to %s", req.URL.String())
	resp, err := a.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Reading the body to the end lets the transport reuse the connection.
	// A read failure is reported as such instead of as a status failure.
	respBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response from %s: %v", a.EndpointUrl, err)
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// The response body is appended to the message when it is not empty.
		if detail := bytes.TrimSpace(respBody); len(detail) > 0 {
			return fmt.Errorf("failed to write batch: [%v] %s: %s", resp.StatusCode, resp.Status, detail)
		}
		return fmt.Errorf("failed to write batch: [%v] %s", resp.StatusCode, resp.Status)
	}

	return nil
}

func init() {
outputs.Add("yandex_cloud_monitoring", func() telegraf.Output {
return &YandexCloudMonitoring{
timeFunc: time.Now,
}
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package yandex_cloud_monitoring

import (
"encoding/json"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)

// TestWrite runs the plugin against two local HTTP servers: one stands in for
// the instance metadata service (token and folder ID), the other for the
// monitoring write endpoint. Each table entry supplies the metrics to write
// and a handler that checks the request the plugin sends.
func TestWrite(t *testing.T) {
	// readBody decodes the request body and reports decoding failures to the
	// caller instead of asserting internally.
	readBody := func(r *http.Request) (yandexCloudMonitoringMessage, error) {
		var message yandexCloudMonitoringMessage
		err := json.NewDecoder(r.Body).Decode(&message)
		return message, err
	}

	testMetadataHttpServer := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Writing the body sends status 200 implicitly, so the success
			// paths must not call WriteHeader afterwards.
			switch {
			case strings.HasSuffix(r.URL.Path, "/token"):
				token := MetadataIamToken{
					AccessToken: "token1",
					ExpiresIn:   123,
				}
				w.Header().Set("Content-Type", "application/json; charset=utf-8")
				err := json.NewEncoder(w).Encode(token)
				require.NoError(t, err)
			case strings.HasSuffix(r.URL.Path, "/folder"):
				_, err := io.WriteString(w, "folder1")
				require.NoError(t, err)
			default:
				// Unknown metadata paths fail loudly rather than answering
				// 200 with an empty body.
				w.WriteHeader(http.StatusNotFound)
			}
		}),
	)
	defer testMetadataHttpServer.Close()
	metadataTokenUrl := "http://" + testMetadataHttpServer.Listener.Addr().String() + "/token"
	metadataFolderUrl := "http://" + testMetadataHttpServer.Listener.Addr().String() + "/folder"

	ts := httptest.NewServer(http.NotFoundHandler())
	defer ts.Close()
	url := "http://" + ts.Listener.Addr().String() + "/metrics"

	tests := []struct {
		name    string
		plugin  *YandexCloudMonitoring
		metrics []telegraf.Metric
		handler func(t *testing.T, w http.ResponseWriter, r *http.Request)
	}{
		{
			name:   "metric is converted to json value",
			plugin: &YandexCloudMonitoring{},
			metrics: []telegraf.Metric{
				testutil.MustMetric(
					"cluster",
					map[string]string{},
					map[string]interface{}{
						"cpu": 42.0,
					},
					time.Unix(0, 0),
				),
			},
			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
				message, err := readBody(r)
				require.NoError(t, err)
				require.Len(t, message.Metrics, 1)
				require.Equal(t, "cpu", message.Metrics[0].Name)
				require.Equal(t, 42.0, message.Metrics[0].Value)
				w.WriteHeader(http.StatusOK)
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// The write endpoint delegates to the handler of the current case.
			ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				tt.handler(t, w, r)
			})
			tt.plugin.Log = testutil.Logger{}
			tt.plugin.EndpointUrl = url
			tt.plugin.MetadataTokenURL = metadataTokenUrl
			tt.plugin.MetadataFolderURL = metadataFolderUrl
			err := tt.plugin.Connect()
			require.NoError(t, err)

			err = tt.plugin.Write(tt.metrics)

			require.NoError(t, err)
		})
	}
}

0 comments on commit 38796f0

Please sign in to comment.