From 8bcf39ac2d5d4c4b8d42dda25dedfe5b7fe3a64f Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Wed, 13 Feb 2019 16:49:05 +0200 Subject: [PATCH] Add datadog/statsd fix #196 (#915) --- Gopkg.lock | 9 + cmd/collectors.go | 17 + cmd/config.go | 6 + lib/options.go | 16 + lib/options_test.go | 23 +- release notes/upcoming.md | 13 + stats/datadog/collector.go | 74 ++ stats/datadog/collector_test.go | 45 ++ stats/statsd/collector.go | 33 + stats/statsd/collector_test.go | 16 + stats/statsd/common/api.go | 46 ++ stats/statsd/common/collector.go | 206 ++++++ stats/statsd/common/collector_test.go | 44 ++ stats/statsd/common/config.go | 67 ++ stats/statsd/common/testutil/test_helper.go | 133 ++++ .../github.com/DataDog/datadog-go/LICENSE.txt | 19 + .../DataDog/datadog-go/statsd/statsd.go | 680 ++++++++++++++++++ .../DataDog/datadog-go/statsd/udp.go | 40 ++ .../DataDog/datadog-go/statsd/uds.go | 67 ++ 19 files changed, 1553 insertions(+), 1 deletion(-) create mode 100644 stats/datadog/collector.go create mode 100644 stats/datadog/collector_test.go create mode 100644 stats/statsd/collector.go create mode 100644 stats/statsd/collector_test.go create mode 100644 stats/statsd/common/api.go create mode 100644 stats/statsd/common/collector.go create mode 100644 stats/statsd/common/collector_test.go create mode 100644 stats/statsd/common/config.go create mode 100644 stats/statsd/common/testutil/test_helper.go create mode 100644 vendor/github.com/DataDog/datadog-go/LICENSE.txt create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/statsd.go create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/udp.go create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/uds.go diff --git a/Gopkg.lock b/Gopkg.lock index 46885c38455..28928d10425 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -9,6 +9,14 @@ pruneopts = "NUT" revision = "4a21cbd618b459155f8b8ee7f4491cd54f5efa77" +[[projects]] + digest = "1:6d83aad9c98e13079ad8a4dbc740892edf5384bcd07f1d789bb66cd8ecfae2cd" + name = "github.com/DataDog/datadog-go" + packages = ["statsd"] + pruneopts = "NUT" + revision = "e67964b4021ad3a334e748e8811eb3cd6becbc6e" + version = "2.1.0" + [[projects]] branch = "master" digest = "1:7d8fe22a434ddfb29d95e10a0231eb2d96100b9eaaf7bff7312eee7df7a34bfa" @@ -644,6 +652,7 @@ analyzer-version = 1 input-imports = [ "github.com/Azure/go-ntlmssp", + "github.com/DataDog/datadog-go/statsd", "github.com/GeertJohan/go.rice", "github.com/GeertJohan/go.rice/embedded", "github.com/PuerkitoBio/goquery", diff --git a/cmd/collectors.go b/cmd/collectors.go index 0986f4e5102..aa395b672e6 100644 --- a/cmd/collectors.go +++ b/cmd/collectors.go @@ -29,9 +29,12 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats/cloud" + "github.com/loadimpact/k6/stats/datadog" "github.com/loadimpact/k6/stats/influxdb" jsonc "github.com/loadimpact/k6/stats/json" "github.com/loadimpact/k6/stats/kafka" + "github.com/loadimpact/k6/stats/statsd" + "github.com/loadimpact/k6/stats/statsd/common" "github.com/pkg/errors" "github.com/spf13/afero" ) @@ -41,6 +44,8 @@ const ( collectorJSON = "json" collectorKafka = "kafka" collectorCloud = "cloud" + collectorStatsD = "statsd" + collectorDatadog = "datadog" ) func parseCollector(s string) (t, arg string) { @@ -93,6 +98,18 @@ func newCollector(collectorName, arg string, src 
*lib.SourceData, conf Config) ( config = config.Apply(cmdConfig) } return kafka.New(config) + case collectorStatsD: + config := common.NewConfig().Apply(conf.Collectors.StatsD) + if err := envconfig.Process("k6_statsd", &config); err != nil { + return nil, err + } + return statsd.New(config) + case collectorDatadog: + config := datadog.NewConfig().Apply(conf.Collectors.Datadog) + if err := envconfig.Process("k6_datadog", &config); err != nil { + return nil, err + } + return datadog.New(config) default: return nil, errors.Errorf("unknown output type: %s", collectorName) } diff --git a/cmd/config.go b/cmd/config.go index 5eb2bd4d6a7..b1637cf259c 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -28,8 +28,10 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats/cloud" + "github.com/loadimpact/k6/stats/datadog" "github.com/loadimpact/k6/stats/influxdb" "github.com/loadimpact/k6/stats/kafka" + "github.com/loadimpact/k6/stats/statsd/common" "github.com/shibukawa/configdir" "github.com/spf13/afero" "github.com/spf13/pflag" @@ -74,6 +76,8 @@ type Config struct { InfluxDB influxdb.Config `json:"influxdb"` Kafka kafka.Config `json:"kafka"` Cloud cloud.Config `json:"cloud"` + StatsD common.Config `json:"statsd"` + Datadog datadog.Config `json:"datadog"` } `json:"collectors"` } @@ -97,6 +101,8 @@ func (c Config) Apply(cfg Config) Config { c.Collectors.InfluxDB = c.Collectors.InfluxDB.Apply(cfg.Collectors.InfluxDB) c.Collectors.Cloud = c.Collectors.Cloud.Apply(cfg.Collectors.Cloud) c.Collectors.Kafka = c.Collectors.Kafka.Apply(cfg.Collectors.Kafka) + c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD) + c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog) return c } diff --git a/lib/options.go b/lib/options.go index 8285e4764d8..8478febd79b 100644 --- a/lib/options.go +++ b/lib/options.go @@ -21,11 +21,13 @@ package lib import ( + "bytes" "crypto/tls" "encoding/json" "fmt" "net" "reflect" + "strings" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" @@ -73,6 +75,20 @@ func (t *TagSet) UnmarshalJSON(data []byte) error { return nil } +// UnmarshalText converts the tag list to tagset. +func (t *TagSet) UnmarshalText(data []byte) error { + var list = bytes.Split(data, []byte(",")) + *t = make(map[string]bool, len(list)) + for _, key := range list { + key := strings.TrimSpace(string(key)) + if key == "" { + continue + } + (*t)[key] = true + } + return nil +} + // Describes a TLS version. Serialised to/from JSON as a string, eg. "tls1.2". 
type TLSVersion int diff --git a/lib/options_test.go b/lib/options_test.go index 0e78c38cfbf..c3bcc8ad69b 100644 --- a/lib/options_test.go +++ b/lib/options_test.go @@ -33,7 +33,8 @@ import ( "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/stretchr/testify/assert" - "gopkg.in/guregu/null.v3" + "github.com/stretchr/testify/require" + null "gopkg.in/guregu/null.v3" ) func TestOptions(t *testing.T) { @@ -480,3 +481,23 @@ func TestOptionsEnv(t *testing.T) { }) } } + +func TestTagSetTextUnmarshal(t *testing.T) { + + var testMatrix = map[string]map[string]bool{ + "": {}, + "test": {"test": true}, + "test1,test2": {"test1": true, "test2": true}, + " test1 , test2 ": {"test1": true, "test2": true}, + " test1 , , test2 ": {"test1": true, "test2": true}, + " test1 ,, test2 ,,": {"test1": true, "test2": true}, + } + + for input, expected := range testMatrix { + var set = new(TagSet) + err := set.UnmarshalText([]byte(input)) + require.NoError(t, err) + + require.Equal(t, (map[string]bool)(*set), expected) + } +} diff --git a/release notes/upcoming.md b/release notes/upcoming.md index 5bbb85ab4c6..75f3d655ea4 100644 --- a/release notes/upcoming.md +++ b/release notes/upcoming.md @@ -8,6 +8,19 @@ You can now specify a file for all things logged by `console.log` to get written Thanks to @cheesedosa for both proposing and implementing this! +### New result outputs: StatsD and Datadog (#915) + +You can now output any metrics k6 collects to StatsD or Datadog by running `k6 run --out statsd script.js` or `k6 run --out datadog script.js` respectively. Both are very similar, but Datadog has a concept of metric tags: key-value metadata pairs that allow you to distinguish between requests for different URLs, response statuses, different groups, etc. + +Some details: +- By default both outputs send metrics to a local agent listening on `localhost:8125` (currently only UDP is supported as a transport). You can change this address via the `K6_DATADOG_ADDR` or `K6_STATSD_ADDR` environment variables, using the `address:port` format. +- The new outputs also support adding a `namespace` - a prefix before all the metric names. You can set it via the `K6_DATADOG_NAMESPACE` or `K6_STATSD_NAMESPACE` environment variables respectively. Its default value is `k6.` - notice the dot at the end. +- You can configure how often data batches are sent via the `K6_STATSD_PUSH_INTERVAL` / `K6_DATADOG_PUSH_INTERVAL` environment variables. The default value is `1s`. +- Another performance tweak is changing the default buffer size of 20 through `K6_STATSD_BUFFER_SIZE` / `K6_DATADOG_BUFFER_SIZE`. +- In the case of Datadog, there is an additional option, `K6_DATADOG_TAG_BLACKLIST`, which is empty by default. This is a comma-separated list of tags that should *NOT* be sent to Datadog. All other metric tags that k6 emits will be sent. + +Thanks to @ivoreis for their work on this! + ### k6/crypto: Random bytes method (#922) This feature adds a method to return an array with a number of cryptographically random bytes. It will either return exactly the number of bytes requested or will throw an exception if something went wrong.
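To make the tag-blacklist behaviour described above concrete, here is a minimal, self-contained Go sketch of the filtering the Datadog output applies before sending a metric. It mirrors the `processTags` helper added in `stats/datadog/collector.go` below; the `tagSet` type and the `main` wrapper are illustrative stand-ins, not k6 API.

```go
package main

import "fmt"

// tagSet plays the role of k6's lib.TagSet here: the set of tag names
// that should NOT be forwarded to Datadog.
type tagSet map[string]bool

// processTags converts a metric's key/value tags into the "key:value"
// strings dogstatsd expects, skipping empty values and blacklisted keys.
func processTags(blacklist tagSet, tags map[string]string) []string {
	var res []string
	for key, value := range tags {
		if value != "" && !blacklist[key] {
			res = append(res, key+":"+value)
		}
	}
	return res
}

func main() {
	// K6_DATADOG_TAG_BLACKLIST="url" would be parsed into a set like this.
	blacklist := tagSet{"url": true}
	tags := map[string]string{
		"url":    "https://example.com/",
		"status": "200",
		"group":  "", // empty values are dropped as well
	}
	fmt.Println(processTags(blacklist, tags)) // [status:200]
}
```

Every tag that survives this filter is appended to the metric line in dogstatsd's `|#tag1:value1,tag2:value2` format, which is what the Datadog collector test below asserts on.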
diff --git a/stats/datadog/collector.go b/stats/datadog/collector.go new file mode 100644 index 00000000000..c31c8ed9f99 --- /dev/null +++ b/stats/datadog/collector.go @@ -0,0 +1,74 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2019 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package datadog + +import ( + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats/statsd/common" +) + +type tagHandler lib.TagSet + +func (t tagHandler) processTags(tags map[string]string) []string { + var res []string + + for key, value := range tags { + if value != "" && !t[key] { + res = append(res, key+":"+value) + } + } + return res +} + +// Config defines the datadog configuration +type Config struct { + common.Config + + TagBlacklist lib.TagSet `json:"tagBlacklist,omitempty" envconfig:"TAG_BLACKLIST"` +} + +// Apply saves config non-zero config values from the passed config in the receiver. +func (c Config) Apply(cfg Config) Config { + c.Config = c.Config.Apply(cfg.Config) + + if cfg.TagBlacklist != nil { + c.TagBlacklist = cfg.TagBlacklist + } + + return c +} + +// NewConfig creates a new Config instance with default values for some fields. 
+func NewConfig() Config { + return Config{ + Config: common.NewConfig(), + TagBlacklist: lib.GetTagSet(), + } +} + +// New creates a new statsd connector client +func New(conf Config) (*common.Collector, error) { + return &common.Collector{ + Config: conf.Config, + Type: "datadog", + ProcessTags: tagHandler(conf.TagBlacklist).processTags, + }, nil +} diff --git a/stats/datadog/collector_test.go b/stats/datadog/collector_test.go new file mode 100644 index 00000000000..33daeaee691 --- /dev/null +++ b/stats/datadog/collector_test.go @@ -0,0 +1,45 @@ +package datadog + +import ( + "strings" + "testing" + + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/statsd/common" + "github.com/loadimpact/k6/stats/statsd/common/testutil" + "github.com/stretchr/testify/require" +) + +func TestCollector(t *testing.T) { + var tagSet = lib.GetTagSet("tag1", "tag2") + var handler = tagHandler(tagSet) + testutil.BaseTest(t, func(config common.Config) (*common.Collector, error) { + return New(NewConfig().Apply(Config{ + TagBlacklist: tagSet, + Config: config, + })) + }, func(t *testing.T, containers []stats.SampleContainer, expectedOutput, output string) { + var outputLines = strings.Split(output, "\n") + var expectedOutputLines = strings.Split(expectedOutput, "\n") + for i, container := range containers { + for j, sample := range container.GetSamples() { + var ( + expectedTagList = handler.processTags(sample.GetTags().CloneTags()) + expectedOutputLine = expectedOutputLines[i*j+i] + outputLine = outputLines[i*j+i] + outputWithoutTags = outputLine + outputTagList = []string{} + tagSplit = strings.LastIndex(outputLine, "|#") + ) + + if tagSplit != -1 { + outputWithoutTags = outputLine[:tagSplit] + outputTagList = strings.Split(outputLine[tagSplit+len("|#"):], ",") + } + require.Equal(t, expectedOutputLine, outputWithoutTags) + require.ElementsMatch(t, expectedTagList, outputTagList) + } + } + }) +} diff --git a/stats/statsd/collector.go b/stats/statsd/collector.go new file mode 100644 index 00000000000..6a2f0f8451c --- /dev/null +++ b/stats/statsd/collector.go @@ -0,0 +1,33 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2019 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package statsd + +import ( + "github.com/loadimpact/k6/stats/statsd/common" +) + +// New creates a new statsd connector client +func New(conf common.Config) (*common.Collector, error) { + return &common.Collector{ + Config: conf, + Type: "statsd", + }, nil +} diff --git a/stats/statsd/collector_test.go b/stats/statsd/collector_test.go new file mode 100644 index 00000000000..e5947861a09 --- /dev/null +++ b/stats/statsd/collector_test.go @@ -0,0 +1,16 @@ +package statsd + +import ( + "testing" + + "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/statsd/common/testutil" + "github.com/stretchr/testify/require" +) + +func TestCollector(t *testing.T) { + testutil.BaseTest(t, New, + func(t *testing.T, _ []stats.SampleContainer, expectedOutput, output string) { + require.Equal(t, expectedOutput, output) + }) +} diff --git a/stats/statsd/common/api.go b/stats/statsd/common/api.go new file mode 100644 index 00000000000..3c6978831aa --- /dev/null +++ b/stats/statsd/common/api.go @@ -0,0 +1,46 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2019 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package common + +import ( + "time" + + "github.com/loadimpact/k6/stats" +) + +// Sample defines a sample type +type Sample struct { + Type stats.MetricType `json:"type"` + Metric string `json:"metric"` + Time time.Time `json:"time"` + Value float64 `json:"value"` + Tags map[string]string `json:"tags,omitempty"` +} + +func generateDataPoint(sample stats.Sample) *Sample { + return &Sample{ + Type: sample.Metric.Type, + Metric: sample.Metric.Name, + Time: sample.Time, + Value: sample.Value, + Tags: sample.Tags.CloneTags(), + } +} diff --git a/stats/statsd/common/collector.go b/stats/statsd/common/collector.go new file mode 100644 index 00000000000..99d4b2ec638 --- /dev/null +++ b/stats/statsd/common/collector.go @@ -0,0 +1,206 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2019 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package common + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/DataDog/datadog-go/statsd" + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats" + log "github.com/sirupsen/logrus" +) + +var _ lib.Collector = &Collector{} + +// Collector sends result data to statsd daemons with the ability to send to datadog as well +type Collector struct { + Config Config + Type string + // ProcessTags is called on a map of all tags for each metric and returns a slice representation + // of those tags that should be sent. No tags are send in case of ProcessTags being null + ProcessTags func(map[string]string) []string + + logger *log.Entry + client *statsd.Client + startTime time.Time + buffer []*Sample + bufferLock sync.Mutex +} + +// Init sets up the collector +func (c *Collector) Init() (err error) { + c.logger = log.WithField("type", c.Type) + if address := c.Config.Addr.String; address == "" { + err = fmt.Errorf( + "connection string is invalid. Received: \"%+s\"", + address, + ) + c.logger.Error(err) + + return err + } + + c.client, err = statsd.NewBuffered(c.Config.Addr.String, int(c.Config.BufferSize.Int64)) + + if err != nil { + c.logger.Errorf("Couldn't make buffered client, %s", err) + return err + } + + if namespace := c.Config.Namespace.String; namespace != "" { + c.client.Namespace = namespace + } + + return nil +} + +// Link returns the address of the client +func (c *Collector) Link() string { + return c.Config.Addr.String +} + +// Run the collector +func (c *Collector) Run(ctx context.Context) { + c.logger.Debugf("%s: Running!", c.Type) + ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration)) + c.startTime = time.Now() + + for { + select { + case <-ticker.C: + c.pushMetrics() + case <-ctx.Done(): + c.pushMetrics() + c.finish() + return + } + } +} + +// GetRequiredSystemTags Return the required system sample tags for the specific collector +func (c *Collector) GetRequiredSystemTags() lib.TagSet { + return lib.TagSet{} // no tags are required +} + +// SetRunStatus does nothing in statsd collector +func (c *Collector) SetRunStatus(status lib.RunStatus) {} + +// Collect metrics +func (c *Collector) Collect(containers []stats.SampleContainer) { + var pointSamples []*Sample + + for _, container := range containers { + for _, sample := range container.GetSamples() { + pointSamples = append(pointSamples, generateDataPoint(sample)) + } + } + + if len(pointSamples) > 0 { + c.bufferLock.Lock() + c.buffer = append(c.buffer, pointSamples...) + c.bufferLock.Unlock() + } +} + +func (c *Collector) pushMetrics() { + c.bufferLock.Lock() + if len(c.buffer) == 0 { + c.bufferLock.Unlock() + return + } + buffer := c.buffer + c.buffer = nil + c.bufferLock.Unlock() + + c.logger. + WithField("samples", len(buffer)). + Debug("Pushing metrics to server") + + if err := c.commit(buffer); err != nil { + c.logger. + WithError(err). + Error("Couldn't commit a batch") + } +} + +func (c *Collector) finish() { + // Close when context is done + if err := c.client.Close(); err != nil { + c.logger.Warnf("Error closing the client, %+v", err) + } +} + +func (c *Collector) commit(data []*Sample) error { + var errorCount int + for _, entry := range data { + if err := c.dispatch(entry); err != nil { + // No need to return error if just one metric didn't go through + c.logger.WithError(err).Debugf("Error while sending metric %s", entry.Metric) + errorCount++ + } + } + if errorCount != 0 { + c.logger.Warnf("Couldn't send %d out of %d metrics. 
Enable debug logging to see individual errors", + errorCount, len(data)) + + } + return c.client.Flush() +} + +func (c *Collector) dispatch(entry *Sample) error { + var tagList []string + if c.ProcessTags != nil { + tagList = c.ProcessTags(entry.Tags) + } + + switch entry.Type { + case stats.Counter: + return c.client.Count(entry.Metric, int64(entry.Value), tagList, 1) + case stats.Trend: + return c.client.TimeInMilliseconds(entry.Metric, entry.Value, tagList, 1) + case stats.Gauge: + return c.client.Gauge(entry.Metric, entry.Value, tagList, 1) + case stats.Rate: + if check := entry.Tags["check"]; check != "" { + return c.client.Count( + checkToString(check, entry.Value), + 1, + tagList, + 1, + ) + } + return c.client.Count(entry.Metric, int64(entry.Value), tagList, 1) + default: + return fmt.Errorf("unsupported metric type %s", entry.Type) + } +} + +func checkToString(check string, value float64) string { + label := "pass" + if value == 0 { + label = "fail" + } + return "check." + check + "." + label +} diff --git a/stats/statsd/common/collector_test.go b/stats/statsd/common/collector_test.go new file mode 100644 index 00000000000..f193de2dd66 --- /dev/null +++ b/stats/statsd/common/collector_test.go @@ -0,0 +1,44 @@ +package common + +import ( + "testing" + + "github.com/loadimpact/k6/lib" + "github.com/stretchr/testify/require" + null "gopkg.in/guregu/null.v3" +) + +func TestInitWithoutAddressErrors(t *testing.T) { + var c = &Collector{ + Config: Config{}, + Type: "testtype", + } + err := c.Init() + require.Error(t, err) +} + +func TestInitWithBogusAddressErrors(t *testing.T) { + var c = &Collector{ + Config: Config{ + Addr: null.StringFrom("localhost:90000"), + }, + Type: "testtype", + } + err := c.Init() + require.Error(t, err) +} + +func TestLinkReturnAddress(t *testing.T) { + var bogusValue = "bogus value" + var c = &Collector{ + Config: Config{ + Addr: null.StringFrom(bogusValue), + }, + } + require.Equal(t, bogusValue, c.Link()) +} + +func TestGetRequiredSystemTags(t *testing.T) { + var c = &Collector{} + require.Equal(t, lib.TagSet{}, c.GetRequiredSystemTags()) +} diff --git a/stats/statsd/common/config.go b/stats/statsd/common/config.go new file mode 100644 index 00000000000..5d7518bed58 --- /dev/null +++ b/stats/statsd/common/config.go @@ -0,0 +1,67 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2019 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package common + +import ( + "time" + + "github.com/loadimpact/k6/lib/types" + null "gopkg.in/guregu/null.v3" +) + +// Config defines the statsd configuration +type Config struct { + Addr null.String `json:"addr,omitempty" envconfig:"ADDR"` + BufferSize null.Int `json:"bufferSize,omitempty" envconfig:"BUFFER_SIZE"` + Namespace null.String `json:"namespace,omitempty" envconfig:"NAMESPACE"` + PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"PUSH_INTERVAL"` +} + +// NewConfig creates a new Config instance with default values for some fields. +func NewConfig() Config { + return Config{ + Addr: null.NewString("localhost:8125", false), + BufferSize: null.NewInt(20, false), + Namespace: null.NewString("k6.", false), + PushInterval: types.NewNullDuration(1*time.Second, false), + } +} + +// Apply saves non-zero config values from the passed config in the receiver. +func (c Config) Apply(cfg Config) Config { + if cfg.Addr.Valid { + c.Addr = cfg.Addr + } + + if cfg.BufferSize.Valid { + c.BufferSize = cfg.BufferSize + } + + if cfg.Namespace.Valid { + c.Namespace = cfg.Namespace + } + + if cfg.PushInterval.Valid { + c.PushInterval = cfg.PushInterval + } + + return c +} diff --git a/stats/statsd/common/testutil/test_helper.go b/stats/statsd/common/testutil/test_helper.go new file mode 100644 index 00000000000..b7c49e53a1b --- /dev/null +++ b/stats/statsd/common/testutil/test_helper.go @@ -0,0 +1,133 @@ +package testutil + +import ( + "context" + "net" + "testing" + "time" + + "github.com/loadimpact/k6/lib/types" + "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/statsd/common" + "github.com/stretchr/testify/require" + null "gopkg.in/guregu/null.v3" +) + +// BaseTest is a helper function to test the statsd/datadog collectors thoroughly +func BaseTest(t *testing.T, + getCollector func(common.Config) (*common.Collector, error), + checkResult func(t *testing.T, samples []stats.SampleContainer, expectedOutput, output string), +) { + t.Helper() + var ( + testNamespace = "testing.things."
// to be dynamic + ) + + addr, err := net.ResolveUDPAddr("udp", "localhost:0") + require.NoError(t, err) + listener, err := net.ListenUDP("udp", addr) // we want to listen on a random port + require.NoError(t, err) + var ch = make(chan string, 20) + var end = make(chan struct{}) + defer close(end) + + go func() { + defer close(ch) + var buf [4096]byte + for { + select { + case <-end: + return + default: + n, _, err := listener.ReadFromUDP(buf[:]) + require.NoError(t, err) + ch <- string(buf[:n]) + } + } + }() + var baseConfig = common.NewConfig().Apply(common.Config{ + Addr: null.StringFrom(listener.LocalAddr().String()), + Namespace: null.StringFrom(testNamespace), + BufferSize: null.IntFrom(5), + PushInterval: types.NullDurationFrom(time.Millisecond * 10), + }) + + collector, err := getCollector(baseConfig) + require.NoError(t, err) + require.NoError(t, collector.Init()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go collector.Run(ctx) + newSample := func(m *stats.Metric, value float64, tags map[string]string) stats.Sample { + return stats.Sample{Time: time.Now(), + Metric: m, Value: value, Tags: stats.IntoSampleTags(&tags)} + } + + myCounter := stats.New("my_counter", stats.Counter) + myGauge := stats.New("my_gauge", stats.Gauge) + myTrend := stats.New("my_trend", stats.Trend) + myRate := stats.New("my_rate", stats.Rate) + myCheck := stats.New("my_check", stats.Rate) + var testMatrix = []struct { + input []stats.SampleContainer + output string + }{ + { + input: []stats.SampleContainer{ + newSample(myCounter, 12, map[string]string{ + "tag1": "value1", + "tag3": "value3", + }), + }, + output: "testing.things.my_counter:12|c", + }, + { + input: []stats.SampleContainer{ + newSample(myGauge, 13, map[string]string{ + "tag1": "value1", + "tag3": "value3", + }), + }, + output: "testing.things.my_gauge:13.000000|g", + }, + { + input: []stats.SampleContainer{ + newSample(myTrend, 14, map[string]string{ + "tag1": "value1", + "tag3": "value3", + }), + }, + output: "testing.things.my_trend:14.000000|ms", + }, + { + input: []stats.SampleContainer{ + newSample(myRate, 15, map[string]string{ + "tag1": "value1", + "tag3": "value3", + }), + }, + output: "testing.things.my_rate:15|c", + }, + { + input: []stats.SampleContainer{ + newSample(myCheck, 16, map[string]string{ + "tag1": "value1", + "tag3": "value3", + "check": "max<100", + }), + newSample(myCheck, 0, map[string]string{ + "tag1": "value1", + "tag3": "value3", + "check": "max>100", + }), + }, + output: "testing.things.check.max<100.pass:1|c\ntesting.things.check.max>100.fail:1|c", + }, + } + for _, test := range testMatrix { + collector.Collect(test.input) + time.Sleep((time.Duration)(baseConfig.PushInterval.Duration)) + output := <-ch + checkResult(t, test.input, test.output, output) + } +} diff --git a/vendor/github.com/DataDog/datadog-go/LICENSE.txt b/vendor/github.com/DataDog/datadog-go/LICENSE.txt new file mode 100644 index 00000000000..97cd06d7fb1 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/LICENSE.txt @@ -0,0 +1,19 @@ +Copyright (c) 2015 Datadog, Inc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following 
conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go new file mode 100644 index 00000000000..0ae4dac03cb --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go @@ -0,0 +1,680 @@ +// Copyright 2013 Ooyala, Inc. + +/* +Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd, +adding tags and histograms and pushing upstream to Datadog. + +Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. + +Example Usage: + + // Create the client + c, err := statsd.New("127.0.0.1:8125") + if err != nil { + log.Fatal(err) + } + // Prefix every metric with the app name + c.Namespace = "flubber." + // Send the EC2 availability zone as a tag with every metric + c.Tags = append(c.Tags, "us-east-1a") + err = c.Gauge("request.duration", 1.2, nil, 1) + +statsd is based on go-statsd-client. +*/ +package statsd + +import ( + "bytes" + "errors" + "fmt" + "io" + "math/rand" + "strconv" + "strings" + "sync" + "time" +) + +/* +OptimalPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes +is optimal for regular networks with an MTU of 1500 so datagrams don't get +fragmented. It's generally recommended not to fragment UDP datagrams as losing +a single fragment will cause the entire datagram to be lost. + +This can be increased if your network has a greater MTU or you don't mind UDP +datagrams getting fragmented. The practical limit is MaxUDPPayloadSize +*/ +const OptimalPayloadSize = 1432 + +/* +MaxUDPPayloadSize defines the maximum payload size for a UDP datagram. +Its value comes from the calculation: 65535 bytes Max UDP datagram size - +8byte UDP header - 60byte max IP headers +any number greater than that will see frames being cut out. +*/ +const MaxUDPPayloadSize = 65467 + +/* +UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket +traffic instead of UDP. +*/ +const UnixAddressPrefix = "unix://" + +/* +Stat suffixes +*/ +var ( + gaugeSuffix = []byte("|g") + countSuffix = []byte("|c") + histogramSuffix = []byte("|h") + distributionSuffix = []byte("|d") + decrSuffix = []byte("-1|c") + incrSuffix = []byte("1|c") + setSuffix = []byte("|s") + timingSuffix = []byte("|ms") +) + +// A statsdWriter offers a standard interface regardless of the underlying +// protocol. For now UDS and UPD writers are available. +type statsdWriter interface { + Write(data []byte) (n int, err error) + SetWriteTimeout(time.Duration) error + Close() error +} + +// A Client is a handle for sending messages to dogstatsd. It is safe to +// use one Client from multiple goroutines simultaneously. 
+type Client struct { + // Writer handles the underlying networking protocol + writer statsdWriter + // Namespace to prepend to all statsd calls + Namespace string + // Tags are global tags to be added to every statsd call + Tags []string + // skipErrors turns off error passing and allows UDS to emulate UDP behaviour + SkipErrors bool + // BufferLength is the length of the buffer in commands. + bufferLength int + flushTime time.Duration + commands []string + buffer bytes.Buffer + stop chan struct{} + sync.Mutex +} + +// New returns a pointer to a new Client given an addr in the format "hostname:port" or +// "unix:///path/to/socket". +func New(addr string) (*Client, error) { + if strings.HasPrefix(addr, UnixAddressPrefix) { + w, err := newUdsWriter(addr[len(UnixAddressPrefix)-1:]) + if err != nil { + return nil, err + } + return NewWithWriter(w) + } + w, err := newUDPWriter(addr) + if err != nil { + return nil, err + } + return NewWithWriter(w) +} + +// NewWithWriter creates a new Client with given writer. Writer is a +// io.WriteCloser + SetWriteTimeout(time.Duration) error +func NewWithWriter(w statsdWriter) (*Client, error) { + client := &Client{writer: w, SkipErrors: false} + return client, nil +} + +// NewBuffered returns a Client that buffers its output and sends it in chunks. +// Buflen is the length of the buffer in number of commands. +func NewBuffered(addr string, buflen int) (*Client, error) { + client, err := New(addr) + if err != nil { + return nil, err + } + client.bufferLength = buflen + client.commands = make([]string, 0, buflen) + client.flushTime = time.Millisecond * 100 + client.stop = make(chan struct{}, 1) + go client.watch() + return client, nil +} + +// format a message from its name, value, tags and rate. Also adds global +// namespace and tags. +func (c *Client) format(name string, value interface{}, suffix []byte, tags []string, rate float64) string { + var buf bytes.Buffer + if c.Namespace != "" { + buf.WriteString(c.Namespace) + } + buf.WriteString(name) + buf.WriteString(":") + + switch val := value.(type) { + case float64: + buf.Write(strconv.AppendFloat([]byte{}, val, 'f', 6, 64)) + + case int64: + buf.Write(strconv.AppendInt([]byte{}, val, 10)) + + case string: + buf.WriteString(val) + + default: + // do nothing + } + buf.Write(suffix) + + if rate < 1 { + buf.WriteString(`|@`) + buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64)) + } + + writeTagString(&buf, c.Tags, tags) + + return buf.String() +} + +// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP. 
+func (c *Client) SetWriteTimeout(d time.Duration) error { + if c == nil { + return nil + } + return c.writer.SetWriteTimeout(d) +} + +func (c *Client) watch() { + ticker := time.NewTicker(c.flushTime) + + for { + select { + case <-ticker.C: + c.Lock() + if len(c.commands) > 0 { + // FIXME: eating error here + c.flushLocked() + } + c.Unlock() + case <-c.stop: + ticker.Stop() + return + } + } +} + +func (c *Client) append(cmd string) error { + c.Lock() + defer c.Unlock() + c.commands = append(c.commands, cmd) + // if we should flush, lets do it + if len(c.commands) == c.bufferLength { + if err := c.flushLocked(); err != nil { + return err + } + } + return nil +} + +func (c *Client) joinMaxSize(cmds []string, sep string, maxSize int) ([][]byte, []int) { + c.buffer.Reset() //clear buffer + + var frames [][]byte + var ncmds []int + sepBytes := []byte(sep) + sepLen := len(sep) + + elem := 0 + for _, cmd := range cmds { + needed := len(cmd) + + if elem != 0 { + needed = needed + sepLen + } + + if c.buffer.Len()+needed <= maxSize { + if elem != 0 { + c.buffer.Write(sepBytes) + } + c.buffer.WriteString(cmd) + elem++ + } else { + frames = append(frames, copyAndResetBuffer(&c.buffer)) + ncmds = append(ncmds, elem) + // if cmd is bigger than maxSize it will get flushed on next loop + c.buffer.WriteString(cmd) + elem = 1 + } + } + + //add whatever is left! if there's actually something + if c.buffer.Len() > 0 { + frames = append(frames, copyAndResetBuffer(&c.buffer)) + ncmds = append(ncmds, elem) + } + + return frames, ncmds +} + +func copyAndResetBuffer(buf *bytes.Buffer) []byte { + tmpBuf := make([]byte, buf.Len()) + copy(tmpBuf, buf.Bytes()) + buf.Reset() + return tmpBuf +} + +// Flush forces a flush of the pending commands in the buffer +func (c *Client) Flush() error { + if c == nil { + return nil + } + c.Lock() + defer c.Unlock() + return c.flushLocked() +} + +// flush the commands in the buffer. Lock must be held by caller. +func (c *Client) flushLocked() error { + frames, flushable := c.joinMaxSize(c.commands, "\n", OptimalPayloadSize) + var err error + cmdsFlushed := 0 + for i, data := range frames { + _, e := c.writer.Write(data) + if e != nil { + err = e + break + } + cmdsFlushed += flushable[i] + } + + // clear the slice with a slice op, doesn't realloc + if cmdsFlushed == len(c.commands) { + c.commands = c.commands[:0] + } else { + //this case will cause a future realloc... + // drop problematic command though (sorry). + c.commands = c.commands[cmdsFlushed+1:] + } + return err +} + +func (c *Client) sendMsg(msg string) error { + // return an error if message is bigger than MaxUDPPayloadSize + if len(msg) > MaxUDPPayloadSize { + return errors.New("message size exceeds MaxUDPPayloadSize") + } + + // if this client is buffered, then we'll just append this + if c.bufferLength > 0 { + return c.append(msg) + } + + _, err := c.writer.Write([]byte(msg)) + + if c.SkipErrors { + return nil + } + return err +} + +// send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. +func (c *Client) send(name string, value interface{}, suffix []byte, tags []string, rate float64) error { + if c == nil { + return nil + } + if rate < 1 && rand.Float64() > rate { + return nil + } + data := c.format(name, value, suffix, tags, rate) + return c.sendMsg(data) +} + +// Gauge measures the value of a metric at a particular time. 
+func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { + return c.send(name, value, gaugeSuffix, tags, rate) +} + +// Count tracks how many times something happened per second. +func (c *Client) Count(name string, value int64, tags []string, rate float64) error { + return c.send(name, value, countSuffix, tags, rate) +} + +// Histogram tracks the statistical distribution of a set of values on each host. +func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { + return c.send(name, value, histogramSuffix, tags, rate) +} + +// Distribution tracks the statistical distribution of a set of values across your infrastructure. +func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error { + return c.send(name, value, distributionSuffix, tags, rate) +} + +// Decr is just Count of -1 +func (c *Client) Decr(name string, tags []string, rate float64) error { + return c.send(name, nil, decrSuffix, tags, rate) +} + +// Incr is just Count of 1 +func (c *Client) Incr(name string, tags []string, rate float64) error { + return c.send(name, nil, incrSuffix, tags, rate) +} + +// Set counts the number of unique elements in a group. +func (c *Client) Set(name string, value string, tags []string, rate float64) error { + return c.send(name, value, setSuffix, tags, rate) +} + +// Timing sends timing information, it is an alias for TimeInMilliseconds +func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error { + return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate) +} + +// TimeInMilliseconds sends timing information in milliseconds. +// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) +func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { + return c.send(name, value, timingSuffix, tags, rate) +} + +// Event sends the provided Event. +func (c *Client) Event(e *Event) error { + if c == nil { + return nil + } + stat, err := e.Encode(c.Tags...) + if err != nil { + return err + } + return c.sendMsg(stat) +} + +// SimpleEvent sends an event with the provided title and text. +func (c *Client) SimpleEvent(title, text string) error { + e := NewEvent(title, text) + return c.Event(e) +} + +// ServiceCheck sends the provided ServiceCheck. +func (c *Client) ServiceCheck(sc *ServiceCheck) error { + if c == nil { + return nil + } + stat, err := sc.Encode(c.Tags...) + if err != nil { + return err + } + return c.sendMsg(stat) +} + +// SimpleServiceCheck sends an serviceCheck with the provided name and status. +func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error { + sc := NewServiceCheck(name, status) + return c.ServiceCheck(sc) +} + +// Close the client connection. +func (c *Client) Close() error { + if c == nil { + return nil + } + select { + case c.stop <- struct{}{}: + default: + } + + // if this client is buffered, flush before closing the writer + if c.bufferLength > 0 { + if err := c.Flush(); err != nil { + return err + } + } + + return c.writer.Close() +} + +// Events support +// EventAlertType and EventAlertPriority became exported types after this issue was submitted: https://github.com/DataDog/datadog-go/issues/41 +// The reason why they got exported is so that client code can directly use the types. 
+ +// EventAlertType is the alert type for events +type EventAlertType string + +const ( + // Info is the "info" AlertType for events + Info EventAlertType = "info" + // Error is the "error" AlertType for events + Error EventAlertType = "error" + // Warning is the "warning" AlertType for events + Warning EventAlertType = "warning" + // Success is the "success" AlertType for events + Success EventAlertType = "success" +) + +// EventPriority is the event priority for events +type EventPriority string + +const ( + // Normal is the "normal" Priority for events + Normal EventPriority = "normal" + // Low is the "low" Priority for events + Low EventPriority = "low" +) + +// An Event is an object that can be posted to your DataDog event stream. +type Event struct { + // Title of the event. Required. + Title string + // Text is the description of the event. Required. + Text string + // Timestamp is a timestamp for the event. If not provided, the dogstatsd + // server will set this to the current time. + Timestamp time.Time + // Hostname for the event. + Hostname string + // AggregationKey groups this event with others of the same key. + AggregationKey string + // Priority of the event. Can be statsd.Low or statsd.Normal. + Priority EventPriority + // SourceTypeName is a source type for the event. + SourceTypeName string + // AlertType can be statsd.Info, statsd.Error, statsd.Warning, or statsd.Success. + // If absent, the default value applied by the dogstatsd server is Info. + AlertType EventAlertType + // Tags for the event. + Tags []string +} + +// NewEvent creates a new event with the given title and text. Error checking +// against these values is done at send-time, or upon running e.Check. +func NewEvent(title, text string) *Event { + return &Event{ + Title: title, + Text: text, + } +} + +// Check verifies that an event is valid. +func (e Event) Check() error { + if len(e.Title) == 0 { + return fmt.Errorf("statsd.Event title is required") + } + if len(e.Text) == 0 { + return fmt.Errorf("statsd.Event text is required") + } + return nil +} + +// Encode returns the dogstatsd wire protocol representation for an event. +// Tags may be passed which will be added to the encoded output but not to +// the Event's list of tags, eg. for default tags. 
+func (e Event) Encode(tags ...string) (string, error) { + err := e.Check() + if err != nil { + return "", err + } + text := e.escapedText() + + var buffer bytes.Buffer + buffer.WriteString("_e{") + buffer.WriteString(strconv.FormatInt(int64(len(e.Title)), 10)) + buffer.WriteRune(',') + buffer.WriteString(strconv.FormatInt(int64(len(text)), 10)) + buffer.WriteString("}:") + buffer.WriteString(e.Title) + buffer.WriteRune('|') + buffer.WriteString(text) + + if !e.Timestamp.IsZero() { + buffer.WriteString("|d:") + buffer.WriteString(strconv.FormatInt(int64(e.Timestamp.Unix()), 10)) + } + + if len(e.Hostname) != 0 { + buffer.WriteString("|h:") + buffer.WriteString(e.Hostname) + } + + if len(e.AggregationKey) != 0 { + buffer.WriteString("|k:") + buffer.WriteString(e.AggregationKey) + + } + + if len(e.Priority) != 0 { + buffer.WriteString("|p:") + buffer.WriteString(string(e.Priority)) + } + + if len(e.SourceTypeName) != 0 { + buffer.WriteString("|s:") + buffer.WriteString(e.SourceTypeName) + } + + if len(e.AlertType) != 0 { + buffer.WriteString("|t:") + buffer.WriteString(string(e.AlertType)) + } + + writeTagString(&buffer, tags, e.Tags) + + return buffer.String(), nil +} + +// ServiceCheckStatus support +type ServiceCheckStatus byte + +const ( + // Ok is the "ok" ServiceCheck status + Ok ServiceCheckStatus = 0 + // Warn is the "warning" ServiceCheck status + Warn ServiceCheckStatus = 1 + // Critical is the "critical" ServiceCheck status + Critical ServiceCheckStatus = 2 + // Unknown is the "unknown" ServiceCheck status + Unknown ServiceCheckStatus = 3 +) + +// An ServiceCheck is an object that contains status of DataDog service check. +type ServiceCheck struct { + // Name of the service check. Required. + Name string + // Status of service check. Required. + Status ServiceCheckStatus + // Timestamp is a timestamp for the serviceCheck. If not provided, the dogstatsd + // server will set this to the current time. + Timestamp time.Time + // Hostname for the serviceCheck. + Hostname string + // A message describing the current state of the serviceCheck. + Message string + // Tags for the serviceCheck. + Tags []string +} + +// NewServiceCheck creates a new serviceCheck with the given name and status. Error checking +// against these values is done at send-time, or upon running sc.Check. +func NewServiceCheck(name string, status ServiceCheckStatus) *ServiceCheck { + return &ServiceCheck{ + Name: name, + Status: status, + } +} + +// Check verifies that an event is valid. +func (sc ServiceCheck) Check() error { + if len(sc.Name) == 0 { + return fmt.Errorf("statsd.ServiceCheck name is required") + } + if byte(sc.Status) < 0 || byte(sc.Status) > 3 { + return fmt.Errorf("statsd.ServiceCheck status has invalid value") + } + return nil +} + +// Encode returns the dogstatsd wire protocol representation for an serviceCheck. +// Tags may be passed which will be added to the encoded output but not to +// the Event's list of tags, eg. for default tags. 
+func (sc ServiceCheck) Encode(tags ...string) (string, error) { + err := sc.Check() + if err != nil { + return "", err + } + message := sc.escapedMessage() + + var buffer bytes.Buffer + buffer.WriteString("_sc|") + buffer.WriteString(sc.Name) + buffer.WriteRune('|') + buffer.WriteString(strconv.FormatInt(int64(sc.Status), 10)) + + if !sc.Timestamp.IsZero() { + buffer.WriteString("|d:") + buffer.WriteString(strconv.FormatInt(int64(sc.Timestamp.Unix()), 10)) + } + + if len(sc.Hostname) != 0 { + buffer.WriteString("|h:") + buffer.WriteString(sc.Hostname) + } + + writeTagString(&buffer, tags, sc.Tags) + + if len(message) != 0 { + buffer.WriteString("|m:") + buffer.WriteString(message) + } + + return buffer.String(), nil +} + +func (e Event) escapedText() string { + return strings.Replace(e.Text, "\n", "\\n", -1) +} + +func (sc ServiceCheck) escapedMessage() string { + msg := strings.Replace(sc.Message, "\n", "\\n", -1) + return strings.Replace(msg, "m:", `m\:`, -1) +} + +func removeNewlines(str string) string { + return strings.Replace(str, "\n", "", -1) +} + +func writeTagString(w io.Writer, tagList1, tagList2 []string) { + // the tag lists may be shared with other callers, so we cannot modify + // them in any way (which means we cannot append to them either) + // therefore we must make an entirely separate copy just for this call + totalLen := len(tagList1) + len(tagList2) + if totalLen == 0 { + return + } + tags := make([]string, 0, totalLen) + tags = append(tags, tagList1...) + tags = append(tags, tagList2...) + + io.WriteString(w, "|#") + io.WriteString(w, removeNewlines(tags[0])) + for _, tag := range tags[1:] { + io.WriteString(w, ",") + io.WriteString(w, removeNewlines(tag)) + } +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/udp.go b/vendor/github.com/DataDog/datadog-go/statsd/udp.go new file mode 100644 index 00000000000..8af522c5bb4 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/udp.go @@ -0,0 +1,40 @@ +package statsd + +import ( + "errors" + "net" + "time" +) + +// udpWriter is an internal class wrapping around management of UDP connection +type udpWriter struct { + conn net.Conn +} + +// New returns a pointer to a new udpWriter given an addr in the format "hostname:port". +func newUDPWriter(addr string) (*udpWriter, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + writer := &udpWriter{conn: conn} + return writer, nil +} + +// SetWriteTimeout is not needed for UDP, returns error +func (w *udpWriter) SetWriteTimeout(d time.Duration) error { + return errors.New("SetWriteTimeout: not supported for UDP connections") +} + +// Write data to the UDP connection with no error handling +func (w *udpWriter) Write(data []byte) (int, error) { + return w.conn.Write(data) +} + +func (w *udpWriter) Close() error { + return w.conn.Close() +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/uds.go b/vendor/github.com/DataDog/datadog-go/statsd/uds.go new file mode 100644 index 00000000000..31154ab4dd9 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/uds.go @@ -0,0 +1,67 @@ +package statsd + +import ( + "net" + "time" +) + +/* +UDSTimeout holds the default timeout for UDS socket writes, as they can get +blocking when the receiving buffer is full. 
+*/ +const defaultUDSTimeout = 1 * time.Millisecond + +// udsWriter is an internal class wrapping around management of UDS connection +type udsWriter struct { + // Address to send metrics to, needed to allow reconnection on error + addr net.Addr + // Established connection object, or nil if not connected yet + conn net.Conn + // write timeout + writeTimeout time.Duration +} + +// New returns a pointer to a new udsWriter given a socket file path as addr. +func newUdsWriter(addr string) (*udsWriter, error) { + udsAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return nil, err + } + // Defer connection to first Write + writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} + return writer, nil +} + +// SetWriteTimeout allows the user to set a custom write timeout +func (w *udsWriter) SetWriteTimeout(d time.Duration) error { + w.writeTimeout = d + return nil +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected +func (w *udsWriter) Write(data []byte) (int, error) { + // Try connecting (first packet or connection lost) + if w.conn == nil { + conn, err := net.Dial(w.addr.Network(), w.addr.String()) + if err != nil { + return 0, err + } + w.conn = conn + } + w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) + n, e := w.conn.Write(data) + if e != nil { + // Statsd server disconnected, retry connecting at next packet + w.conn = nil + return 0, e + } + return n, e +} + +func (w *udsWriter) Close() error { + if w.conn != nil { + return w.conn.Close() + } + return nil +}