diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 7d68ed3df5c4d..926bfcb34b3bc 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -2,8 +2,9 @@ package influx import ( "bytes" - "errors" + "fmt" "io" + "log" "math" "sort" "strconv" @@ -26,14 +27,28 @@ const ( UintSupport FieldTypeSupport = 1 << iota ) +// MetricError is an error causing a metric to be unserializable. +type MetricError struct { + s string +} + +func (e MetricError) Error() string { + return e.s +} + +// FieldError is an error causing a field to be unserializable. +type FieldError struct { + s string +} + +func (e FieldError) Error() string { + return e.s +} + var ( - ErrNeedMoreSpace = errors.New("need more space") - ErrInvalidName = errors.New("invalid name") - ErrInvalidFieldKey = errors.New("invalid field key") - ErrInvalidFieldType = errors.New("invalid field type") - ErrFieldIsNaN = errors.New("is NaN") - ErrFieldIsInf = errors.New("is Inf") - ErrNoFields = errors.New("no fields") + ErrNeedMoreSpace = &MetricError{"need more space"} + ErrInvalidName = &MetricError{"invalid name"} + ErrNoFields = &MetricError{"no serializable fields"} ) // Serializer is a serializer for line protocol. @@ -148,7 +163,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error { // Some keys are not encodeable as line protocol, such as those with a // trailing '\' or empty strings. if key == "" { - return ErrInvalidFieldKey + return &FieldError{"invalid field key"} } s.pair = append(s.pair, key...) @@ -182,6 +197,9 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { for _, field := range m.FieldList() { err = s.buildFieldPair(field.Key, field.Value) if err != nil { + log.Printf( + "D! [serializers.influx] could not serialize field %q: %v; discarding field", + field.Key, err) continue } @@ -262,11 +280,11 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er return appendIntField(buf, v), nil case float64: if math.IsNaN(v) { - return nil, ErrFieldIsNaN + return nil, &FieldError{"is NaN"} } if math.IsInf(v, 0) { - return nil, ErrFieldIsInf + return nil, &FieldError{"is Inf"} } return appendFloatField(buf, v), nil @@ -274,8 +292,9 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er return appendStringField(buf, v), nil case bool: return appendBoolField(buf, v), nil + default: + return buf, &FieldError{fmt.Sprintf("invalid value type: %T", v)} } - return buf, ErrInvalidFieldType } func appendUintField(buf []byte, value uint64) []byte { diff --git a/plugins/serializers/influx/reader.go b/plugins/serializers/influx/reader.go index 590e2fafdf706..4a755c88ddd81 100644 --- a/plugins/serializers/influx/reader.go +++ b/plugins/serializers/influx/reader.go @@ -2,7 +2,9 @@ package influx import ( "bytes" + "fmt" "io" + "log" "github.com/influxdata/telegraf" ) @@ -47,11 +49,25 @@ func (r *reader) Read(p []byte) (int, error) { return 0, io.EOF } - _, err := r.serializer.Write(r.buf, r.metrics[r.offset]) - r.offset += 1 - if err != nil { - r.buf.Reset() - return 0, err + for _, metric := range r.metrics[r.offset:] { + _, err := r.serializer.Write(r.buf, metric) + r.offset += 1 + if err != nil { + r.buf.Reset() + switch err.(type) { + case *MetricError: + // Since we are serializing an array of metrics, don't fail + // the entire batch just because of one unserializable metric. + log.Printf( + "D! [serializers.influx] could not serialize metric %q: %v; discarding metric", + metric.Name(), err) + continue + default: + fmt.Println(err) + return 0, err + } + } + break } return r.buf.Read(p) diff --git a/plugins/serializers/influx/reader_test.go b/plugins/serializers/influx/reader_test.go index 5dca215950105..642b71b1cfc3e 100644 --- a/plugins/serializers/influx/reader_test.go +++ b/plugins/serializers/influx/reader_test.go @@ -83,6 +83,62 @@ func TestReader(t *testing.T) { }, expected: []byte("cpu value=42 0\n"), }, + { + name: "continue on failed metrics", + maxLineBytes: 4096, + bufferSize: 15, + input: []telegraf.Metric{ + MustMetric( + metric.New( + "", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + }, + expected: []byte("cpu value=42 0\n"), + }, + { + name: "last metric failed regression", + maxLineBytes: 4096, + bufferSize: 15, + input: []telegraf.Metric{ + MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + MustMetric( + metric.New( + "", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + }, + expected: []byte("cpu value=42 0\n"), + }, } for _, tt := range tests {