From ff39ba60a0933c1be96207c6425e1dbd836fc181 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 1 May 2019 15:09:20 -0500 Subject: [PATCH 01/13] datadog-statsd WIP --- plugins/inputs/statsd/datadog.go | 154 +++++++++++++++++++++++++++++++ plugins/inputs/statsd/statsd.go | 13 +++ 2 files changed, 167 insertions(+) create mode 100644 plugins/inputs/statsd/datadog.go diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go new file mode 100644 index 0000000000000..06e7d486f0a51 --- /dev/null +++ b/plugins/inputs/statsd/datadog.go @@ -0,0 +1,154 @@ +package statsd + +import ( + "bytes" + "fmt" + "log" + "strconv" + "time" +) + +const ( + priorityNormal = "normal" + priorityLow = "low" +) + +// this is adapted from datadog's apache licensed version at +// https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173 +func (s *Statsd) parseDataDogEventMessage(message []byte, defaultHostname string) error { + // _e{title.length,text.length}:title|text + // [ + // |d:date_happened + // |p:priority + // |h:hostname + // |t:alert_type + // |s:source_type_nam + // |#tag1,tag2 + // ] + + messageRaw := bytes.SplitN(message, []byte(":"), 2) + if len(messageRaw) < 2 || len(messageRaw[0]) < 7 || len(messageRaw[1]) < 3 { + return fmt.Errorf("Invalid message format") + } + header := messageRaw[0] + message = messageRaw[1] + + rawLen := bytes.SplitN(header[3:], []byte(","), 2) + if len(rawLen) != 2 { + return fmt.Errorf("Invalid message format") + } + + titleLen, err := strconv.ParseInt(string(rawLen[0]), 10, 64) + if err != nil { + return fmt.Errorf("Invalid message format, could not parse title.length: '%s'", rawLen[0]) + } + + textLen, err := strconv.ParseInt(string(rawLen[1][:len(rawLen[1])-1]), 10, 64) + if err != nil { + return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[0]) + } + if titleLen+textLen+1 > int64(len(message)) { + return fmt.Errorf("Invalid message format, title.length and text.length exceed total message length") + } + + rawTitle := message[:titleLen] + rawText := message[titleLen+1 : titleLen+1+textLen] + message = message[titleLen+1+textLen:] + + if len(rawTitle) == 0 || len(rawText) == 0 { + return fmt.Errorf("Invalid event message format: empty 'title' or 'text' field") + } + + // event := metrics.Event{ + // Priority: metrics.EventPriorityNormal, + // AlertType: metrics.EventAlertTypeInfo, + // Title: string(rawTitle), + // Text: string(bytes.Replace(rawText, []byte("\\n"), []byte("\n"), -1)), + // } + + // Handle hostname, with a priority to the h: field, then the host: + // tag and finally the defaultHostname value + // Metadata + m := cachedEvent{ + name: string(rawTitle), + } + m.tags = make(map[string]string, bytes.Count(message[1:], []byte{','})+1) // allocate for the approximate number of tags + m.fields = make(map[string]interface{}, 8) + m.fields["alert_type"] = "info" // default event type + m.fields["text"] = string(rawText) + m.tags["hostname"] = defaultHostname + if len(message) > 1 { + rawMetadataFields := bytes.Split(message[1:], []byte{'|'}) + for i := range rawMetadataFields { + if len(rawMetadataFields[i]) < 2 { + log.Printf("W! [inputs.statsd] too short metadata field") + } + switch string(rawMetadataFields[i]) { + case "d:": + ts, err := strconv.ParseInt(string(rawMetadataFields[i][2:]), 10, 64) + if err != nil { + log.Printf("W! [inputs.statsd] skipping timestamp: %s", err) + continue + } + m.ts = time.Unix(ts, 0) + case "p:": + switch string(rawMetadataFields[i][2:]) { + case priorityLow: + m.fields["priority"] = priorityLow + case priorityNormal: + m.fields["priority"] = priorityNormal + default: + log.Printf("W! [inputs.statsd] skipping priority: %s", err) + continue + } + case "h:": + m.tags["hostname"] = string(rawMetadataFields[i][2:]) + case "t:": + switch string(rawMetadataFields[i][2:]) { + case "error": + m.fields["alert_type"] = "error" + case "warning": + m.fields["alert_type"] = "warning" + case "success": + m.fields["alert_type"] = "success" + case "info": + m.fields["alert_type"] = "info" + default: + log.Printf("W! [inputs.statsd] skipping priority: %s", err) + continue + } + case "k:": + // TODO(docmerlin): does this make sense? + m.tags["aggregation_key"] = string(rawMetadataFields[i][2:]) + case "s:": + m.fields["source_type_name"] = string(rawMetadataFields[i][2:]) + case "#": + parseDataDogTags(m.tags, string(rawMetadataFields[i][2:])) + //event.Tags, hostFromTags = parseTags(, defaultHostname) + default: + log.Printf("W! [inputs.statsd] unknown metadata type: '%s'", rawMetadataFields[i]) + } + } + } + return nil +} + +func parseDataDogTags(tags map[string]string, message string) { + //tags := make(map[string]string, strings.Count(message, ",")) + start := 0 + var k, v string + for i := range message { + switch message[i] { + case ',': + v = message[start:i] + start = i + 1 + if k == "" || v == "" { + continue + } + tags[k] = v + case ':': + k = message[start:i] + start = i + 1 + } + } +} diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 8b5e15502d20f..64db733ad4094 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -69,6 +69,10 @@ type Statsd struct { // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) ParseDataDogTags bool + // This flag enables parsing of data dog events from the dogstatsd extension to + // the statsd protocol + ParseDataDogEvents bool + // UDPPacketSize is deprecated, it's only here for legacy support // we now always create 1 max size buffer and then copy only what we need // into the in channel @@ -101,6 +105,7 @@ type Statsd struct { counters map[string]cachedcounter sets map[string]cachedset timings map[string]cachedtimings + events map[string]cachedEvent // bucket -> influx templates Templates []string @@ -144,6 +149,7 @@ type metric struct { additive bool samplerate float64 tags map[string]string + ts time.Time // for events } type cachedset struct { @@ -170,6 +176,13 @@ type cachedtimings struct { tags map[string]string } +type cachedEvent struct { + name string + fields map[string]interface{} + tags map[string]string + ts time.Time +} + func (_ *Statsd) Description() string { return "Statsd UDP/TCP Server" } From ac219219b25c799f062fa89a615dbce24461895b Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 1 May 2019 17:14:44 -0500 Subject: [PATCH 02/13] WIP --- plugins/inputs/statsd/datadog.go | 82 +++++++++++++------------- plugins/inputs/statsd/running_stats.go | 2 +- plugins/inputs/statsd/statsd.go | 51 +++++++--------- plugins/inputs/statsd/statsd_test.go | 43 +++++++++----- 4 files changed, 92 insertions(+), 86 deletions(-) diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index 06e7d486f0a51..59641a144cfbc 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -1,10 +1,10 @@ package statsd import ( - "bytes" "fmt" "log" "strconv" + "strings" "time" ) @@ -15,7 +15,7 @@ const ( // this is adapted from datadog's apache licensed version at // https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173 -func (s *Statsd) parseDataDogEventMessage(message []byte, defaultHostname string) error { +func (s *Statsd) parseDataDogEventMessage(now time.Time, message string, defaultHostname string) error { // _e{title.length,text.length}:title|text // [ // |d:date_happened @@ -25,25 +25,27 @@ func (s *Statsd) parseDataDogEventMessage(message []byte, defaultHostname string // |s:source_type_nam // |#tag1,tag2 // ] - - messageRaw := bytes.SplitN(message, []byte(":"), 2) + // + // + // tag is key:value + messageRaw := strings.SplitN(message, ":", 2) if len(messageRaw) < 2 || len(messageRaw[0]) < 7 || len(messageRaw[1]) < 3 { return fmt.Errorf("Invalid message format") } header := messageRaw[0] message = messageRaw[1] - rawLen := bytes.SplitN(header[3:], []byte(","), 2) + rawLen := strings.SplitN(header[3:], ",", 2) if len(rawLen) != 2 { return fmt.Errorf("Invalid message format") } - titleLen, err := strconv.ParseInt(string(rawLen[0]), 10, 64) + titleLen, err := strconv.ParseInt(rawLen[0], 10, 64) if err != nil { return fmt.Errorf("Invalid message format, could not parse title.length: '%s'", rawLen[0]) } - textLen, err := strconv.ParseInt(string(rawLen[1][:len(rawLen[1])-1]), 10, 64) + textLen, err := strconv.ParseInt(rawLen[1][:len(rawLen[1])-1], 10, 64) if err != nil { return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[0]) } @@ -59,96 +61,96 @@ func (s *Statsd) parseDataDogEventMessage(message []byte, defaultHostname string return fmt.Errorf("Invalid event message format: empty 'title' or 'text' field") } - // event := metrics.Event{ - // Priority: metrics.EventPriorityNormal, - // AlertType: metrics.EventAlertTypeInfo, - // Title: string(rawTitle), - // Text: string(bytes.Replace(rawText, []byte("\\n"), []byte("\n"), -1)), - // } - // Handle hostname, with a priority to the h: field, then the host: // tag and finally the defaultHostname value // Metadata m := cachedEvent{ - name: string(rawTitle), + name: rawTitle, } - m.tags = make(map[string]string, bytes.Count(message[1:], []byte{','})+1) // allocate for the approximate number of tags - m.fields = make(map[string]interface{}, 8) + m.tags = make(map[string]string, strings.Count(message[1:], ",")+2) // allocate for the approximate number of tags + m.fields = make(map[string]interface{}, 9) m.fields["alert_type"] = "info" // default event type m.fields["text"] = string(rawText) - m.tags["hostname"] = defaultHostname + m.tags["source"] = defaultHostname + m.ts = now if len(message) > 1 { - rawMetadataFields := bytes.Split(message[1:], []byte{'|'}) + rawMetadataFields := strings.Split(message[1:], "|") for i := range rawMetadataFields { if len(rawMetadataFields[i]) < 2 { log.Printf("W! [inputs.statsd] too short metadata field") } - switch string(rawMetadataFields[i]) { + switch rawMetadataFields[i] { case "d:": - ts, err := strconv.ParseInt(string(rawMetadataFields[i][2:]), 10, 64) + ts, err := strconv.ParseInt(rawMetadataFields[i][2:], 10, 64) if err != nil { log.Printf("W! [inputs.statsd] skipping timestamp: %s", err) continue } - m.ts = time.Unix(ts, 0) + m.fields["ts"] = ts case "p:": - switch string(rawMetadataFields[i][2:]) { + switch rawMetadataFields[i][2:] { case priorityLow: m.fields["priority"] = priorityLow case priorityNormal: m.fields["priority"] = priorityNormal default: - log.Printf("W! [inputs.statsd] skipping priority: %s", err) + log.Printf("W! [inputs.statsd] skipping priority") continue } case "h:": - m.tags["hostname"] = string(rawMetadataFields[i][2:]) + m.tags["source"] = rawMetadataFields[i][2:] case "t:": - switch string(rawMetadataFields[i][2:]) { + switch rawMetadataFields[i][2:] { case "error": m.fields["alert_type"] = "error" case "warning": m.fields["alert_type"] = "warning" case "success": m.fields["alert_type"] = "success" - case "info": - m.fields["alert_type"] = "info" + case "info": // already set for info default: - log.Printf("W! [inputs.statsd] skipping priority: %s", err) + log.Printf("W! [inputs.statsd] skipping alert type") continue } case "k:": // TODO(docmerlin): does this make sense? - m.tags["aggregation_key"] = string(rawMetadataFields[i][2:]) + m.tags["aggregation_key"] = rawMetadataFields[i][2:] case "s:": - m.fields["source_type_name"] = string(rawMetadataFields[i][2:]) + m.fields["source_type_name"] = rawMetadataFields[i][2:] case "#": - parseDataDogTags(m.tags, string(rawMetadataFields[i][2:])) - //event.Tags, hostFromTags = parseTags(, defaultHostname) + parseDataDogTags(m.tags, rawMetadataFields[i][2:]) default: log.Printf("W! [inputs.statsd] unknown metadata type: '%s'", rawMetadataFields[i]) } } } + s.Lock() + s.events = append(s.events, m) + s.Unlock() return nil } func parseDataDogTags(tags map[string]string, message string) { - //tags := make(map[string]string, strings.Count(message, ",")) - start := 0 + start, i := 0, 0 var k, v string - for i := range message { - switch message[i] { - case ',': + var inVal bool // check if we are parsing the value part of the tag + for i = range message { + if message[i] == ',' { v = message[start:i] start = i + 1 - if k == "" || v == "" { + if k == "" { continue } tags[k] = v - case ':': + k, v, inVal = "", "", false // reset state vars + } else if message[i] == ':' && !inVal { k = message[start:i] start = i + 1 + inVal = true } } + // grab the last value + if k != "" { + tags[k] = message[start : i+1] + } } diff --git a/plugins/inputs/statsd/running_stats.go b/plugins/inputs/statsd/running_stats.go index 2395ab143f45d..6f8045b4279f2 100644 --- a/plugins/inputs/statsd/running_stats.go +++ b/plugins/inputs/statsd/running_stats.go @@ -49,7 +49,7 @@ func (rs *RunningStats) AddValue(v float64) { } // These are used for the running mean and variance - rs.n += 1 + rs.n++ rs.ex += v - rs.k rs.ex2 += (v - rs.k) * (v - rs.k) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 64db733ad4094..b3f888cb6a2c3 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -95,7 +95,7 @@ type Statsd struct { malformed int // Channel for all incoming statsd packets - in chan *bytes.Buffer + in chan input done chan struct{} // Cache gauges, counters & sets so they can be aggregated as they arrive @@ -105,7 +105,7 @@ type Statsd struct { counters map[string]cachedcounter sets map[string]cachedset timings map[string]cachedtimings - events map[string]cachedEvent + events []cachedEvent // bucket -> influx templates Templates []string @@ -136,6 +136,12 @@ type Statsd struct { bufPool sync.Pool } +type input struct { + *bytes.Buffer + time.Time + net.Addr +} + // One statsd metric, form is :||@ type metric struct { name string @@ -328,7 +334,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) - s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages) + s.in = make(chan input, s.AllowedPendingMessages) s.done = make(chan struct{}) s.accept = make(chan bool, s.MaxTCPConnections) s.conns = make(map[string]*net.TCPConn) @@ -452,7 +458,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { case <-s.done: return nil default: - n, _, err := conn.ReadFromUDP(buf) + n, addr, err := conn.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("E! Error READ: %s\n", err.Error()) continue @@ -462,7 +468,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { b.Write(buf[:n]) select { - case s.in <- b: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: addr}: default: s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { @@ -481,12 +487,16 @@ func (s *Statsd) parser() error { select { case <-s.done: return nil - case buf := <-s.in: - lines := strings.Split(buf.String(), "\n") - s.bufPool.Put(buf) + case in := <-s.in: + lines := strings.Split(in.Buffer.String(), "\n") + s.bufPool.Put(in.Buffer) for _, line := range lines { line = strings.TrimSpace(line) - if line != "" { + switch { + case line == "": + case s.ParseDataDogEvents && len(line) > 2 && line[:2] == "_e": + s.parseDataDogEventMessage(time.Now(), line, in.Addr.String()) + default: s.parseStatsdLine(line) } } @@ -512,24 +522,7 @@ func (s *Statsd) parseStatsdLine(line string) error { for _, segment := range pipesplit { if len(segment) > 0 && segment[0] == '#' { // we have ourselves a tag; they are comma separated - tagstr := segment[1:] - tags := strings.Split(tagstr, ",") - for _, tag := range tags { - ts := strings.SplitN(tag, ":", 2) - var k, v string - switch len(ts) { - case 1: - // just a tag - k = ts[0] - v = "" - case 2: - k = ts[0] - v = ts[1] - } - if k != "" { - lineTags[k] = v - } - } + parseDataDogTags(lineTags, segment[1:]) } else { recombinedSegments = append(recombinedSegments, segment) } @@ -635,7 +628,7 @@ func (s *Statsd) parseStatsdLine(line string) error { case "h": m.tags["metric_type"] = "histogram" } - + fmt.Println("here: name:", m.name, lineTags) if len(lineTags) > 0 { for k, v := range lineTags { m.tags[k] = v @@ -844,7 +837,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { b.WriteByte('\n') select { - case s.in <- b: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: conn.RemoteAddr()}: default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 1e50c8341f7d1..b99b4d51748f8 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -1,7 +1,6 @@ package statsd import ( - "bytes" "errors" "fmt" "net" @@ -17,8 +16,8 @@ const ( testMsg = "test.tcp.msg:100|c" ) -func newTestTcpListener() (*Statsd, chan *bytes.Buffer) { - in := make(chan *bytes.Buffer, 1500) +func newTestTCPListener() (*Statsd, chan input) { + in := make(chan input, 1500) listener := &Statsd{ Protocol: "tcp", ServiceAddress: "localhost:8125", @@ -35,7 +34,7 @@ func NewTestStatsd() *Statsd { // Make data structures s.done = make(chan struct{}) - s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages) + s.in = make(chan input, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) @@ -873,24 +872,28 @@ func TestParse_DataDogTags(t *testing.T) { "my_timer:3|ms|@0.1|#live,host:localhost", } - testTags := map[string]map[string]string{ + expectedTags := map[string]map[string]string{ "my_counter": { "host": "localhost", "environment": "prod", "endpoint": "/:tenant?/oauth/ro", + "metric_type": "counter", }, "my_gauge": { - "live": "", + "live": "", + "metric_type": "gauge", }, "my_set": { - "host": "localhost", + "host": "localhost", + "metric_type": "set", }, "my_timer": { - "live": "", - "host": "localhost", + "live": "", + "host": "localhost", + "metric_type": "timing", }, } @@ -901,21 +904,29 @@ func TestParse_DataDogTags(t *testing.T) { } } - sourceTags := map[string]map[string]string{ + actualTags := map[string]map[string]string{ "my_gauge": tagsForItem(s.gauges), "my_counter": tagsForItem(s.counters), "my_set": tagsForItem(s.sets), "my_timer": tagsForItem(s.timings), } - - for statName, tags := range testTags { - for k, v := range tags { - otherValue := sourceTags[statName][k] - if sourceTags[statName][k] != v { - t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue) + for name, tags := range expectedTags { + for expectedK, expectedV := range tags { + if expectedV != actualTags[name][expectedK] { + t.Errorf("failed: expected: %#v != %#v", tags, actualTags[name]) } } } + + // for statName, tags := range expectedTags { + // for expectedK, expectedV := range tags { + // otherValue := sourceTags[statName][k] + // if sourceTags[statName][expectedK] != v { + // t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue) + // fmt.Print(sourceTags[statName]) + // } + // } + // } } func tagsForItem(m interface{}) map[string]string { From 28cfc58cea525442dd9ac1a6400da39d08714bbd Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Thu, 2 May 2019 10:39:40 -0500 Subject: [PATCH 03/13] WIP and adding tests --- plugins/inputs/statsd/datadog.go | 29 ++- plugins/inputs/statsd/datadog_test.go | 292 ++++++++++++++++++++++++++ plugins/inputs/statsd/statsd.go | 4 +- plugins/inputs/statsd/statsd_test.go | 20 +- 4 files changed, 321 insertions(+), 24 deletions(-) create mode 100644 plugins/inputs/statsd/datadog_test.go diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index 59641a144cfbc..a363e65b31cef 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -13,9 +13,11 @@ const ( priorityLow = "low" ) +var uncommenter = strings.NewReplacer("\\n", "\n") + // this is adapted from datadog's apache licensed version at // https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173 -func (s *Statsd) parseDataDogEventMessage(now time.Time, message string, defaultHostname string) error { +func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostname string) error { // _e{title.length,text.length}:title|text // [ // |d:date_happened @@ -67,12 +69,18 @@ func (s *Statsd) parseDataDogEventMessage(now time.Time, message string, default m := cachedEvent{ name: rawTitle, } - m.tags = make(map[string]string, strings.Count(message[1:], ",")+2) // allocate for the approximate number of tags + m.tags = make(map[string]string, strings.Count(message, ",")+2) // allocate for the approximate number of tags m.fields = make(map[string]interface{}, 9) - m.fields["alert_type"] = "info" // default event type - m.fields["text"] = string(rawText) + m.fields["alert-type"] = "info" // default event type + m.fields["text"] = uncommenter.Replace(string(rawText)) m.tags["source"] = defaultHostname + m.fields["priority"] = priorityNormal m.ts = now + if len(message) == 0 { + s.events = append(s.events, m) + return nil + } + if len(message) > 1 { rawMetadataFields := strings.Split(message[1:], "|") for i := range rawMetadataFields { @@ -91,8 +99,7 @@ func (s *Statsd) parseDataDogEventMessage(now time.Time, message string, default switch rawMetadataFields[i][2:] { case priorityLow: m.fields["priority"] = priorityLow - case priorityNormal: - m.fields["priority"] = priorityNormal + case priorityNormal: // we already used this as a default default: log.Printf("W! [inputs.statsd] skipping priority") continue @@ -102,11 +109,11 @@ func (s *Statsd) parseDataDogEventMessage(now time.Time, message string, default case "t:": switch rawMetadataFields[i][2:] { case "error": - m.fields["alert_type"] = "error" + m.fields["alert-type"] = "error" case "warning": - m.fields["alert_type"] = "warning" + m.fields["alert-type"] = "warning" case "success": - m.fields["alert_type"] = "success" + m.fields["alert-type"] = "success" case "info": // already set for info default: log.Printf("W! [inputs.statsd] skipping alert type") @@ -114,9 +121,9 @@ func (s *Statsd) parseDataDogEventMessage(now time.Time, message string, default } case "k:": // TODO(docmerlin): does this make sense? - m.tags["aggregation_key"] = rawMetadataFields[i][2:] + m.tags["aggregation-key"] = rawMetadataFields[i][2:] case "s:": - m.fields["source_type_name"] = rawMetadataFields[i][2:] + m.fields["source-type-name"] = rawMetadataFields[i][2:] case "#": parseDataDogTags(m.tags, rawMetadataFields[i][2:]) default: diff --git a/plugins/inputs/statsd/datadog_test.go b/plugins/inputs/statsd/datadog_test.go new file mode 100644 index 0000000000000..34570dee0cb85 --- /dev/null +++ b/plugins/inputs/statsd/datadog_test.go @@ -0,0 +1,292 @@ +package statsd + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventMinimal(t *testing.T) { + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text", "default-hostname") + require.Nil(t, err) + e := s.events[0] + + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, now, e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) + assert.Equal(t, priorityNormal, e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, nil, e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) +} + +func TestEventMultilinesText(t *testing.T) { + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,24}:test title|test\\line1\\nline2\\nline3", "default-hostname") + + require.Nil(t, err) + e := s.events[0] + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test\\line1\nline2\nline3", e.fields["text"]) + assert.Equal(t, nil, e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) +} + +func TestEventPipeInTitle(t *testing.T) { + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,24}:test|title|test\\line1\\nline2\\nline3", "default-hostname") + + require.Nil(t, err) + e := s.events[0] + assert.Equal(t, "test|title", e.name) + assert.Equal(t, "test\\line1\nline2\nline3", e.fields["text"]) + assert.Equal(t, nil, e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Len(t, e.tags, 1) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) +} + +func TestEventError(t *testing.T) { + now := time.Now() + s := NewTestStatsd() + // missing length header + err := s.parseEventMessage(now, "_e:title|text", "default-hostname") + assert.Error(t, err) + + // greater length than packet + err = s.parseEventMessage(now, "_e{10,10}:title|text", "default-hostname") + assert.Error(t, err) + + // zero length + err = s.parseEventMessage(now, "_e{0,0}:a|a", "default-hostname") + assert.Error(t, err) + + // missing title or text length + err = s.parseEventMessage(now, "_e{5555:title|text", "default-hostname") + assert.Error(t, err) + + // missing wrong len format + err = s.parseEventMessage(now, "_e{a,1}:title|text", "default-hostname") + assert.Error(t, err) + + err = s.parseEventMessage(now, "_e{1,a}:title|text", "default-hostname") + assert.Error(t, err) + + // missing title or text length + err = s.parseEventMessage(now, "_e{5,}:title|text", "default-hostname") + assert.Error(t, err) + + err = s.parseEventMessage(now, "_e{,4}:title|text", "default-hostname") + assert.Error(t, err) + + err = s.parseEventMessage(now, "_e{}:title|text", "default-hostname") + assert.Error(t, err) + + err = s.parseEventMessage(now, "_e{,}:title|text", "default-hostname") + assert.Error(t, err) + + // not enough information + err = s.parseEventMessage(now, "_e|text", "default-hostname") + assert.Error(t, err) + + err = s.parseEventMessage(now, "_e:|text", "default-hostname") + assert.Error(t, err) + + // invalid timestamp + err = s.parseEventMessage(now, "_e{5,4}:title|text|d:abc", "default-hostname") + assert.NoError(t, err) + + // invalid priority + err = s.parseEventMessage(now, "_e{5,4}:title|text|p:urgent", "default-hostname") + assert.NoError(t, err) + + // invalid priority + err = s.parseEventMessage(now, "_e{5,4}:title|text|p:urgent", "default-hostname") + assert.NoError(t, err) + + // invalid alert type + err = s.parseEventMessage(now, "_e{5,4}:title|text|t:test", "default-hostname") + assert.NoError(t, err) + + // unknown metadata + err = s.parseEventMessage(now, "_e{5,4}:title|text|x:1234", "default-hostname") + assert.NoError(t, err) +} + +/* +func TestEventMetadataTimestamp(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|d:21"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(21), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataPriority(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|p:low"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, metrics.EventPriorityLow, e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataHostname(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|h:localhost"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "localhost", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataHostnameInTag(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#host:localhost"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "localhost", e.tags["source"]) + assert.Equal(t, []string{}, e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataEmptyHostTag(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#host:,other:tag"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "", e.tags["source"]) + assert.Equal(t, []string{"other:tag"}, e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataAlertType(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, metrics.EventAlertTypeWarning, e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataAggregatioKey(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|k:some aggregation key"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "some aggregation key", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataSourceType(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|s:this is the source"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string(nil), e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "this is the source", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataTags(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#tag1,tag2:test"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, "normal", e.fields["priority"]) + assert.Equal(t, "default-hostname", e.tags["source"]) + assert.Equal(t, []string{"tag1", "tag2:test"}, e.tags) + assert.Equal(t, "info", e.fields["alert-type"]) + assert.Equal(t, "", e.fields["aggregation-key"]) + assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataMultiple(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.name) + assert.Equal(t, "test text", e.fields["text"]) + assert.Equal(t, int64(12345), e.fields["ts"]) + assert.Equal(t, metrics.EventPriorityLow, e.fields["priority"]) + assert.Equal(t, "some.host", e.tags["source"]) + assert.Equal(t, []string{"tag1", "tag2:test"}, e.tags) + assert.Equal(t, metrics.EventAlertTypeWarning, e.fields["alert-type"]) + assert.Equal(t, "aggKey", e.fields["aggregation-key"]) + assert.Equal(t, "source test", e.tags["source-type-name"]) + assert.Equal(t, "", e.EventType) +} +*/ diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index b3f888cb6a2c3..e0bdaecf8d613 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -310,7 +310,6 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { if s.DeleteSets { s.sets = make(map[string]cachedset) } - return nil } @@ -495,7 +494,7 @@ func (s *Statsd) parser() error { switch { case line == "": case s.ParseDataDogEvents && len(line) > 2 && line[:2] == "_e": - s.parseDataDogEventMessage(time.Now(), line, in.Addr.String()) + s.parseEventMessage(time.Now(), line, in.Addr.String()) default: s.parseStatsdLine(line) } @@ -628,7 +627,6 @@ func (s *Statsd) parseStatsdLine(line string) error { case "h": m.tags["metric_type"] = "histogram" } - fmt.Println("here: name:", m.name, lineTags) if len(lineTags) > 0 { for k, v := range lineTags { m.tags[k] = v diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index b99b4d51748f8..efac25520e2b9 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -188,7 +188,7 @@ func BenchmarkTCP(b *testing.B) { // Valid lines should be parsed and their values should be cached func TestParse_ValidLines(t *testing.T) { s := NewTestStatsd() - valid_lines := []string{ + validLines := []string{ "valid:45|c", "valid:45|s", "valid:45|g", @@ -196,7 +196,7 @@ func TestParse_ValidLines(t *testing.T) { "valid.timer:45|h", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -209,7 +209,7 @@ func TestParse_Gauges(t *testing.T) { s := NewTestStatsd() // Test that gauge +- values work - valid_lines := []string{ + validLines := []string{ "plus.minus:100|g", "plus.minus:-10|g", "plus.minus:+30|g", @@ -227,7 +227,7 @@ func TestParse_Gauges(t *testing.T) { "scientific.notation.minus:4.7E-5|g", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -285,7 +285,7 @@ func TestParse_Sets(t *testing.T) { s := NewTestStatsd() // Test that sets work - valid_lines := []string{ + validLines := []string{ "unique.user.ids:100|s", "unique.user.ids:100|s", "unique.user.ids:100|s", @@ -305,7 +305,7 @@ func TestParse_Sets(t *testing.T) { "string.sets:bar|s", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -347,7 +347,7 @@ func TestParse_Counters(t *testing.T) { s := NewTestStatsd() // Test that counters work - valid_lines := []string{ + validLines := []string{ "small.inc:1|c", "big.inc:100|c", "big.inc:1|c", @@ -362,7 +362,7 @@ func TestParse_Counters(t *testing.T) { "negative.test:-5|c", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -414,7 +414,7 @@ func TestParse_Timings(t *testing.T) { acc := &testutil.Accumulator{} // Test that counters work - valid_lines := []string{ + validLines := []string{ "test.timing:1|ms", "test.timing:11|ms", "test.timing:1|ms", @@ -422,7 +422,7 @@ func TestParse_Timings(t *testing.T) { "test.timing:1|ms", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) From 7d46c8228582839d3a65bce42481498b16c31ec4 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Thu, 2 May 2019 13:50:49 -0500 Subject: [PATCH 04/13] WIP --- plugins/inputs/statsd/datadog.go | 28 +++-- plugins/inputs/statsd/datadog_test.go | 152 ++++++++++++++------------ 2 files changed, 101 insertions(+), 79 deletions(-) diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index a363e65b31cef..93cedce8c0dfe 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -87,7 +87,7 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam if len(rawMetadataFields[i]) < 2 { log.Printf("W! [inputs.statsd] too short metadata field") } - switch rawMetadataFields[i] { + switch rawMetadataFields[i][:2] { case "d:": ts, err := strconv.ParseInt(rawMetadataFields[i][2:], 10, 64) if err != nil { @@ -124,13 +124,21 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam m.tags["aggregation-key"] = rawMetadataFields[i][2:] case "s:": m.fields["source-type-name"] = rawMetadataFields[i][2:] - case "#": - parseDataDogTags(m.tags, rawMetadataFields[i][2:]) default: - log.Printf("W! [inputs.statsd] unknown metadata type: '%s'", rawMetadataFields[i]) + if rawMetadataFields[i][0] == '#' { + parseDataDogTags(m.tags, rawMetadataFields[i][1:]) + } else { + log.Printf("W! [inputs.statsd] unknown metadata type: '%s'", rawMetadataFields[i]) + } } } } + // host is a magic tag in the system, and it expects it to replace the result of h: if it is present + // telegraf will add a"host" tag anyway with different meaning than dogstatsd, so we need to switch these out + if host, ok := m.tags["host"]; ok { + delete(m.tags, "host") + m.tags["source"] = host + } s.Lock() s.events = append(s.events, m) s.Unlock() @@ -139,17 +147,19 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam func parseDataDogTags(tags map[string]string, message string) { start, i := 0, 0 - var k, v string + var k string var inVal bool // check if we are parsing the value part of the tag for i = range message { if message[i] == ',' { - v = message[start:i] - start = i + 1 if k == "" { + k = message[start:i] + tags[k] = "" + start = i + 1 continue } - tags[k] = v - k, v, inVal = "", "", false // reset state vars + tags[k] = message[start:i] + start = i + 1 + k, inVal = "", false // reset state vars } else if message[i] == ':' && !inVal { k = message[start:i] start = i + 1 diff --git a/plugins/inputs/statsd/datadog_test.go b/plugins/inputs/statsd/datadog_test.go index 34570dee0cb85..bd50f8fba4a10 100644 --- a/plugins/inputs/statsd/datadog_test.go +++ b/plugins/inputs/statsd/datadog_test.go @@ -17,13 +17,13 @@ func TestEventMinimal(t *testing.T) { assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, now, e.fields["ts"]) + assert.Equal(t, now, e.ts) assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, priorityNormal, e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, nil, e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMultilinesText(t *testing.T) { @@ -38,10 +38,10 @@ func TestEventMultilinesText(t *testing.T) { assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) + assert.Len(t, e.tags, 1) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventPipeInTitle(t *testing.T) { @@ -129,164 +129,176 @@ func TestEventError(t *testing.T) { assert.NoError(t, err) } -/* func TestEventMetadataTimestamp(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|d:21"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|d:21", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) assert.Equal(t, int64(21), e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataPriority(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|p:low"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|p:low", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) - assert.Equal(t, metrics.EventPriorityLow, e.fields["priority"]) + assert.Equal(t, nil, e.fields["ts"]) + assert.Equal(t, "low", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataHostname(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|h:localhost"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|h:localhost", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "localhost", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataHostnameInTag(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#host:localhost"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|#host:localhost", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "localhost", e.tags["source"]) - assert.Equal(t, []string{}, e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataEmptyHostTag(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#host:,other:tag"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|#host:,other:tag", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "", e.tags["source"]) - assert.Equal(t, []string{"other:tag"}, e.tags) + assert.Equal(t, map[string]string{"other": "tag", "source": ""}, e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataAlertType(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|t:warning", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) - assert.Equal(t, metrics.EventAlertTypeWarning, e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "warning", e.fields["alert-type"]) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) + } func TestEventMetadataAggregatioKey(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|k:some aggregation key"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|k:some aggregation key", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "some aggregation key", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "some aggregation key", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataSourceType(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|s:this is the source"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|s:this is the source", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string(nil), e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "this is the source", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, "this is the source", e.fields["source-type-name"]) } func TestEventMetadataTags(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#tag1,tag2:test"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|#tag1,tag2:test", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(0), e.fields["ts"]) + assert.Equal(t, nil, e.fields["ts"]) assert.Equal(t, "normal", e.fields["priority"]) assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, []string{"tag1", "tag2:test"}, e.tags) + assert.Equal(t, map[string]string{"tag1": "", "tag2": "test", "source": "default-hostname"}, e.tags) assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.fields["aggregation-key"]) - assert.Equal(t, "", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, "", e.tags["aggregation-key"]) + assert.Equal(t, nil, e.fields["source-type-name"]) } func TestEventMetadataMultiple(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test"), "default-hostname") + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test", "default-hostname") require.Nil(t, err) + e := s.events[0] assert.Equal(t, "test title", e.name) assert.Equal(t, "test text", e.fields["text"]) assert.Equal(t, int64(12345), e.fields["ts"]) - assert.Equal(t, metrics.EventPriorityLow, e.fields["priority"]) + assert.Equal(t, "low", e.fields["priority"]) assert.Equal(t, "some.host", e.tags["source"]) - assert.Equal(t, []string{"tag1", "tag2:test"}, e.tags) - assert.Equal(t, metrics.EventAlertTypeWarning, e.fields["alert-type"]) - assert.Equal(t, "aggKey", e.fields["aggregation-key"]) - assert.Equal(t, "source test", e.tags["source-type-name"]) - assert.Equal(t, "", e.EventType) + assert.Equal(t, map[string]string{"aggregation-key": "aggKey", "tag1": "", "tag2": "test", "source": "some.host"}, e.tags) + assert.Equal(t, "warning", e.fields["alert-type"]) + assert.Equal(t, "aggKey", e.tags["aggregation-key"]) + assert.Equal(t, "source test", e.fields["source-type-name"]) } -*/ From 54c6ecdb6db491ddd7ca0e34a8efddedccbdfa51 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Fri, 3 May 2019 21:12:46 -0500 Subject: [PATCH 05/13] WIP --- plugins/inputs/statsd/datadog_test.go | 49 ++++++++ plugins/inputs/statsd/statsd.go | 12 +- plugins/inputs/statsd/statsd_test.go | 172 ++++++++++++-------------- 3 files changed, 140 insertions(+), 93 deletions(-) diff --git a/plugins/inputs/statsd/datadog_test.go b/plugins/inputs/statsd/datadog_test.go index bd50f8fba4a10..112e1cee594e8 100644 --- a/plugins/inputs/statsd/datadog_test.go +++ b/plugins/inputs/statsd/datadog_test.go @@ -4,10 +4,59 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestEventGather(t *testing.T) { + acc := &testutil.Accumulator{} + + now := time.Now() + s := NewTestStatsd() + err := s.parseEventMessage(now, "_e{10,9}:test title|test text", "default-hostname") + require.Nil(t, err) + err = s.parseEventMessage(now.Add(1), "_e{10,24}:test title|test\\line1\\nline2\\nline3", "default-hostname") + require.Nil(t, err) + err = s.parseEventMessage(now.Add(2), "_e{10,9}:test title|test text|d:21", "default-hostname") + require.Nil(t, err) + + err = s.Gather(acc) + require.Nil(t, err) + + assert.Equal(t, acc.NMetrics(), uint64(3)) + + assert.Equal(t, "test title", acc.Metrics[0].Measurement) + assert.Equal(t, "test title", acc.Metrics[1].Measurement) + assert.Equal(t, "test title", acc.Metrics[2].Measurement) + + assert.Equal(t, map[string]string{"source": "default-hostname"}, acc.Metrics[0].Tags) + assert.Equal(t, map[string]string{"source": "default-hostname"}, acc.Metrics[1].Tags) + assert.Equal(t, map[string]string{"source": "default-hostname"}, acc.Metrics[2].Tags) + + assert.Equal(t, + map[string]interface{}{ + "priority": priorityNormal, + "alert-type": "info", + "text": "test text", + }, + acc.Metrics[0].Fields) + assert.Equal(t, map[string]interface{}{ + "priority": priorityNormal, + "alert-type": "info", + "text": "test\\line1\nline2\nline3", + }, acc.Metrics[1].Fields) + assert.Equal(t, map[string]interface{}{ + "priority": priorityNormal, + "alert-type": "info", + "text": "test text", + "ts": int64(21), + }, acc.Metrics[2].Fields) +} + +// These tests adapted from tests in +// https://github.com/DataDog/datadog-agent/blob/master/pkg/dogstatsd/parser_test.go +// to ensure compatibility with the datadog-agent parser func TestEventMinimal(t *testing.T) { now := time.Now() s := NewTestStatsd() diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index e0bdaecf8d613..8984b1b35833c 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -21,7 +21,7 @@ import ( ) const ( - // UDP packet limit, see + // UDP_MAX_PACKET_SIZE is the UDP packet limit, see // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure UDP_MAX_PACKET_SIZE int = 64 * 1024 @@ -41,6 +41,7 @@ var dropwarn = "E! Error: statsd message queue full. " + var malformedwarn = "E! Statsd over TCP has received %d malformed packets" + " thus far." +// Statsd allows the importing of statsd and dogstatsd data. type Statsd struct { // Protocol used on listener - udp or tcp Protocol string `toml:"protocol"` @@ -61,6 +62,7 @@ type Statsd struct { DeleteCounters bool DeleteSets bool DeleteTimings bool + DeleteEvents bool ConvertNames bool // MetricSeparator is the separator between parts of the metric name. @@ -310,6 +312,13 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { if s.DeleteSets { s.sets = make(map[string]cachedset) } + for _, e := range s.events { + acc.AddFields(e.name, e.fields, e.tags, e.ts) + } + if s.DeleteEvents { + s.events = s.events[:0] + } + return nil } @@ -918,6 +927,7 @@ func init() { DeleteGauges: true, DeleteSets: true, DeleteTimings: true, + DeleteEvents: true, } }) } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index efac25520e2b9..c297f590d4a75 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -273,7 +273,7 @@ func TestParse_Gauges(t *testing.T) { } for _, test := range validations { - err := test_validate_gauge(test.name, test.value, s.gauges) + err := testValidateGauge(test.name, test.value, s.gauges) if err != nil { t.Error(err.Error()) } @@ -335,7 +335,7 @@ func TestParse_Sets(t *testing.T) { } for _, test := range validations { - err := test_validate_set(test.name, test.value, s.sets) + err := testValidateSet(test.name, test.value, s.sets) if err != nil { t.Error(err.Error()) } @@ -400,7 +400,7 @@ func TestParse_Counters(t *testing.T) { } for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -463,7 +463,7 @@ func TestParseScientificNotation(t *testing.T) { // Invalid lines should return an error func TestParse_InvalidLines(t *testing.T) { s := NewTestStatsd() - invalid_lines := []string{ + invalidLines := []string{ "i.dont.have.a.pipe:45g", "i.dont.have.a.colon45|c", "invalid.metric.type:45|e", @@ -474,7 +474,7 @@ func TestParse_InvalidLines(t *testing.T) { "invalid.value:d11|c", "invalid.value:1d1|c", } - for _, line := range invalid_lines { + for _, line := range invalidLines { err := s.parseStatsdLine(line) if err == nil { t.Errorf("Parsing line %s should have resulted in an error\n", line) @@ -485,21 +485,21 @@ func TestParse_InvalidLines(t *testing.T) { // Invalid sample rates should be ignored and not applied func TestParse_InvalidSampleRate(t *testing.T) { s := NewTestStatsd() - invalid_lines := []string{ + invalidLines := []string{ "invalid.sample.rate:45|c|0.1", "invalid.sample.rate.2:45|c|@foo", "invalid.sample.rate:45|g|@0.1", "invalid.sample.rate:45|s|@0.1", } - for _, line := range invalid_lines { + for _, line := range invalidLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } - counter_validations := []struct { + counterValidations := []struct { name string value int64 cache map[string]cachedcounter @@ -516,19 +516,19 @@ func TestParse_InvalidSampleRate(t *testing.T) { }, } - for _, test := range counter_validations { - err := test_validate_counter(test.name, test.value, test.cache) + for _, test := range counterValidations { + err := testValidateCounter(test.name, test.value, test.cache) if err != nil { t.Error(err.Error()) } } - err := test_validate_gauge("invalid_sample_rate", 45, s.gauges) + err := testValidateGauge("invalid_sample_rate", 45, s.gauges) if err != nil { t.Error(err.Error()) } - err = test_validate_set("invalid_sample_rate", 1, s.sets) + err = testValidateSet("invalid_sample_rate", 1, s.sets) if err != nil { t.Error(err.Error()) } @@ -537,12 +537,12 @@ func TestParse_InvalidSampleRate(t *testing.T) { // Names should be parsed like . -> _ func TestParse_DefaultNameParsing(t *testing.T) { s := NewTestStatsd() - valid_lines := []string{ + validLines := []string{ "valid:1|c", "valid.foo-bar:11|c", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -564,7 +564,7 @@ func TestParse_DefaultNameParsing(t *testing.T) { } for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -606,7 +606,7 @@ func TestParse_Template(t *testing.T) { // Validate counters for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -648,7 +648,7 @@ func TestParse_TemplateFilter(t *testing.T) { // Validate counters for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -686,7 +686,7 @@ func TestParse_TemplateSpecificity(t *testing.T) { // Validate counters for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -722,7 +722,7 @@ func TestParse_TemplateFields(t *testing.T) { } } - counter_tests := []struct { + counterTests := []struct { name string value int64 field string @@ -744,14 +744,14 @@ func TestParse_TemplateFields(t *testing.T) { }, } // Validate counters - for _, test := range counter_tests { - err := test_validate_counter(test.name, test.value, s.counters, test.field) + for _, test := range counterTests { + err := testValidateCounter(test.name, test.value, s.counters, test.field) if err != nil { t.Error(err.Error()) } } - gauge_tests := []struct { + gaugeTests := []struct { name string value float64 field string @@ -768,14 +768,14 @@ func TestParse_TemplateFields(t *testing.T) { }, } // Validate gauges - for _, test := range gauge_tests { - err := test_validate_gauge(test.name, test.value, s.gauges, test.field) + for _, test := range gaugeTests { + err := testValidateGauge(test.name, test.value, s.gauges, test.field) if err != nil { t.Error(err.Error()) } } - set_tests := []struct { + setTests := []struct { name string value int64 field string @@ -792,8 +792,8 @@ func TestParse_TemplateFields(t *testing.T) { }, } // Validate sets - for _, test := range set_tests { - err := test_validate_set(test.name, test.value, s.sets, test.field) + for _, test := range setTests { + err := testValidateSet(test.name, test.value, s.sets, test.field) if err != nil { t.Error(err.Error()) } @@ -917,16 +917,6 @@ func TestParse_DataDogTags(t *testing.T) { } } } - - // for statName, tags := range expectedTags { - // for expectedK, expectedV := range tags { - // otherValue := sourceTags[statName][k] - // if sourceTags[statName][expectedK] != v { - // t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue) - // fmt.Print(sourceTags[statName]) - // } - // } - // } } func tagsForItem(m interface{}) map[string]string { @@ -956,8 +946,8 @@ func TestParseName(t *testing.T) { s := NewTestStatsd() tests := []struct { - in_name string - out_name string + inName string + outName string }{ { "foobar", @@ -974,9 +964,9 @@ func TestParseName(t *testing.T) { } for _, test := range tests { - name, _, _ := s.parseName(test.in_name) - if name != test.out_name { - t.Errorf("Expected: %s, got %s", test.out_name, name) + name, _, _ := s.parseName(test.inName) + if name != test.outName { + t.Errorf("Expected: %s, got %s", test.outName, name) } } @@ -984,8 +974,8 @@ func TestParseName(t *testing.T) { s.MetricSeparator = "." tests = []struct { - in_name string - out_name string + inName string + outName string }{ { "foobar", @@ -1002,9 +992,9 @@ func TestParseName(t *testing.T) { } for _, test := range tests { - name, _, _ := s.parseName(test.in_name) - if name != test.out_name { - t.Errorf("Expected: %s, got %s", test.out_name, name) + name, _, _ := s.parseName(test.inName) + if name != test.outName { + t.Errorf("Expected: %s, got %s", test.outName, name) } } } @@ -1015,12 +1005,12 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { s := NewTestStatsd() // Test that counters work - valid_lines := []string{ + validLines := []string{ "test.counter,host=localhost:1|c", "test.counter,host=localhost,region=west:1|c", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -1035,7 +1025,7 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { // Test that measurements with multiple bits, are treated as different outputs // but are equal to their single-measurement representation func TestParse_MeasurementsWithMultipleValues(t *testing.T) { - single_lines := []string{ + singleLines := []string{ "valid.multiple:0|ms|@0.1", "valid.multiple:0|ms|", "valid.multiple:1|ms", @@ -1061,7 +1051,7 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { "valid.multiple.mixed:1|g", } - multiple_lines := []string{ + multipleLines := []string{ "valid.multiple:0|ms|@0.1:0|ms|:1|ms", "valid.multiple.duplicate:1|c:1|c:2|c:1|c", "valid.multiple.duplicate:1|h:1|h:2|h:1|h", @@ -1070,28 +1060,28 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { "valid.multiple.mixed:1|c:1|ms:2|s:1|g", } - s_single := NewTestStatsd() - s_multiple := NewTestStatsd() + sSingle := NewTestStatsd() + sMultiple := NewTestStatsd() - for _, line := range single_lines { - err := s_single.parseStatsdLine(line) + for _, line := range singleLines { + err := sSingle.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } - for _, line := range multiple_lines { - err := s_multiple.parseStatsdLine(line) + for _, line := range multipleLines { + err := sMultiple.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } - if len(s_single.timings) != 3 { - t.Errorf("Expected 3 measurement, found %d", len(s_single.timings)) + if len(sSingle.timings) != 3 { + t.Errorf("Expected 3 measurement, found %d", len(sSingle.timings)) } - if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok { + if cachedtiming, ok := sSingle.timings["metric_type=timingvalid_multiple"]; !ok { t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found") } else { if cachedtiming.name != "valid_multiple" { @@ -1111,60 +1101,60 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { } } - // test if s_single and s_multiple did compute the same stats for valid.multiple.duplicate - if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil { + // test if sSingle and sMultiple did compute the same stats for valid.multiple.duplicate + if err := testValidateSet("valid_multiple_duplicate", 2, sSingle.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil { + if err := testValidateSet("valid_multiple_duplicate", 2, sMultiple.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil { + if err := testValidateCounter("valid_multiple_duplicate", 5, sSingle.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil { + if err := testValidateCounter("valid_multiple_duplicate", 5, sMultiple.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil { + if err := testValidateGauge("valid_multiple_duplicate", 1, sSingle.gauges); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil { + if err := testValidateGauge("valid_multiple_duplicate", 1, sMultiple.gauges); err != nil { t.Error(err.Error()) } - // test if s_single and s_multiple did compute the same stats for valid.multiple.mixed - if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil { + // test if sSingle and sMultiple did compute the same stats for valid.multiple.mixed + if err := testValidateSet("valid_multiple_mixed", 1, sSingle.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil { + if err := testValidateSet("valid_multiple_mixed", 1, sMultiple.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil { + if err := testValidateCounter("valid_multiple_mixed", 1, sSingle.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil { + if err := testValidateCounter("valid_multiple_mixed", 1, sMultiple.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil { + if err := testValidateGauge("valid_multiple_mixed", 1, sSingle.gauges); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil { + if err := testValidateGauge("valid_multiple_mixed", 1, sMultiple.gauges); err != nil { t.Error(err.Error()) } } // Tests low-level functionality of timings when multiple fields is enabled // and a measurement template has been defined which can parse field names -func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { +func TestParse_TimingsMultipleFieldsWithTemplate(t *testing.T) { s := NewTestStatsd() s.Templates = []string{"measurement.field"} s.Percentiles = []int{90} @@ -1215,7 +1205,7 @@ func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { // Tests low-level functionality of timings when multiple fields is enabled // but a measurement template hasn't been defined so we can't parse field names // In this case the behaviour should be the same as normal behaviour -func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { +func TestParse_TimingsMultipleFieldsWithoutTemplate(t *testing.T) { s := NewTestStatsd() s.Templates = []string{} s.Percentiles = []int{90} @@ -1431,14 +1421,14 @@ func TestParse_Gauges_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_gauge("current_users", 100, s.gauges) + err = testValidateGauge("current_users", 100, s.gauges) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_gauge("current_users", 100, s.gauges) + err = testValidateGauge("current_users", 100, s.gauges) if err == nil { t.Error("current_users_gauge metric should have been deleted") } @@ -1457,14 +1447,14 @@ func TestParse_Sets_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_set("unique_user_ids", 1, s.sets) + err = testValidateSet("unique_user_ids", 1, s.sets) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_set("unique_user_ids", 1, s.sets) + err = testValidateSet("unique_user_ids", 1, s.sets) if err == nil { t.Error("unique_user_ids_set metric should have been deleted") } @@ -1483,14 +1473,14 @@ func TestParse_Counters_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_counter("total_users", 100, s.counters) + err = testValidateCounter("total_users", 100, s.counters) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_counter("total_users", 100, s.counters) + err = testValidateCounter("total_users", 100, s.counters) if err == nil { t.Error("total_users_counter metric should have been deleted") } @@ -1516,7 +1506,7 @@ func TestParseKeyValue(t *testing.T) { // Test utility functions -func test_validate_set( +func testValidateSet( name string, value int64, cache map[string]cachedset, @@ -1538,17 +1528,16 @@ func test_validate_set( } } if !found { - return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + return fmt.Errorf("test Error: Metric name %s not found", name) } if value != int64(len(metric.fields[f])) { - return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", - name, value, len(metric.fields[f]))) + return fmt.Errorf("measurement: %s, expected %d, actual %d", name, value, len(metric.fields[f])) } return nil } -func test_validate_counter( +func testValidateCounter( name string, valueExpected int64, cache map[string]cachedcounter, @@ -1570,17 +1559,16 @@ func test_validate_counter( } } if !found { - return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + return fmt.Errorf("test Error: Metric name %s not found", name) } if valueExpected != valueActual { - return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", - name, valueExpected, valueActual)) + return fmt.Errorf("measurement: %s, expected %d, actual %d", name, valueExpected, valueActual) } return nil } -func test_validate_gauge( +func testValidateGauge( name string, valueExpected float64, cache map[string]cachedgauge, @@ -1602,7 +1590,7 @@ func test_validate_gauge( } } if !found { - return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + return fmt.Errorf("Test Error: Metric name %s not found\n", name) } if valueExpected != valueActual { From dc4fe8ac165ea27c2f58e9f1cc108a48b849dfd9 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Mon, 6 May 2019 12:22:07 -0500 Subject: [PATCH 06/13] config and readme --- etc/telegraf.conf | 1 + plugins/inputs/statsd/README.md | 2 ++ plugins/inputs/statsd/statsd.go | 1 + 3 files changed, 4 insertions(+) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index c386d171f9ed1..cd5b83c6a9456 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -5017,6 +5017,7 @@ # ## Parses tags in the datadog statsd format # ## http://docs.datadoghq.com/guides/dogstatsd/ # parse_data_dog_tags = false +# parse_data_dog_events = false # # ## Statsd data translation templates, more info can be read here: # ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index c1093bf397b10..b0df29c9db028 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -43,6 +43,7 @@ ## Parses tags in the datadog statsd format ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_tags = false + parse_data_dog_events = false ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md @@ -185,6 +186,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time. - **templates** []string: Templates for transforming statsd buckets into influx measurements and tags. - **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) +- **parse_data_dog_events** boolean: Enable parsing of events in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) ### Statsd bucket -> InfluxDB line-protocol Templates diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 8984b1b35833c..ba737f3a0419c 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -234,6 +234,7 @@ const sampleConfig = ` ## Parses tags in the datadog statsd format ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_tags = false + parse_data_dog_events = false ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md From 6b0bc6c995085d54e57767091a2b8eb7d2754521 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 8 May 2019 18:46:54 -0500 Subject: [PATCH 07/13] WIP --- docs/LICENSE_OF_DEPENDENCIES.md | 3 + etc/telegraf.conf | 1 - plugins/inputs/statsd/README.md | 9 +- plugins/inputs/statsd/datadog.go | 132 ++--- plugins/inputs/statsd/datadog_test.go | 683 +++++++++++++++----------- plugins/inputs/statsd/statsd.go | 58 +-- plugins/inputs/statsd/statsd_test.go | 12 +- 7 files changed, 520 insertions(+), 378 deletions(-) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 5b6faf4c9eb6f..e0531210ec237 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -135,3 +135,6 @@ following works: - gopkg.in/olivere/elastic.v5 [MIT License](https://github.com/olivere/elastic/blob/v5.0.76/LICENSE) - gopkg.in/tomb.v1 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v1/LICENSE) - gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE) + +## telegraf used and modified code from these projects +- github.com/DataDog/datadog-agent [Apache License 2.0](https://github.com/DataDog/datadog-agent/LICENSE) \ No newline at end of file diff --git a/etc/telegraf.conf b/etc/telegraf.conf index cd5b83c6a9456..c386d171f9ed1 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -5017,7 +5017,6 @@ # ## Parses tags in the datadog statsd format # ## http://docs.datadoghq.com/guides/dogstatsd/ # parse_data_dog_tags = false -# parse_data_dog_events = false # # ## Statsd data translation templates, more info can be read here: # ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index b0df29c9db028..4b632af618322 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -34,6 +34,7 @@ ## Reset timings & histograms every interval (default=true) delete_timings = true + ## Percentiles to calculate for timing & histogram stats percentiles = [90] @@ -42,8 +43,13 @@ ## Parses tags in the datadog statsd format ## http://docs.datadoghq.com/guides/dogstatsd/ + ## deprecated in 1.10; use datadog_extensions option instead parse_data_dog_tags = false - parse_data_dog_events = false + + ## Parses extensions to statsd in the datadog statsd format + ## currently supports metrics and datadog tags. + ## http://docs.datadoghq.com/guides/dogstatsd/ + datadog_extensions = false ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md @@ -177,6 +183,7 @@ to allow. Used when protocol is set to tcp. - **delete_counters** boolean: Delete counters on every collection interval - **delete_sets** boolean: Delete set counters on every collection interval - **delete_timings** boolean: Delete timings on every collection interval + - **percentiles** []int: Percentiles to calculate for timing & histogram stats - **allowed_pending_messages** integer: Number of messages allowed to queue up waiting to be processed. When this fills, messages will be dropped and logged. diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index 93cedce8c0dfe..f7da285c5cd98 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -6,11 +6,18 @@ import ( "strconv" "strings" "time" + + tmetric "github.com/influxdata/telegraf/metric" ) const ( priorityNormal = "normal" priorityLow = "low" + + eventInfo = "info" + eventWarning = "warning" + eventError = "error" + eventSuccess = "success" ) var uncommenter = strings.NewReplacer("\\n", "\n") @@ -66,82 +73,78 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam // Handle hostname, with a priority to the h: field, then the host: // tag and finally the defaultHostname value // Metadata - m := cachedEvent{ + m := event{ name: rawTitle, } m.tags = make(map[string]string, strings.Count(message, ",")+2) // allocate for the approximate number of tags m.fields = make(map[string]interface{}, 9) - m.fields["alert-type"] = "info" // default event type + m.fields["alert_type"] = eventInfo // default event type m.fields["text"] = uncommenter.Replace(string(rawText)) + // host is a magic tag in the system, and it expects it to replace the result of h: if it is present + // telegraf will add a"host" tag anyway with different meaning than dogstatsd, so we need to use source instead of host. + m.tags["source"] = defaultHostname m.fields["priority"] = priorityNormal m.ts = now - if len(message) == 0 { - s.events = append(s.events, m) + if len(message) < 2 { + newM, err := tmetric.New(m.name, m.tags, m.fields, m.ts) + if err != nil { + return err + } + s.acc.AddMetric(newM) return nil } - if len(message) > 1 { - rawMetadataFields := strings.Split(message[1:], "|") - for i := range rawMetadataFields { - if len(rawMetadataFields[i]) < 2 { - log.Printf("W! [inputs.statsd] too short metadata field") + rawMetadataFields := strings.Split(message[1:], "|") + for i := range rawMetadataFields { + if len(rawMetadataFields[i]) < 2 { + log.Printf("W! [inputs.statsd] too short metadata field") + } + switch rawMetadataFields[i][:2] { + case "d:": + ts, err := strconv.ParseInt(rawMetadataFields[i][2:], 10, 64) + if err != nil { + continue + } + m.fields["ts"] = ts + case "p:": + switch rawMetadataFields[i][2:] { + case priorityLow: + m.fields["priority"] = priorityLow + case priorityNormal: // we already used this as a default + default: + continue } - switch rawMetadataFields[i][:2] { - case "d:": - ts, err := strconv.ParseInt(rawMetadataFields[i][2:], 10, 64) - if err != nil { - log.Printf("W! [inputs.statsd] skipping timestamp: %s", err) - continue - } - m.fields["ts"] = ts - case "p:": - switch rawMetadataFields[i][2:] { - case priorityLow: - m.fields["priority"] = priorityLow - case priorityNormal: // we already used this as a default - default: - log.Printf("W! [inputs.statsd] skipping priority") - continue - } - case "h:": - m.tags["source"] = rawMetadataFields[i][2:] - case "t:": - switch rawMetadataFields[i][2:] { - case "error": - m.fields["alert-type"] = "error" - case "warning": - m.fields["alert-type"] = "warning" - case "success": - m.fields["alert-type"] = "success" - case "info": // already set for info - default: - log.Printf("W! [inputs.statsd] skipping alert type") - continue - } - case "k:": - // TODO(docmerlin): does this make sense? - m.tags["aggregation-key"] = rawMetadataFields[i][2:] - case "s:": - m.fields["source-type-name"] = rawMetadataFields[i][2:] + case "h:": + m.tags["source"] = rawMetadataFields[i][2:] + case "t:": + switch rawMetadataFields[i][2:] { + case "error", "warning", "success", "info": + m.fields["alert_type"] = rawMetadataFields[i][2:] // already set for info default: - if rawMetadataFields[i][0] == '#' { - parseDataDogTags(m.tags, rawMetadataFields[i][1:]) - } else { - log.Printf("W! [inputs.statsd] unknown metadata type: '%s'", rawMetadataFields[i]) - } + continue + } + case "k:": + m.tags["aggregation_key"] = rawMetadataFields[i][2:] + case "s:": + m.fields["source_type_name"] = rawMetadataFields[i][2:] + default: + if rawMetadataFields[i][0] == '#' { + parseDataDogTags(m.tags, rawMetadataFields[i][1:]) + } else { + return fmt.Errorf("unknown metadata type: '%s'", rawMetadataFields[i]) } } } - // host is a magic tag in the system, and it expects it to replace the result of h: if it is present - // telegraf will add a"host" tag anyway with different meaning than dogstatsd, so we need to switch these out if host, ok := m.tags["host"]; ok { delete(m.tags, "host") m.tags["source"] = host } - s.Lock() - s.events = append(s.events, m) - s.Unlock() + newM, err := tmetric.New(m.name, m.tags, m.fields, m.ts) + if err != nil { + return err + } + s.acc.AddMetric(newM) return nil } @@ -153,11 +156,15 @@ func parseDataDogTags(tags map[string]string, message string) { if message[i] == ',' { if k == "" { k = message[start:i] - tags[k] = "" + tags[k] = "true" // this is because influx doesn't support empty tags start = i + 1 continue } - tags[k] = message[start:i] + v := message[start:i] + if v == "" { + v = "true" + } + tags[k] = v start = i + 1 k, inVal = "", false // reset state vars } else if message[i] == ':' && !inVal { @@ -166,8 +173,15 @@ func parseDataDogTags(tags map[string]string, message string) { inVal = true } } + if k == "" && start < i+1 { + tags[message[start:i+1]] = "true" + } // grab the last value if k != "" { - tags[k] = message[start : i+1] + if start < i+1 { + tags[k] = message[start : i+1] + return + } + tags[k] = "true" } } diff --git a/plugins/inputs/statsd/datadog_test.go b/plugins/inputs/statsd/datadog_test.go index 112e1cee594e8..d1a5b39002194 100644 --- a/plugins/inputs/statsd/datadog_test.go +++ b/plugins/inputs/statsd/datadog_test.go @@ -5,349 +5,468 @@ import ( "time" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestEventGather(t *testing.T) { - acc := &testutil.Accumulator{} - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text", "default-hostname") - require.Nil(t, err) - err = s.parseEventMessage(now.Add(1), "_e{10,24}:test title|test\\line1\\nline2\\nline3", "default-hostname") - require.Nil(t, err) - err = s.parseEventMessage(now.Add(2), "_e{10,9}:test title|test text|d:21", "default-hostname") - require.Nil(t, err) - - err = s.Gather(acc) - require.Nil(t, err) - - assert.Equal(t, acc.NMetrics(), uint64(3)) - - assert.Equal(t, "test title", acc.Metrics[0].Measurement) - assert.Equal(t, "test title", acc.Metrics[1].Measurement) - assert.Equal(t, "test title", acc.Metrics[2].Measurement) - - assert.Equal(t, map[string]string{"source": "default-hostname"}, acc.Metrics[0].Tags) - assert.Equal(t, map[string]string{"source": "default-hostname"}, acc.Metrics[1].Tags) - assert.Equal(t, map[string]string{"source": "default-hostname"}, acc.Metrics[2].Tags) - - assert.Equal(t, - map[string]interface{}{ - "priority": priorityNormal, - "alert-type": "info", - "text": "test text", + type expected struct { + title string + tags map[string]string + fields map[string]interface{} + } + tests := []struct { + name string + message string + hostname string + now time.Time + err bool + expected expected + }{{ + name: "basic", + message: "_e{10,9}:test title|test text", + hostname: "default-hostname", + now: now, + err: false, + expected: expected{ + title: "test title", + tags: map[string]string{"source": "default-hostname"}, + fields: map[string]interface{}{ + "priority": priorityNormal, + "alert_type": "info", + "text": "test text", + }, + }, + }, + { + name: "escape some stuff", + message: "_e{10,24}:test title|test\\line1\\nline2\\nline3", + hostname: "default-hostname", + now: now.Add(1), + err: false, + expected: expected{ + title: "test title", + tags: map[string]string{"source": "default-hostname"}, + fields: map[string]interface{}{ + "priority": priorityNormal, + "alert_type": "info", + "text": "test\\line1\nline2\nline3", + }, + }, }, - acc.Metrics[0].Fields) - assert.Equal(t, map[string]interface{}{ - "priority": priorityNormal, - "alert-type": "info", - "text": "test\\line1\nline2\nline3", - }, acc.Metrics[1].Fields) - assert.Equal(t, map[string]interface{}{ - "priority": priorityNormal, - "alert-type": "info", - "text": "test text", - "ts": int64(21), - }, acc.Metrics[2].Fields) + { + name: "custom time", + message: "_e{10,9}:test title|test text|d:21", + hostname: "default-hostname", + now: now.Add(2), + err: false, + expected: expected{ + title: "test title", + tags: map[string]string{"source": "default-hostname"}, + fields: map[string]interface{}{ + "priority": priorityNormal, + "alert_type": "info", + "text": "test text", + "ts": int64(21), + }, + }, + }, + } + acc := &testutil.Accumulator{} + s := NewTestStatsd() + s.acc = acc + + for i := range tests { + t.Run(tests[i].name, func(t *testing.T) { + err := s.parseEventMessage(tests[i].now, tests[i].message, tests[i].hostname) + if tests[i].err { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + require.Equal(t, uint64(i+1), acc.NMetrics()) + + require.Nil(t, err) + require.Equal(t, tests[i].expected.title, acc.Metrics[i].Measurement) + require.Equal(t, tests[i].expected.tags, acc.Metrics[i].Tags) + require.Equal(t, tests[i].expected.fields, acc.Metrics[i].Fields) + }) + } } // These tests adapted from tests in // https://github.com/DataDog/datadog-agent/blob/master/pkg/dogstatsd/parser_test.go // to ensure compatibility with the datadog-agent parser -func TestEventMinimal(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text", "default-hostname") - require.Nil(t, err) - e := s.events[0] - - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, now, e.ts) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, priorityNormal, e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} -func TestEventMultilinesText(t *testing.T) { +func TestEvents(t *testing.T) { now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,24}:test title|test\\line1\\nline2\\nline3", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test\\line1\nline2\nline3", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Len(t, e.tags, 1) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventPipeInTitle(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,24}:test|title|test\\line1\\nline2\\nline3", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test|title", e.name) - assert.Equal(t, "test\\line1\nline2\nline3", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Len(t, e.tags, 1) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) + type args struct { + now time.Time + message string + hostname string + } + type expected struct { + title string + text interface{} + now time.Time + ts interface{} + priority string + source string + alertType interface{} + aggregationKey string + sourceTypeName interface{} + checkTags map[string]string + } + + tests := []struct { + name string + args args + expected expected + }{ + { + name: "event minimal", + args: args{ + now: now, + message: "_e{10,9}:test title|test text", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now, + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + }, + }, + { + name: "event multilines text", + args: args{ + now: now.Add(1), + message: "_e{10,24}:test title|test\\line1\\nline2\\nline3", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test\\line1\nline2\nline3", + now: now.Add(1), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + }, + }, + { + name: "event pipe in title", + args: args{ + now: now.Add(2), + message: "_e{10,24}:test|title|test\\line1\\nline2\\nline3", + hostname: "default-hostname", + }, + expected: expected{ + title: "test|title", + text: "test\\line1\nline2\nline3", + now: now.Add(2), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + }, + }, + { + name: "event metadata timestamp", + args: args{ + now: now.Add(3), + message: "_e{10,9}:test title|test text|d:21", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(3), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + ts: int64(21), + }, + }, + { + name: "event metadata priority", + args: args{ + now: now.Add(4), + message: "_e{10,9}:test title|test text|p:low", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(4), + priority: priorityLow, + source: "default-hostname", + alertType: eventInfo, + }, + }, + { + name: "event metadata hostname", + args: args{ + now: now.Add(5), + message: "_e{10,9}:test title|test text|h:localhost", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(5), + priority: priorityNormal, + source: "localhost", + alertType: eventInfo, + }, + }, + { + name: "event metadata hostname in tag", + args: args{ + now: now.Add(6), + message: "_e{10,9}:test title|test text|#host:localhost", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(6), + priority: priorityNormal, + source: "localhost", + alertType: eventInfo, + }, + }, + { + name: "event metadata empty host tag", + args: args{ + now: now.Add(7), + message: "_e{10,9}:test title|test text|#host:,other:tag", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(7), + priority: priorityNormal, + source: "true", + alertType: eventInfo, + checkTags: map[string]string{"other": "tag", "source": "true"}, + }, + }, + { + name: "event metadata alert type", + args: args{ + now: now.Add(8), + message: "_e{10,9}:test title|test text|t:warning", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(8), + priority: priorityNormal, + source: "default-hostname", + alertType: eventWarning, + }, + }, + { + name: "event metadata aggregation key", + args: args{ + now: now.Add(9), + message: "_e{10,9}:test title|test text|k:some aggregation key", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(9), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "some aggregation key", + }, + }, + { + name: "event metadata aggregation key", + args: args{ + now: now.Add(10), + message: "_e{10,9}:test title|test text|k:some aggregation key", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(10), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "some aggregation key", + }, + }, + { + name: "event metadata source type", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|s:this is the source", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityNormal, + source: "default-hostname", + sourceTypeName: "this is the source", + alertType: eventInfo, + }, + }, + { + name: "event metadata source type", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|s:this is the source", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityNormal, + source: "default-hostname", + sourceTypeName: "this is the source", + alertType: eventInfo, + }, + }, + { + name: "event metadata source tags", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|#tag1,tag2:test", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + checkTags: map[string]string{"tag1": "true", "tag2": "test", "source": "default-hostname"}, + }, + }, + { + name: "event metadata multiple", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityLow, + source: "some.host", + ts: int64(12345), + alertType: eventWarning, + aggregationKey: "aggKey", + sourceTypeName: "source test", + checkTags: map[string]string{"aggregation_key": "aggKey", "tag1": "true", "tag2": "test", "source": "some.host"}, + }, + }, + } + for i := range tests { + t.Run(tests[i].name, func(t *testing.T) { + s := NewTestStatsd() + acc := &testutil.Accumulator{} + s.acc = acc + err := s.parseEventMessage(tests[i].args.now, tests[i].args.message, tests[i].args.hostname) + require.Nil(t, err) + m := acc.Metrics[0] + require.Equal(t, tests[i].expected.title, m.Measurement) + require.Equal(t, tests[i].expected.text, m.Fields["text"]) + require.Equal(t, tests[i].expected.now, m.Time) + require.Equal(t, tests[i].expected.ts, m.Fields["ts"]) + require.Equal(t, tests[i].expected.priority, m.Fields["priority"]) + require.Equal(t, tests[i].expected.source, m.Tags["source"]) + require.Equal(t, tests[i].expected.alertType, m.Fields["alert_type"]) + require.Equal(t, tests[i].expected.aggregationKey, m.Tags["aggregation_key"]) + require.Equal(t, tests[i].expected.sourceTypeName, m.Fields["source_type_name"]) + if tests[i].expected.checkTags != nil { + require.Equal(t, tests[i].expected.checkTags, m.Tags) + } + }) + } } func TestEventError(t *testing.T) { now := time.Now() s := NewTestStatsd() + s.acc = &testutil.Accumulator{} // missing length header err := s.parseEventMessage(now, "_e:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // greater length than packet err = s.parseEventMessage(now, "_e{10,10}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // zero length err = s.parseEventMessage(now, "_e{0,0}:a|a", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // missing title or text length err = s.parseEventMessage(now, "_e{5555:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // missing wrong len format err = s.parseEventMessage(now, "_e{a,1}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) err = s.parseEventMessage(now, "_e{1,a}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // missing title or text length err = s.parseEventMessage(now, "_e{5,}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) err = s.parseEventMessage(now, "_e{,4}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) err = s.parseEventMessage(now, "_e{}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) err = s.parseEventMessage(now, "_e{,}:title|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // not enough information err = s.parseEventMessage(now, "_e|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) err = s.parseEventMessage(now, "_e:|text", "default-hostname") - assert.Error(t, err) + require.Error(t, err) // invalid timestamp err = s.parseEventMessage(now, "_e{5,4}:title|text|d:abc", "default-hostname") - assert.NoError(t, err) + require.NoError(t, err) // invalid priority err = s.parseEventMessage(now, "_e{5,4}:title|text|p:urgent", "default-hostname") - assert.NoError(t, err) + require.NoError(t, err) // invalid priority err = s.parseEventMessage(now, "_e{5,4}:title|text|p:urgent", "default-hostname") - assert.NoError(t, err) + require.NoError(t, err) // invalid alert type err = s.parseEventMessage(now, "_e{5,4}:title|text|t:test", "default-hostname") - assert.NoError(t, err) + require.NoError(t, err) // unknown metadata err = s.parseEventMessage(now, "_e{5,4}:title|text|x:1234", "default-hostname") - assert.NoError(t, err) -} - -func TestEventMetadataTimestamp(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|d:21", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(21), e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataPriority(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|p:low", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "low", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataHostname(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|h:localhost", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "localhost", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataHostnameInTag(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|#host:localhost", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "localhost", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataEmptyHostTag(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|#host:,other:tag", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "", e.tags["source"]) - assert.Equal(t, map[string]string{"other": "tag", "source": ""}, e.tags) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataAlertType(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|t:warning", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, "warning", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) - -} - -func TestEventMetadataAggregatioKey(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|k:some aggregation key", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "some aggregation key", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataSourceType(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|s:this is the source", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, "this is the source", e.fields["source-type-name"]) -} - -func TestEventMetadataTags(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|#tag1,tag2:test", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, nil, e.fields["ts"]) - assert.Equal(t, "normal", e.fields["priority"]) - assert.Equal(t, "default-hostname", e.tags["source"]) - assert.Equal(t, map[string]string{"tag1": "", "tag2": "test", "source": "default-hostname"}, e.tags) - assert.Equal(t, "info", e.fields["alert-type"]) - assert.Equal(t, "", e.tags["aggregation-key"]) - assert.Equal(t, nil, e.fields["source-type-name"]) -} - -func TestEventMetadataMultiple(t *testing.T) { - now := time.Now() - s := NewTestStatsd() - err := s.parseEventMessage(now, "_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test", "default-hostname") - - require.Nil(t, err) - e := s.events[0] - assert.Equal(t, "test title", e.name) - assert.Equal(t, "test text", e.fields["text"]) - assert.Equal(t, int64(12345), e.fields["ts"]) - assert.Equal(t, "low", e.fields["priority"]) - assert.Equal(t, "some.host", e.tags["source"]) - assert.Equal(t, map[string]string{"aggregation-key": "aggKey", "tag1": "", "tag2": "test", "source": "some.host"}, e.tags) - assert.Equal(t, "warning", e.fields["alert-type"]) - assert.Equal(t, "aggKey", e.tags["aggregation-key"]) - assert.Equal(t, "source test", e.fields["source-type-name"]) + require.Error(t, err) } diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index ba737f3a0419c..24164dcc4a725 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -62,18 +62,18 @@ type Statsd struct { DeleteCounters bool DeleteSets bool DeleteTimings bool - DeleteEvents bool ConvertNames bool // MetricSeparator is the separator between parts of the metric name. MetricSeparator string // This flag enables parsing of tags in the dogstatsd extension to the // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) - ParseDataDogTags bool + ParseDataDogTags bool // depreciated in 1.10; use datadog_extensions - // This flag enables parsing of data dog events from the dogstatsd extension to - // the statsd protocol - ParseDataDogEvents bool + // Parses extensions to statsd in the datadog statsd format + // currently supports metrics and datadog tags. + // http://docs.datadoghq.com/guides/dogstatsd/ + DataDogExtensions bool // UDPPacketSize is deprecated, it's only here for legacy support // we now always create 1 max size buffer and then copy only what we need @@ -107,7 +107,6 @@ type Statsd struct { counters map[string]cachedcounter sets map[string]cachedset timings map[string]cachedtimings - events []cachedEvent // bucket -> influx templates Templates []string @@ -184,7 +183,8 @@ type cachedtimings struct { tags map[string]string } -type cachedEvent struct { +//this is used internally for building out an event +type event struct { name string fields map[string]interface{} tags map[string]string @@ -224,7 +224,7 @@ const sampleConfig = ` delete_sets = true ## Reset timings & histograms every interval (default=true) delete_timings = true - + ## Percentiles to calculate for timing & histogram stats percentiles = [90] @@ -234,6 +234,9 @@ const sampleConfig = ` ## Parses tags in the datadog statsd format ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_tags = false + + ## Parses events in the datadog statsd format + ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_events = false ## Statsd data translation templates, more info can be read here: @@ -261,12 +264,12 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { defer s.Unlock() now := time.Now() - for _, metric := range s.timings { + for _, m := range s.timings { // Defining a template to parse field names for timers allows us to split // out multiple fields per timer. In this case we prefix each stat with the // field name and store these all in a single measurement. fields := make(map[string]interface{}) - for fieldName, stats := range metric.fields { + for fieldName, stats := range m.fields { var prefix string if fieldName != defaultFieldName { prefix = fieldName + "_" @@ -283,47 +286,43 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { } } - acc.AddFields(metric.name, fields, metric.tags, now) + acc.AddFields(m.name, fields, m.tags, now) } if s.DeleteTimings { s.timings = make(map[string]cachedtimings) } - for _, metric := range s.gauges { - acc.AddGauge(metric.name, metric.fields, metric.tags, now) + for _, m := range s.gauges { + acc.AddGauge(m.name, m.fields, m.tags, now) } if s.DeleteGauges { s.gauges = make(map[string]cachedgauge) } - for _, metric := range s.counters { - acc.AddCounter(metric.name, metric.fields, metric.tags, now) + for _, m := range s.counters { + acc.AddCounter(m.name, m.fields, m.tags, now) } if s.DeleteCounters { s.counters = make(map[string]cachedcounter) } - for _, metric := range s.sets { + for _, m := range s.sets { fields := make(map[string]interface{}) - for field, set := range metric.fields { + for field, set := range m.fields { fields[field] = int64(len(set)) } - acc.AddFields(metric.name, fields, metric.tags, now) + acc.AddFields(m.name, fields, m.tags, now) } if s.DeleteSets { s.sets = make(map[string]cachedset) } - for _, e := range s.events { - acc.AddFields(e.name, e.fields, e.tags, e.ts) - } - if s.DeleteEvents { - s.events = s.events[:0] - } - return nil } func (s *Statsd) Start(_ telegraf.Accumulator) error { + if s.ParseDataDogTags { + s.DataDogExtensions = true + } // Make data structures s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) @@ -503,8 +502,8 @@ func (s *Statsd) parser() error { line = strings.TrimSpace(line) switch { case line == "": - case s.ParseDataDogEvents && len(line) > 2 && line[:2] == "_e": - s.parseEventMessage(time.Now(), line, in.Addr.String()) + case s.DataDogExtensions && strings.HasPrefix(line, "_e"): + s.parseEventMessage(in.Time, line, in.Addr.String()) default: s.parseStatsdLine(line) } @@ -521,6 +520,9 @@ func (s *Statsd) parseStatsdLine(line string) error { lineTags := make(map[string]string) if s.ParseDataDogTags { + log.Printf("I! WARNING statsd: parse_data_dog_tags config option is deprecated, please use datadog_extensions instead") + } + if s.ParseDataDogTags || s.DataDogExtensions { recombinedSegments := make([]string, 0) // datadog tags look like this: // users.online:1|c|@0.5|#country:china,environment:production @@ -531,6 +533,7 @@ func (s *Statsd) parseStatsdLine(line string) error { for _, segment := range pipesplit { if len(segment) > 0 && segment[0] == '#' { // we have ourselves a tag; they are comma separated + fmt.Println("************************", segment[1:]) parseDataDogTags(lineTags, segment[1:]) } else { recombinedSegments = append(recombinedSegments, segment) @@ -928,7 +931,6 @@ func init() { DeleteGauges: true, DeleteSets: true, DeleteTimings: true, - DeleteEvents: true, } }) } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index c297f590d4a75..3b6c1770256d2 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -1,7 +1,6 @@ package statsd import ( - "errors" "fmt" "net" "testing" @@ -863,7 +862,7 @@ func TestParse_Tags(t *testing.T) { // Test that DataDog tags are parsed func TestParse_DataDogTags(t *testing.T) { s := NewTestStatsd() - s.ParseDataDogTags = true + s.DataDogExtensions = true lines := []string{ "my_counter:1|c|#host:localhost,environment:prod,endpoint:/:tenant?/oauth/ro", @@ -881,7 +880,7 @@ func TestParse_DataDogTags(t *testing.T) { }, "my_gauge": { - "live": "", + "live": "true", "metric_type": "gauge", }, @@ -891,7 +890,7 @@ func TestParse_DataDogTags(t *testing.T) { }, "my_timer": { - "live": "", + "live": "true", "host": "localhost", "metric_type": "timing", }, @@ -1590,12 +1589,11 @@ func testValidateGauge( } } if !found { - return fmt.Errorf("Test Error: Metric name %s not found\n", name) + return fmt.Errorf("test Error: Metric name %s not found", name) } if valueExpected != valueActual { - return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n", - name, valueExpected, valueActual)) + return fmt.Errorf("Measurement: %s, expected %f, actual %f", name, valueExpected, valueActual) } return nil } From 0eff26b5bf22a395591fc6e4dbd2654b9736cb2b Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 8 May 2019 19:15:54 -0500 Subject: [PATCH 08/13] WIP --- plugins/inputs/statsd/statsd.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 24164dcc4a725..97384a7c73f76 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "net" + "net/url" "sort" "strconv" "strings" @@ -140,7 +141,7 @@ type Statsd struct { type input struct { *bytes.Buffer time.Time - net.Addr + Addr string } // One statsd metric, form is :||@ @@ -474,9 +475,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { b := s.bufPool.Get().(*bytes.Buffer) b.Reset() b.Write(buf[:n]) - select { - case s.in <- input{Buffer: b, Time: time.Now(), Addr: addr}: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: addr.IP.String()}: default: s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { @@ -503,7 +503,7 @@ func (s *Statsd) parser() error { switch { case line == "": case s.DataDogExtensions && strings.HasPrefix(line, "_e"): - s.parseEventMessage(in.Time, line, in.Addr.String()) + s.parseEventMessage(in.Time, line, in.Addr) default: s.parseStatsdLine(line) } @@ -824,7 +824,17 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { s.forget(id) s.CurrentConnections.Incr(-1) }() - + addr := conn.RemoteAddr() + a, err := url.Parse(addr.String()) + var host string + if err != nil { + // this should never happen because the conn handler should give us parsable addresses, + // but if it does we will know + host = "badhost" + log.Printf("E! failed to parse %s\n", addr) + } else { + host = a.Host + } var n int scanner := bufio.NewScanner(conn) for { @@ -848,7 +858,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { b.WriteByte('\n') select { - case s.in <- input{Buffer: b, Time: time.Now(), Addr: conn.RemoteAddr()}: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: host}: default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { From b8d9a30c2d88df15a92619b907c68f282e3ce50c Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Thu, 9 May 2019 11:44:52 -0500 Subject: [PATCH 09/13] cleanup --- plugins/inputs/statsd/datadog.go | 6 +++--- plugins/inputs/statsd/statsd.go | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index f7da285c5cd98..71d530a07508c 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -1,8 +1,8 @@ package statsd import ( + "errors" "fmt" - "log" "strconv" "strings" "time" @@ -98,7 +98,7 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam rawMetadataFields := strings.Split(message[1:], "|") for i := range rawMetadataFields { if len(rawMetadataFields[i]) < 2 { - log.Printf("W! [inputs.statsd] too short metadata field") + return errors.New("too short metadata field") } switch rawMetadataFields[i][:2] { case "d:": @@ -119,7 +119,7 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam m.tags["source"] = rawMetadataFields[i][2:] case "t:": switch rawMetadataFields[i][2:] { - case "error", "warning", "success", "info": + case eventError, eventWarning, eventSuccess, eventInfo: m.fields["alert_type"] = rawMetadataFields[i][2:] // already set for info default: continue diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 97384a7c73f76..25d11c0794f21 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -533,7 +533,6 @@ func (s *Statsd) parseStatsdLine(line string) error { for _, segment := range pipesplit { if len(segment) > 0 && segment[0] == '#' { // we have ourselves a tag; they are comma separated - fmt.Println("************************", segment[1:]) parseDataDogTags(lineTags, segment[1:]) } else { recombinedSegments = append(recombinedSegments, segment) From 4dbf9477822acc3c8528cb16ebce62cfe622661e Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Thu, 9 May 2019 19:10:03 -0500 Subject: [PATCH 10/13] Second round of review --- plugins/inputs/statsd/datadog.go | 69 +++++++++++---------------- plugins/inputs/statsd/datadog_test.go | 6 +++ plugins/inputs/statsd/statsd.go | 29 ++++------- 3 files changed, 42 insertions(+), 62 deletions(-) diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index 71d530a07508c..f2785ff38ec16 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -1,13 +1,14 @@ package statsd +// this is adapted from datadog's apache licensed version at +// https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173 + import ( "errors" "fmt" "strconv" "strings" "time" - - tmetric "github.com/influxdata/telegraf/metric" ) const ( @@ -22,8 +23,6 @@ const ( var uncommenter = strings.NewReplacer("\\n", "\n") -// this is adapted from datadog's apache licensed version at -// https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173 func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostname string) error { // _e{title.length,text.length}:title|text // [ @@ -53,7 +52,9 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam if err != nil { return fmt.Errorf("Invalid message format, could not parse title.length: '%s'", rawLen[0]) } - + if len(rawLen[1]) < 1 { + return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[0]) + } textLen, err := strconv.ParseInt(rawLen[1][:len(rawLen[1])-1], 10, 64) if err != nil { return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[0]) @@ -70,28 +71,16 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam return fmt.Errorf("Invalid event message format: empty 'title' or 'text' field") } - // Handle hostname, with a priority to the h: field, then the host: - // tag and finally the defaultHostname value - // Metadata - m := event{ - name: rawTitle, - } - m.tags = make(map[string]string, strings.Count(message, ",")+2) // allocate for the approximate number of tags - m.fields = make(map[string]interface{}, 9) - m.fields["alert_type"] = eventInfo // default event type - m.fields["text"] = uncommenter.Replace(string(rawText)) - // host is a magic tag in the system, and it expects it to replace the result of h: if it is present - // telegraf will add a"host" tag anyway with different meaning than dogstatsd, so we need to use source instead of host. - - m.tags["source"] = defaultHostname - m.fields["priority"] = priorityNormal - m.ts = now + name := rawTitle + tags := make(map[string]string, strings.Count(message, ",")+2) // allocate for the approximate number of tags + fields := make(map[string]interface{}, 9) + fields["alert_type"] = eventInfo // default event type + fields["text"] = uncommenter.Replace(string(rawText)) + tags["source"] = defaultHostname // Use source tag because host is reserved tag key in Telegraf. + fields["priority"] = priorityNormal + ts := now if len(message) < 2 { - newM, err := tmetric.New(m.name, m.tags, m.fields, m.ts) - if err != nil { - return err - } - s.acc.AddMetric(newM) + s.acc.AddFields(name, fields, tags, ts) return nil } @@ -106,45 +95,43 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam if err != nil { continue } - m.fields["ts"] = ts + fields["ts"] = ts case "p:": switch rawMetadataFields[i][2:] { case priorityLow: - m.fields["priority"] = priorityLow + fields["priority"] = priorityLow case priorityNormal: // we already used this as a default default: continue } case "h:": - m.tags["source"] = rawMetadataFields[i][2:] + tags["source"] = rawMetadataFields[i][2:] case "t:": switch rawMetadataFields[i][2:] { case eventError, eventWarning, eventSuccess, eventInfo: - m.fields["alert_type"] = rawMetadataFields[i][2:] // already set for info + fields["alert_type"] = rawMetadataFields[i][2:] // already set for info default: continue } case "k:": - m.tags["aggregation_key"] = rawMetadataFields[i][2:] + tags["aggregation_key"] = rawMetadataFields[i][2:] case "s:": - m.fields["source_type_name"] = rawMetadataFields[i][2:] + fields["source_type_name"] = rawMetadataFields[i][2:] default: if rawMetadataFields[i][0] == '#' { - parseDataDogTags(m.tags, rawMetadataFields[i][1:]) + parseDataDogTags(tags, rawMetadataFields[i][1:]) } else { return fmt.Errorf("unknown metadata type: '%s'", rawMetadataFields[i]) } } } - if host, ok := m.tags["host"]; ok { - delete(m.tags, "host") - m.tags["source"] = host - } - newM, err := tmetric.New(m.name, m.tags, m.fields, m.ts) - if err != nil { - return err + // Use source tag because host is reserved tag key in Telegraf. + // In datadog the host tag and `h:` are interchangable, so we have to chech for the host tag. + if host, ok := tags["host"]; ok { + delete(tags, "host") + tags["source"] = host } - s.acc.AddMetric(newM) + s.acc.AddFields(name, fields, tags, ts) return nil } diff --git a/plugins/inputs/statsd/datadog_test.go b/plugins/inputs/statsd/datadog_test.go index d1a5b39002194..61762a2c451fe 100644 --- a/plugins/inputs/statsd/datadog_test.go +++ b/plugins/inputs/statsd/datadog_test.go @@ -434,6 +434,12 @@ func TestEventError(t *testing.T) { err = s.parseEventMessage(now, "_e{5,}:title|text", "default-hostname") require.Error(t, err) + err = s.parseEventMessage(now, "_e{100,:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e,100:title|text", "default-hostname") + require.Error(t, err) + err = s.parseEventMessage(now, "_e{,4}:title|text", "default-hostname") require.Error(t, err) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 25d11c0794f21..94172e245bac2 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -157,7 +157,6 @@ type metric struct { additive bool samplerate float64 tags map[string]string - ts time.Time // for events } type cachedset struct { @@ -185,12 +184,6 @@ type cachedtimings struct { } //this is used internally for building out an event -type event struct { - name string - fields map[string]interface{} - tags map[string]string - ts time.Time -} func (_ *Statsd) Description() string { return "Statsd UDP/TCP Server" @@ -225,7 +218,7 @@ const sampleConfig = ` delete_sets = true ## Reset timings & histograms every interval (default=true) delete_timings = true - + ## Percentiles to calculate for timing & histogram stats percentiles = [90] @@ -236,9 +229,8 @@ const sampleConfig = ` ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_tags = false - ## Parses events in the datadog statsd format - ## http://docs.datadoghq.com/guides/dogstatsd/ - parse_data_dog_events = false + ## Parses datadog extensions to the statsd format + datadog_extensions = false ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md @@ -323,6 +315,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { func (s *Statsd) Start(_ telegraf.Accumulator) error { if s.ParseDataDogTags { s.DataDogExtensions = true + log.Printf("W! [inputs.statsd] The parse_data_dog_tags option is deprecated, use datadog_extensions instead.") } // Make data structures s.gauges = make(map[string]cachedgauge) @@ -519,10 +512,7 @@ func (s *Statsd) parseStatsdLine(line string) error { defer s.Unlock() lineTags := make(map[string]string) - if s.ParseDataDogTags { - log.Printf("I! WARNING statsd: parse_data_dog_tags config option is deprecated, please use datadog_extensions instead") - } - if s.ParseDataDogTags || s.DataDogExtensions { + if s.DataDogExtensions { recombinedSegments := make([]string, 0) // datadog tags look like this: // users.online:1|c|@0.5|#country:china,environment:production @@ -824,15 +814,12 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { s.CurrentConnections.Incr(-1) }() addr := conn.RemoteAddr() - a, err := url.Parse(addr.String()) - var host string + parsedURL, err := url.Parse(addr.String()) if err != nil { // this should never happen because the conn handler should give us parsable addresses, // but if it does we will know - host = "badhost" log.Printf("E! failed to parse %s\n", addr) - } else { - host = a.Host + return // close the connetion and return } var n int scanner := bufio.NewScanner(conn) @@ -857,7 +844,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { b.WriteByte('\n') select { - case s.in <- input{Buffer: b, Time: time.Now(), Addr: host}: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: parsedURL.Host}: default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { From 4903f3122afe121d0646d029d1796e0eb538212f Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Thu, 9 May 2019 19:25:59 -0500 Subject: [PATCH 11/13] WIP --- plugins/inputs/statsd/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index 4b632af618322..5f59d4e7252ba 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -193,7 +193,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time. - **templates** []string: Templates for transforming statsd buckets into influx measurements and tags. - **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) -- **parse_data_dog_events** boolean: Enable parsing of events in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) +- **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) ### Statsd bucket -> InfluxDB line-protocol Templates From a5757cb069b71fa5e213306790ccce9792028f78 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Thu, 9 May 2019 19:45:11 -0500 Subject: [PATCH 12/13] fix log statements --- plugins/inputs/statsd/statsd.go | 36 ++++++++++++++++----------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 94172e245bac2..fd17f4f9859a9 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -35,11 +35,11 @@ const ( MaxTCPConnections = 250 ) -var dropwarn = "E! Error: statsd message queue full. " + +var dropwarn = "E! [inputs.statsd] Error: statsd message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" -var malformedwarn = "E! Statsd over TCP has received %d malformed packets" + +var malformedwarn = "E! [inputs.statsd] Statsd over TCP has received %d malformed packets" + " thus far." // Statsd allows the importing of statsd and dogstatsd data. @@ -350,7 +350,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { } if s.ConvertNames { - log.Printf("I! WARNING statsd: convert_names config option is deprecated," + + log.Printf("W! [inputs.statsd] statsd: convert_names config option is deprecated," + " please use metric_separator instead") } @@ -369,7 +369,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { return err } - log.Println("I! Statsd UDP listener listening on: ", conn.LocalAddr().String()) + log.Println("I! [inputs.statsd] Statsd UDP listener listening on: ", conn.LocalAddr().String()) s.UDPlistener = conn s.wg.Add(1) @@ -387,7 +387,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { return err } - log.Println("I! TCP Statsd listening on: ", listener.Addr().String()) + log.Println("I! [inputs.statsd] TCP Statsd listening on: ", listener.Addr().String()) s.TCPlistener = listener s.wg.Add(1) @@ -403,7 +403,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { defer s.wg.Done() s.parser() }() - log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) + log.Printf("I! [inputs.statsd] Started the statsd service on %s\n", s.ServiceAddress) return nil } @@ -462,7 +462,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { default: n, addr, err := conn.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { - log.Printf("E! Error READ: %s\n", err.Error()) + log.Printf("E! [inputs.statsd] Error READ: %s\n", err.Error()) continue } b := s.bufPool.Get().(*bytes.Buffer) @@ -534,7 +534,7 @@ func (s *Statsd) parseStatsdLine(line string) error { // Validate splitting the line on ":" bits := strings.Split(line, ":") if len(bits) < 2 { - log.Printf("E! Error: splitting ':', Unable to parse metric: %s\n", line) + log.Printf("E! [inputs.statsd] Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } @@ -550,11 +550,11 @@ func (s *Statsd) parseStatsdLine(line string) error { // Validate splitting the bit on "|" pipesplit := strings.Split(bit, "|") if len(pipesplit) < 2 { - log.Printf("E! Error: splitting '|', Unable to parse metric: %s\n", line) + log.Printf("E! [inputs.statsd] Error: splitting '|', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } else if len(pipesplit) > 2 { sr := pipesplit[2] - errmsg := "E! Error: parsing sample rate, %s, it must be in format like: " + + errmsg := "E! [inputs.statsd] parsing sample rate, %s, it must be in format like: " + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" if strings.Contains(sr, "@") && len(sr) > 1 { samplerate, err := strconv.ParseFloat(sr[1:], 64) @@ -574,14 +574,14 @@ func (s *Statsd) parseStatsdLine(line string) error { case "g", "c", "s", "ms", "h": m.mtype = pipesplit[1] default: - log.Printf("E! Error: Statsd Metric type %s unsupported", pipesplit[1]) + log.Printf("E! [inputs.statsd] Error: Statsd Metric type %s unsupported", pipesplit[1]) return errors.New("Error Parsing statsd line") } // Parse the value if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") { if m.mtype != "g" && m.mtype != "c" { - log.Printf("E! Error: +- values are only supported for gauges & counters: %s\n", line) + log.Printf("E! [inputs.statsd] Error: +- values are only supported for gauges & counters: %s\n", line) return errors.New("Error Parsing statsd line") } m.additive = true @@ -591,7 +591,7 @@ func (s *Statsd) parseStatsdLine(line string) error { case "g", "ms", "h": v, err := strconv.ParseFloat(pipesplit[0], 64) if err != nil { - log.Printf("E! Error: parsing value to float64: %s\n", line) + log.Printf("E! [inputs.statsd] Error: parsing value to float64: %s\n", line) return errors.New("Error Parsing statsd line") } m.floatvalue = v @@ -601,7 +601,7 @@ func (s *Statsd) parseStatsdLine(line string) error { if err != nil { v2, err2 := strconv.ParseFloat(pipesplit[0], 64) if err2 != nil { - log.Printf("E! Error: parsing value to int64: %s\n", line) + log.Printf("E! [inputs.statsd] Error: parsing value to int64: %s\n", line) return errors.New("Error Parsing statsd line") } v = int64(v2) @@ -818,7 +818,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { if err != nil { // this should never happen because the conn handler should give us parsable addresses, // but if it does we will know - log.Printf("E! failed to parse %s\n", addr) + log.Printf("E! [inputs.statsd] failed to parse %s\n", addr) return // close the connetion and return } var n int @@ -858,8 +858,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { // refuser refuses a TCP connection func (s *Statsd) refuser(conn *net.TCPConn) { conn.Close() - log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr()) - log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" + + log.Printf("I! [inputs.statsd] Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("I! [inputs.statsd] WARNING: Maximum TCP Connections reached, you may want to" + " adjust max_tcp_connections") } @@ -879,7 +879,7 @@ func (s *Statsd) remember(id string, conn *net.TCPConn) { func (s *Statsd) Stop() { s.Lock() - log.Println("I! Stopping the statsd service") + log.Println("I! [inputs.statsd] Stopping the statsd service") close(s.done) if s.isUDP() { s.UDPlistener.Close() From fa4b6e7982fb6198d4150155b54598879f0d5e78 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Mon, 13 May 2019 19:04:29 -0500 Subject: [PATCH 13/13] fix panic shutdown for statsd --- plugins/inputs/statsd/README.md | 2 -- plugins/inputs/statsd/statsd.go | 18 +++++++++++------- plugins/inputs/statsd/statsd_test.go | 1 - 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index 5f59d4e7252ba..a33480f61c7a5 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -34,7 +34,6 @@ ## Reset timings & histograms every interval (default=true) delete_timings = true - ## Percentiles to calculate for timing & histogram stats percentiles = [90] @@ -183,7 +182,6 @@ to allow. Used when protocol is set to tcp. - **delete_counters** boolean: Delete counters on every collection interval - **delete_sets** boolean: Delete set counters on every collection interval - **delete_timings** boolean: Delete timings on every collection interval - - **percentiles** []int: Percentiles to calculate for timing & histogram stats - **allowed_pending_messages** integer: Number of messages allowed to queue up waiting to be processed. When this fills, messages will be dropped and logged. diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index fd17f4f9859a9..7408482b6f982 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -74,7 +74,7 @@ type Statsd struct { // Parses extensions to statsd in the datadog statsd format // currently supports metrics and datadog tags. // http://docs.datadoghq.com/guides/dogstatsd/ - DataDogExtensions bool + DataDogExtensions bool `toml:"datadog_extensions"` // UDPPacketSize is deprecated, it's only here for legacy support // we now always create 1 max size buffer and then copy only what we need @@ -183,8 +183,6 @@ type cachedtimings struct { tags map[string]string } -//this is used internally for building out an event - func (_ *Statsd) Description() string { return "Statsd UDP/TCP Server" } @@ -461,15 +459,21 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { return nil default: n, addr, err := conn.ReadFromUDP(buf) - if err != nil && !strings.Contains(err.Error(), "closed network") { - log.Printf("E! [inputs.statsd] Error READ: %s\n", err.Error()) - continue + if err != nil { + if !strings.Contains(err.Error(), "closed network") { + log.Printf("E! [inputs.statsd] Error READ: %s\n", err.Error()) + continue + } + return err } b := s.bufPool.Get().(*bytes.Buffer) b.Reset() b.Write(buf[:n]) select { - case s.in <- input{Buffer: b, Time: time.Now(), Addr: addr.IP.String()}: + case s.in <- input{ + Buffer: b, + Time: time.Now(), + Addr: addr.IP.String()}: default: s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 3b6c1770256d2..4a856902d122a 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -1504,7 +1504,6 @@ func TestParseKeyValue(t *testing.T) { } // Test utility functions - func testValidateSet( name string, value int64,