Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: refactor validation and improve error messages #2021

Merged
merged 5 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ import (
"github.com/grafana/loki/pkg/util/validation"
)

const (
metricName = "logs"
)

var (
ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Expand Down Expand Up @@ -209,14 +205,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
if err := d.validator.ValidateLabels(userID, stream.Labels); err != nil {
if err := d.validator.ValidateLabels(userID, stream); err != nil {
validationErr = err
continue
}

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(userID, entry); err != nil {
if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
owen-d marked this conversation as resolved.
Show resolved Hide resolved
validationErr = err
continue
}
Expand All @@ -241,11 +237,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d bytes) exceeded while adding %d lines for a total size of %d bytes", int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
}

const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
Expand Down
16 changes: 8 additions & 8 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func TestDistributor(t *testing.T) {
},
{
lines: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100 bytes) exceeded while adding 100 lines for a total size of 1000 bytes"),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(100, 100, 1000)),
},
{
lines: 100,
maxLineSize: 1,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "max line size (1B) exceeded while adding (10B) size line"),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(1, 10, "{foo=\"bar\"}")),
},
{
lines: 100,
Expand Down Expand Up @@ -116,9 +116,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 5, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10 bytes) exceeded while adding 1 lines for a total size of 6 bytes")},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 6))},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10 bytes) exceeded while adding 1 lines for a total size of 1 bytes")},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 1))},
},
},
"global strategy: limit should be evenly shared across distributors": {
Expand All @@ -128,9 +128,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 3, expectedError: nil},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 3 bytes")},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 3))},
{bytes: 2, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 1 bytes")},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
},
},
"global strategy: burst should set to each distributor": {
Expand All @@ -140,9 +140,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 15, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 6 bytes")},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 6))},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 1 bytes")},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
},
},
}
Expand Down
75 changes: 56 additions & 19 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package distributor

import (
"errors"
"net/http"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/weaveworks/common/httpgrpc"
"net/http"
"strings"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/util/validation"
)

Expand All @@ -27,11 +25,17 @@ func NewValidator(l Limits) (*Validator, error) {
}

// ValidateEntry returns an error if the entry is invalid
func (v Validator) ValidateEntry(userID string, entry logproto.Entry) error {
if err := cortex_validation.ValidateSample(v, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
return err
func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {
if v.RejectOldSamples(userID) && entry.Timestamp.UnixNano() < time.Now().Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano() {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp))
}

if entry.Timestamp.UnixNano() > time.Now().Add(v.CreationGracePeriod(userID)).UnixNano() {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp))
}

if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
Expand All @@ -40,26 +44,59 @@ func (v Validator) ValidateEntry(userID string, entry logproto.Entry) error {
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
return httpgrpc.Errorf(
http.StatusBadRequest,
"max line size (%s) exceeded while adding (%s) size line",
flagext.ByteSize(uint64(maxSize)).String(),
flagext.ByteSize(uint64(len(entry.Line))).String(),
)
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
}

return nil
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, labels string) error {
ls, err := util.ToClientLabels(labels)
func (v Validator) ValidateLabels(userID string, stream *logproto.Stream) error {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
return httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
}
return cortex_validation.ValidateLabels(v, userID, ls)

numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
}

maxLabelNameLength := v.MaxLabelNameLength(userID)
maxLabelValueLength := v.MaxLabelValueLength(userID)
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
} else if len(l.Value) > maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
}
lastLabelName = l.Name
}
return nil
}

func updateMetrics(reason, userID string, stream *logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes))
}
154 changes: 154 additions & 0 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package distributor

import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"
"net/http"
"testing"
"time"
)

var testStreamLabels = "FIXME"
var testTime = time.Now()

func TestValidator_ValidateEntry(t *testing.T) {
tests := []struct {
name string
userID string
overrides validation.TenantLimits
entry logproto.Entry
expected error
}{
{
"test valid",
"test",
nil,
logproto.Entry{Timestamp: testTime, Line: "test"},
nil,
},
{
"test too old",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
RejectOldSamples: true,
RejectOldSamplesMaxAge: 1 * time.Hour,
}
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(testStreamLabels, testTime.Add(-time.Hour*5))),
},
{
"test too new",
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(testStreamLabels, testTime.Add(time.Hour*5))),
},
{
"line too long",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
MaxLineSize: 10,
}
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(10, 11, testStreamLabels)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &validation.Limits{}
flagext.DefaultValues(l)
o, err := validation.NewOverrides(*l, tt.overrides)
assert.NoError(t, err)
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateEntry(tt.userID, testStreamLabels, tt.entry)
assert.Equal(t, tt.expected, err)
})
}
}

func TestValidator_ValidateLabels(t *testing.T) {
tests := []struct {
name string
userID string
overrides validation.TenantLimits
labels string
expected error
}{
{
"test valid",
"test",
nil,
"{foo=\"bar\"}",
nil,
},
{
"test too many labels",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{MaxLabelNamesPerSeries: 2}
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{fed=\"bears\", foo=\"bar\", food=\"bars\"}", 3, 2)),
},
{
"label name too long",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
}
},
"{fooooo=\"bar\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg("{fooooo=\"bar\"}", "fooooo")),
},
{
"label value too long",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
"{foo=\"barrrrrr\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg("{foo=\"barrrrrr\"}", "barrrrrr")),
},
{
"duplicate label",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
"{foo=\"bar\", foo=\"barf\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg("{foo=\"bar\", foo=\"barf\"}", "foo")),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &validation.Limits{}
flagext.DefaultValues(l)
o, err := validation.NewOverrides(*l, tt.overrides)
assert.NoError(t, err)
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateLabels(tt.userID, &logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err)
})
}
}
4 changes: 3 additions & 1 deletion pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"context"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/util/validation"
"net/http"
"sync"
Expand Down Expand Up @@ -170,7 +171,8 @@ func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, e
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
level.Warn(cutil.Logger).Log("message", "could not create new stream for tenant", "error", err)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}

sortedLabels := i.index.Add(labels, fp)
Expand Down
Loading