Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow metrics to be unserializable in influx.Reader #4047

Merged
merged 2 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package influx

import (
"bytes"
"errors"
"fmt"
"io"
"log"
"math"
"sort"
"strconv"
Expand All @@ -26,14 +27,28 @@ const (
UintSupport FieldTypeSupport = 1 << iota
)

// MetricError is an error causing a metric to be unserializable.
type MetricError struct {
s string
}

func (e MetricError) Error() string {
return e.s
}

// FieldError is an error causing a field to be unserializable.
type FieldError struct {
s string
}

func (e FieldError) Error() string {
return e.s
}

var (
ErrNeedMoreSpace = errors.New("need more space")
ErrInvalidName = errors.New("invalid name")
ErrInvalidFieldKey = errors.New("invalid field key")
ErrInvalidFieldType = errors.New("invalid field type")
ErrFieldIsNaN = errors.New("is NaN")
ErrFieldIsInf = errors.New("is Inf")
ErrNoFields = errors.New("no fields")
ErrNeedMoreSpace = &MetricError{"need more space"}
ErrInvalidName = &MetricError{"invalid name"}
ErrNoFields = &MetricError{"no serializable fields"}
)

// Serializer is a serializer for line protocol.
Expand Down Expand Up @@ -148,7 +163,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error {
// Some keys are not encodeable as line protocol, such as those with a
// trailing '\' or empty strings.
if key == "" {
return ErrInvalidFieldKey
return &FieldError{"invalid field key"}
}

s.pair = append(s.pair, key...)
Expand Down Expand Up @@ -182,6 +197,9 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
for _, field := range m.FieldList() {
err = s.buildFieldPair(field.Key, field.Value)
if err != nil {
log.Printf(
"D! [serializers.influx] could not serialize field %q: %v; discarding field",
field.Key, err)
continue
}

Expand Down Expand Up @@ -262,20 +280,21 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er
return appendIntField(buf, v), nil
case float64:
if math.IsNaN(v) {
return nil, ErrFieldIsNaN
return nil, &FieldError{"is NaN"}
}

if math.IsInf(v, 0) {
return nil, ErrFieldIsInf
return nil, &FieldError{"is Inf"}
}

return appendFloatField(buf, v), nil
case string:
return appendStringField(buf, v), nil
case bool:
return appendBoolField(buf, v), nil
default:
return buf, &FieldError{fmt.Sprintf("invalid value type: %T", v)}
}
return buf, ErrInvalidFieldType
}

func appendUintField(buf []byte, value uint64) []byte {
Expand Down
26 changes: 21 additions & 5 deletions plugins/serializers/influx/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package influx

import (
"bytes"
"fmt"
"io"
"log"

"github.com/influxdata/telegraf"
)
Expand Down Expand Up @@ -47,11 +49,25 @@ func (r *reader) Read(p []byte) (int, error) {
return 0, io.EOF
}

_, err := r.serializer.Write(r.buf, r.metrics[r.offset])
r.offset += 1
if err != nil {
r.buf.Reset()
return 0, err
for _, metric := range r.metrics[r.offset:] {
_, err := r.serializer.Write(r.buf, metric)
r.offset += 1
if err != nil {
r.buf.Reset()
switch err.(type) {
case *MetricError:
// Since we are serializing an array of metrics, don't fail
// the entire batch just because of one unserializable metric.
log.Printf(
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
metric.Name(), err)
continue
default:
fmt.Println(err)
return 0, err
}
}
break
}

return r.buf.Read(p)
Expand Down
56 changes: 56 additions & 0 deletions plugins/serializers/influx/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,62 @@ func TestReader(t *testing.T) {
},
expected: []byte("cpu value=42 0\n"),
},
{
name: "continue on failed metrics",
maxLineBytes: 4096,
bufferSize: 15,
input: []telegraf.Metric{
MustMetric(
metric.New(
"",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
},
expected: []byte("cpu value=42 0\n"),
},
{
name: "last metric failed regression",
maxLineBytes: 4096,
bufferSize: 15,
input: []telegraf.Metric{
MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
MustMetric(
metric.New(
"",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
},
expected: []byte("cpu value=42 0\n"),
},
}

for _, tt := range tests {
Expand Down