Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop all unsupported Prometheus values written to the remote write endpoint #12813

Merged
merged 1 commit into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions prometheus/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,18 @@ const (
measurementTagKey = "_measurement"
)

var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported")
// DroppedValuesError reports how many samples of a Prometheus write request
// were discarded because their float64 value is unsupported. It keeps one
// counter per kind of dropped value: NaN, negative infinity and positive
// infinity.
type DroppedValuesError struct {
	nan, ninf, inf uint64
}

// Error implements the error interface. The message lists the number of
// dropped NaN, +Inf and -Inf values, in that order.
func (d DroppedValuesError) Error() string {
	const format = "dropped unsupported Prometheus values: [NaN = %d, +Inf = %d, -Inf = %d]"
	return fmt.Sprintf(format, d.nan, d.inf, d.ninf)
}

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into Points that can be written into Influx
Expand All @@ -41,7 +52,8 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
}
points := make([]models.Point, 0, maxPoints)

var droppedNaN error
// Track any dropped values.
var nan, inf, ninf uint64

for _, ts := range req.Timeseries {
measurement := measurementName
Expand All @@ -55,9 +67,14 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
}

for _, s := range ts.Samples {
// skip NaN values, which are valid in Prometheus
if math.IsNaN(s.Value) {
droppedNaN = ErrNaNDropped
if v := s.Value; math.IsNaN(v) {
nan++
continue
} else if math.IsInf(v, -1) {
ninf++
continue
} else if math.IsInf(v, 1) {
inf++
continue
}

Expand All @@ -68,11 +85,14 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
if err != nil {
return nil, err
}

points = append(points, p)
}
}
return points, droppedNaN

if nan+inf+ninf > 0 {
return points, DroppedValuesError{nan: nan, inf: inf, ninf: ninf}
}
return points, nil
}

// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the
Expand Down
3 changes: 2 additions & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,8 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
h.Logger.Info("Prom write handler", zap.Error(err))
}

if err != prometheus.ErrNaNDropped {
// Check if the error was from something other than dropping invalid values.
if _, ok := err.(prometheus.DroppedValuesError); !ok {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down
192 changes: 177 additions & 15 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,91 @@ func TestHandler_Query_CloseNotify(t *testing.T) {
}
}

// Ensure the prometheus remote write works
// Ensure the prometheus remote write endpoint hands every sample of a request
// that contains only valid values to the points writer and replies with
// 204 No Content.
func TestHandler_PromWrite(t *testing.T) {
	// The samples double as the source of the expected timestamps and fields.
	samples := []*remote.Sample{
		{TimestampMs: 1, Value: 1.2},
		{TimestampMs: 3, Value: 14.5},
		{TimestampMs: 6, Value: 222.99},
	}
	req := &remote.WriteRequest{
		Timeseries: []*remote.TimeSeries{
			{
				Labels: []*remote.LabelPair{
					{Name: "host", Value: "a"},
					{Name: "region", Value: "west"},
				},
				Samples: samples,
			},
		},
	}

	// The request body is the snappy-compressed protobuf encoding of req.
	raw, err := proto.Marshal(req)
	if err != nil {
		t.Fatal("couldn't marshal prometheus request")
	}
	body := bytes.NewReader(snappy.Encode(nil, raw))

	h := NewHandler(false)
	h.MetaClient.DatabaseFn = func(_ string) *meta.DatabaseInfo {
		return &meta.DatabaseInfo{}
	}

	// Every point must carry the labels of the series as tags.
	wantTags := models.Tags{
		models.Tag{Key: []byte("host"), Value: []byte("a")},
		models.Tag{Key: []byte("region"), Value: []byte("west")},
	}

	wrote := false
	h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, pts []models.Point) error {
		wrote = true

		if got, exp := len(pts), len(samples); got != exp {
			t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, pts)
		}

		for i, pt := range pts {
			sample := samples[i]

			// Sample timestamps are in milliseconds; points report nanoseconds.
			if got, exp := pt.UnixNano(), sample.TimestampMs*int64(time.Millisecond); got != exp {
				t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, pt)
			}

			if got := pt.Tags(); !reflect.DeepEqual(got, wantTags) {
				t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, wantTags, pt)
			}

			gotFields, err := pt.Fields()
			if err != nil {
				t.Fatal(err.Error())
			}

			if exp := (models.Fields{"value": sample.Value}); !reflect.DeepEqual(gotFields, exp) {
				t.Fatalf("got fields %v, expected %v\npoint:\n%v", gotFields, exp, pt)
			}
		}
		return nil
	}

	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, MustNewRequest("POST", "/api/v1/prom/write?db=foo", body))
	if !wrote {
		t.Fatal("WritePoints: expected call")
	}

	if rec.Code != http.StatusNoContent {
		t.Fatalf("unexpected status: %d", rec.Code)
	}
}

// Ensure the prometheus remote write drops unsupported values (NaN, +Inf, -Inf)
// while still writing the remaining valid samples.
func TestHandler_PromWrite_Dropped(t *testing.T) {
req := &remote.WriteRequest{
Timeseries: []*remote.TimeSeries{
{
Expand All @@ -553,6 +636,13 @@ func TestHandler_PromWrite(t *testing.T) {
Samples: []*remote.Sample{
{TimestampMs: 1, Value: 1.2},
{TimestampMs: 2, Value: math.NaN()},
{TimestampMs: 3, Value: 14.5},
{TimestampMs: 4, Value: math.Inf(-1)},
{TimestampMs: 5, Value: math.Inf(1)},
{TimestampMs: 6, Value: 222.99},
{TimestampMs: 7, Value: math.Inf(-1)},
{TimestampMs: 8, Value: math.Inf(1)},
{TimestampMs: 9, Value: math.Inf(1)},
},
},
},
Expand All @@ -569,26 +659,45 @@ func TestHandler_PromWrite(t *testing.T) {
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
return &meta.DatabaseInfo{}
}
called := false

var called bool
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
called = true
point := points[0]
if point.UnixNano() != int64(time.Millisecond) {
t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano())

if got, exp := len(points), 3; got != exp {
t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points)
}
tags := point.Tags()
expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
if !reflect.DeepEqual(tags, expectedTags) {
t.Fatalf("tags don't match\n\texp: %v\n\tgot: %v", expectedTags, tags)

expFields := []models.Fields{
models.Fields{"value": req.Timeseries[0].Samples[0].Value},
models.Fields{"value": req.Timeseries[0].Samples[2].Value},
models.Fields{"value": req.Timeseries[0].Samples[5].Value},
}

fields, err := point.Fields()
if err != nil {
t.Fatal(err.Error())
expTS := []int64{
req.Timeseries[0].Samples[0].TimestampMs * int64(time.Millisecond),
req.Timeseries[0].Samples[2].TimestampMs * int64(time.Millisecond),
req.Timeseries[0].Samples[5].TimestampMs * int64(time.Millisecond),
}
expFields := models.Fields{"value": 1.2}
if !reflect.DeepEqual(fields, expFields) {
t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields)

for i, point := range points {
if got, exp := point.UnixNano(), expTS[i]; got != exp {
t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point)
}

exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
if got := point.Tags(); !reflect.DeepEqual(got, exp) {
t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point)
}

gotFields, err := point.Fields()
if err != nil {
t.Fatal(err.Error())
}

if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) {
t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point)
}
}
return nil
}
Expand All @@ -598,11 +707,64 @@ func TestHandler_PromWrite(t *testing.T) {
if !called {
t.Fatal("WritePoints: expected call")
}

if w.Code != http.StatusNoContent {
t.Fatalf("unexpected status: %d", w.Code)
}
}

// mustMakeBigString returns a string made of sz repetitions of the byte 'a'.
// It panics if sz is negative.
func mustMakeBigString(sz int) string {
	return strings.Repeat("a", sz)
}

// Ensure the prometheus remote write endpoint answers 400 Bad Request with the
// conversion error in the body, and never calls the points writer, when a
// series cannot be converted into a point.
func TestHandler_PromWrite_Error(t *testing.T) {
	// A label name of models.MaxKeyLength bytes; the response asserted below
	// reports the max key length as exceeded.
	oversized := &remote.LabelPair{Name: mustMakeBigString(models.MaxKeyLength), Value: "a"}
	req := &remote.WriteRequest{
		Timeseries: []*remote.TimeSeries{
			{
				Labels:  []*remote.LabelPair{oversized},
				Samples: []*remote.Sample{{TimestampMs: 1, Value: 1.2}},
			},
		},
	}

	// The request body is the snappy-compressed protobuf encoding of req.
	raw, err := proto.Marshal(req)
	if err != nil {
		t.Fatal("couldn't marshal prometheus request")
	}
	body := bytes.NewReader(snappy.Encode(nil, raw))

	h := NewHandler(false)
	h.MetaClient.DatabaseFn = func(_ string) *meta.DatabaseInfo {
		return &meta.DatabaseInfo{}
	}

	// Record whether the handler reaches the points writer at all.
	wrote := false
	h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error {
		wrote = true
		return nil
	}

	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, MustNewRequest("POST", "/api/v1/prom/write?db=foo", body))
	if rec.Code != http.StatusBadRequest {
		t.Fatalf("unexpected status: %d", rec.Code)
	}

	const exp = `{"error":"max key length exceeded: 65572 \u003e 65535"}`
	if got := strings.TrimSpace(rec.Body.String()); got != exp {
		t.Fatalf("got error %q, expected %q", got, exp)
	}

	if wrote {
		t.Fatal("WritePoints called but should not be")
	}
}

// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
// data is returned
func TestHandler_PromRead(t *testing.T) {
Expand Down