diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index fd2cd2204fd6e..b9b1cdf57d969 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -28,10 +28,6 @@ import (
 	"github.com/grafana/loki/pkg/util/validation"
 )
 
-const (
-	metricName = "logs"
-)
-
 var (
 	ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "loki",
@@ -209,14 +205,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 	validatedSamplesCount := 0
 	for _, stream := range req.Streams {
-		if err := d.validator.ValidateLabels(userID, stream.Labels); err != nil {
+		if err := d.validator.ValidateLabels(userID, stream); err != nil {
 			validationErr = err
 			continue
 		}
 
 		entries := make([]logproto.Entry, 0, len(stream.Entries))
 		for _, entry := range stream.Entries {
-			if err := d.validator.ValidateEntry(userID, entry); err != nil {
+			if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
 				validationErr = err
 				continue
 			}
@@ -241,11 +237,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 	now := time.Now()
 	if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
-		// Return a 4xx here to have the client discard the data and not retry. If a client
-		// is sending too much data consistently we will unlikely ever catch up otherwise.
+		// Return a 429 to indicate to the client they are being rate limited
 		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
 		validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
-		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d bytes) exceeded while adding %d lines for a total size of %d bytes", int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
+		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
 	}
 
 	const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 827bfa1d7917c..3926e19ba57a8 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -57,13 +57,13 @@ func TestDistributor(t *testing.T) {
 		},
 		{
 			lines: 100,
-			expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100 bytes) exceeded while adding 100 lines for a total size of 1000 bytes"),
+			expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(100, 100, 1000)),
 		},
 		{
 			lines: 100,
 			maxLineSize: 1,
 			expectedResponse: success,
-			expectedError: httpgrpc.Errorf(http.StatusBadRequest, "max line size (1B) exceeded while adding (10B) size line"),
+			expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(1, 10, "{foo=\"bar\"}")),
 		},
 		{
 			lines: 100,
@@ -116,9 +116,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 			ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)),
 			pushes: []testPush{
 				{bytes: 5, expectedError: nil},
-				{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10 bytes) exceeded while adding 1 lines for a total size of 6 bytes")},
+				{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 6))},
 				{bytes: 5, expectedError: nil},
-				{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10 bytes) exceeded while adding 1 lines for a total size of 1 bytes")},
+				{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 1))},
 			},
 		},
 		"global strategy: limit should be evenly shared across distributors": {
@@ -128,9 +128,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 			ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)),
 			pushes: []testPush{
 				{bytes: 3, expectedError: nil},
-				{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 3 bytes")},
+				{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 3))},
 				{bytes: 2, expectedError: nil},
-				{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 1 bytes")},
+				{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
 			},
 		},
 		"global strategy: burst should set to each distributor": {
@@ -140,9 +140,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 			ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)),
 			pushes: []testPush{
 				{bytes: 15, expectedError: nil},
-				{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 6 bytes")},
+				{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 6))},
 				{bytes: 5, expectedError: nil},
-				{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5 bytes) exceeded while adding 1 lines for a total size of 1 bytes")},
+				{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
 			},
 		},
 	}
diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go
index 25865de3e5abd..a414f6fc0f97c 100644
--- a/pkg/distributor/validator.go
+++ b/pkg/distributor/validator.go
@@ -2,16 +2,14 @@ package distributor
 
 import (
 	"errors"
-	"net/http"
-	"time"
-
 	cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
-	cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
 	"github.com/weaveworks/common/httpgrpc"
+	"net/http"
+	"strings"
+	"time"
 
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/util"
-	"github.com/grafana/loki/pkg/util/flagext"
 	"github.com/grafana/loki/pkg/util/validation"
 )
@@ -27,11 +25,17 @@ func NewValidator(l Limits) (*Validator, error) {
 }
 
 // ValidateEntry returns an error if the entry is invalid
-func (v Validator) ValidateEntry(userID string, entry logproto.Entry) error {
-	if err := cortex_validation.ValidateSample(v, userID, metricName, cortex_client.Sample{
-		TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
-	}); err != nil {
-		return err
+func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {
+	if v.RejectOldSamples(userID) && entry.Timestamp.UnixNano() < time.Now().Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano() {
+		validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Inc()
+		validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Add(float64(len(entry.Line)))
+		return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp))
+	}
+
+	if entry.Timestamp.UnixNano() > time.Now().Add(v.CreationGracePeriod(userID)).UnixNano() {
+		validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, userID).Inc()
+		validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, userID).Add(float64(len(entry.Line)))
+		return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp))
 	}
 
 	if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
@@ -40,20 +44,16 @@ func (v Validator) ValidateEntry(userID string, entry logproto.Entry) error {
 		// but the upstream cortex_validation pkg uses it, so we keep this
 		// for parity.
 		validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
-		return httpgrpc.Errorf(
-			http.StatusBadRequest,
-			"max line size (%s) exceeded while adding (%s) size line",
-			flagext.ByteSize(uint64(maxSize)).String(),
-			flagext.ByteSize(uint64(len(entry.Line))).String(),
-		)
+		validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
+		return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
 	}
 
 	return nil
 }
 
 // Validate labels returns an error if the labels are invalid
-func (v Validator) ValidateLabels(userID string, labels string) error {
-	ls, err := util.ToClientLabels(labels)
+func (v Validator) ValidateLabels(userID string, stream *logproto.Stream) error {
+	ls, err := util.ToClientLabels(stream.Labels)
 	if err != nil {
 		// I wish we didn't return httpgrpc errors here as it seems
 		// an orthogonal concept (we need not use ValidateLabels in this context)
@@ -61,5 +61,42 @@ func (v Validator) ValidateLabels(userID string, labels string) error {
 		// for parity.
 		return httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
 	}
-	return cortex_validation.ValidateLabels(v, userID, ls)
+
+	numLabelNames := len(ls)
+	if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
+		validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
+		bytes := 0
+		for _, e := range stream.Entries {
+			bytes += len(e.Line)
+		}
+		validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
+		return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
+	}
+
+	maxLabelNameLength := v.MaxLabelNameLength(userID)
+	maxLabelValueLength := v.MaxLabelValueLength(userID)
+	lastLabelName := ""
+	for _, l := range ls {
+		if len(l.Name) > maxLabelNameLength {
+			updateMetrics(validation.LabelNameTooLong, userID, stream)
+			return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
+		} else if len(l.Value) > maxLabelValueLength {
+			updateMetrics(validation.LabelValueTooLong, userID, stream)
+			return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
+		} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
+			updateMetrics(validation.DuplicateLabelNames, userID, stream)
+			return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
+		}
+		lastLabelName = l.Name
+	}
+	return nil
+}
+
+func updateMetrics(reason, userID string, stream *logproto.Stream) {
+	validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
+	bytes := 0
+	for _, e := range stream.Entries {
+		bytes += len(e.Line)
+	}
+	validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes))
 }
diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go
new file mode 100644
index 0000000000000..94daeeb44f15a
--- /dev/null
+++ b/pkg/distributor/validator_test.go
@@ -0,0 +1,154 @@
+package distributor
+
+import (
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/util/validation"
+	"github.com/stretchr/testify/assert"
+	"github.com/weaveworks/common/httpgrpc"
+	"net/http"
+	"testing"
+	"time"
+)
+
+var testStreamLabels = "FIXME"
+var testTime = time.Now()
+
+func TestValidator_ValidateEntry(t *testing.T) {
+	tests := []struct {
+		name      string
+		userID    string
+		overrides validation.TenantLimits
+		entry     logproto.Entry
+		expected  error
+	}{
+		{
+			"test valid",
+			"test",
+			nil,
+			logproto.Entry{Timestamp: testTime, Line: "test"},
+			nil,
+		},
+		{
+			"test too old",
+			"test",
+			func(userID string) *validation.Limits {
+				return &validation.Limits{
+					RejectOldSamples:       true,
+					RejectOldSamplesMaxAge: 1 * time.Hour,
+				}
+			},
+			logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
+			httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(testStreamLabels, testTime.Add(-time.Hour*5))),
+		},
+		{
+			"test too new",
+			"test",
+			nil,
+			logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
+			httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(testStreamLabels, testTime.Add(time.Hour*5))),
+		},
+		{
+			"line too long",
+			"test",
+			func(userID string) *validation.Limits {
+				return &validation.Limits{
+					MaxLineSize: 10,
+				}
+			},
+			logproto.Entry{Timestamp: testTime, Line: "12345678901"},
+			httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(10, 11, testStreamLabels)),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			l := &validation.Limits{}
+			flagext.DefaultValues(l)
+			o, err := validation.NewOverrides(*l, tt.overrides)
+			assert.NoError(t, err)
+			v, err := NewValidator(o)
+			assert.NoError(t, err)
+
+			err = v.ValidateEntry(tt.userID, testStreamLabels, tt.entry)
+			assert.Equal(t, tt.expected, err)
+		})
+	}
+}
+
+func TestValidator_ValidateLabels(t *testing.T) {
+	tests := []struct {
+		name      string
+		userID    string
+		overrides validation.TenantLimits
+		labels    string
+		expected  error
+	}{
+		{
+			"test valid",
+			"test",
+			nil,
+			"{foo=\"bar\"}",
+			nil,
+		},
+		{
+			"test too many labels",
+			"test",
+			func(userID string) *validation.Limits {
+				return &validation.Limits{MaxLabelNamesPerSeries: 2}
+			},
+			"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
+			httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{fed=\"bears\", foo=\"bar\", food=\"bars\"}", 3, 2)),
+		},
+		{
+			"label name too long",
+			"test",
+			func(userID string) *validation.Limits {
+				return &validation.Limits{
+					MaxLabelNamesPerSeries: 2,
+					MaxLabelNameLength:     5,
+				}
+			},
+			"{fooooo=\"bar\"}",
+			httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg("{fooooo=\"bar\"}", "fooooo")),
+		},
+		{
+			"label value too long",
+			"test",
+			func(userID string) *validation.Limits {
+				return &validation.Limits{
+					MaxLabelNamesPerSeries: 2,
+					MaxLabelNameLength:     5,
+					MaxLabelValueLength:    5,
+				}
+			},
+			"{foo=\"barrrrrr\"}",
+			httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg("{foo=\"barrrrrr\"}", "barrrrrr")),
+		},
+		{
+			"duplicate label",
+			"test",
+			func(userID string) *validation.Limits {
+				return &validation.Limits{
+					MaxLabelNamesPerSeries: 2,
+					MaxLabelNameLength:     5,
+					MaxLabelValueLength:    5,
+				}
+			},
+			"{foo=\"bar\", foo=\"barf\"}",
+			httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg("{foo=\"bar\", foo=\"barf\"}", "foo")),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			l := &validation.Limits{}
+			flagext.DefaultValues(l)
+			o, err := validation.NewOverrides(*l, tt.overrides)
+			assert.NoError(t, err)
+			v, err := NewValidator(o)
+			assert.NoError(t, err)
+
+			err = v.ValidateLabels(tt.userID, &logproto.Stream{Labels: tt.labels})
+			assert.Equal(t, tt.expected, err)
+		})
+	}
+}
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 5104457c67db4..b43502f2e1ed4 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -2,6 +2,7 @@ package ingester
 
 import (
 	"context"
+	"github.com/go-kit/kit/log/level"
 	"github.com/grafana/loki/pkg/util/validation"
 	"net/http"
 	"sync"
@@ -170,7 +171,8 @@ func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, e
 			bytes += len(e.Line)
 		}
 		validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
-		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
+		level.Warn(cutil.Logger).Log("message", "could not create new stream for tenant", "error", err)
+		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
 	}
 
 	sortedLabels := i.index.Add(labels, fp)
diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go
index 97f54caa15e20..c286485374f10 100644
--- a/pkg/util/validation/validate.go
+++ b/pkg/util/validation/validate.go
@@ -1,18 +1,43 @@
 package validation
 
-import "github.com/prometheus/client_golang/prometheus"
+import (
+	"fmt"
+	"github.com/prometheus/client_golang/prometheus"
+	"time"
+)
 
 const (
 	discardReasonLabel = "reason"
 
 	// RateLimited is one of the values for the reason to discard samples.
 	// Declared here to avoid duplication in ingester and distributor.
-	RateLimited = "rate_limited"
+	RateLimited       = "rate_limited"
+	rateLimitErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased"
 	// LineTooLong is a reason for discarding too long log lines.
-	LineTooLong = "line_too_long"
+	LineTooLong         = "line_too_long"
+	lineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes"
 	// StreamLimit is a reason for discarding lines when we can't create a new stream
 	// because the limit of active streams has been reached.
-	StreamLimit = "stream_limit"
+	StreamLimit         = "stream_limit"
+	streamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased"
+	// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
+	GreaterThanMaxSampleAge         = "greater_than_max_sample_age"
+	greaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
+	// TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period`
+	TooFarInFuture         = "too_far_in_future"
+	tooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"
+	// MaxLabelNamesPerSeries is a reason for discarding a log line which has too many label names
+	MaxLabelNamesPerSeries         = "max_label_names_per_series"
+	maxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d"
+	// LabelNameTooLong is a reason for discarding a log line which has a label name too long
+	LabelNameTooLong         = "label_name_too_long"
+	labelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'"
+	// LabelValueTooLong is a reason for discarding a log line which has a label value too long
+	LabelValueTooLong         = "label_value_too_long"
+	labelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'"
+	// DuplicateLabelNames is a reason for discarding a log line which has duplicate label names
+	DuplicateLabelNames         = "duplicate_label_names"
+	duplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
 )
 
 // DiscardedBytes is a metric of the total discarded bytes, by reason.
@@ -38,3 +63,48 @@ var DiscardedSamples = prometheus.NewCounterVec(
 func init() {
 	prometheus.MustRegister(DiscardedSamples, DiscardedBytes)
 }
+
+// RateLimitedErrorMsg returns an error string for rate limited requests
+func RateLimitedErrorMsg(limit, lines, bytes int) string {
+	return fmt.Sprintf(rateLimitErrorMsg, limit, lines, bytes)
+}
+
+// LineTooLongErrorMsg returns an error string for a line which is too long
+func LineTooLongErrorMsg(maxLength, entryLength int, stream string) string {
+	return fmt.Sprintf(lineTooLongErrorMsg, maxLength, stream, entryLength)
+}
+
+// StreamLimitErrorMsg returns an error string for requests refused for exceeding active stream limits
+func StreamLimitErrorMsg() string {
+	return fmt.Sprintf(streamLimitErrorMsg)
+}
+
+// GreaterThanMaxSampleAgeErrorMsg returns an error string for a line with a timestamp too old
+func GreaterThanMaxSampleAgeErrorMsg(stream string, timestamp time.Time) string {
+	return fmt.Sprintf(greaterThanMaxSampleAgeErrorMsg, stream, timestamp)
+}
+
+// TooFarInFutureErrorMsg returns an error string for a line with a timestamp too far in the future
+func TooFarInFutureErrorMsg(stream string, timestamp time.Time) string {
+	return fmt.Sprintf(tooFarInFutureErrorMsg, stream, timestamp)
+}
+
+// MaxLabelNamesPerSeriesErrorMsg returns an error string for a stream with too many labels
+func MaxLabelNamesPerSeriesErrorMsg(stream string, labelCount, labelLimit int) string {
+	return fmt.Sprintf(maxLabelNamesPerSeriesErrorMsg, stream, labelCount, labelLimit)
+}
+
+// LabelNameTooLongErrorMsg returns an error string for a stream with a label name too long
+func LabelNameTooLongErrorMsg(stream, label string) string {
+	return fmt.Sprintf(labelNameTooLongErrorMsg, stream, label)
+}
+
+// LabelValueTooLongErrorMsg returns an error string for a stream with a label value too long
+func LabelValueTooLongErrorMsg(stream, labelValue string) string {
+	return fmt.Sprintf(labelValueTooLongErrorMsg, stream, labelValue)
+}
+
+// DuplicateLabelNamesErrorMsg returns an error string for a stream which has duplicate labels
+func DuplicateLabelNamesErrorMsg(stream, label string) string {
+	return fmt.Sprintf(duplicateLabelNamesErrorMsg, stream, label)
+}
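
The new messages are returned to clients as httpgrpc errors carrying an HTTP status: 400 for validation failures and 429 for the rate and active-stream limits. As a rough sketch of how a caller might consume them (not part of this patch; the helper name handlePushError is hypothetical, and only httpgrpc.HTTPResponseFromError from weaveworks/common is assumed to exist):

package main

import (
	"fmt"
	"net/http"

	"github.com/weaveworks/common/httpgrpc"
)

// handlePushError is a hypothetical illustration of branching on the status
// code carried by the errors this patch produces. 429 means throttled (rate
// or stream limit), 400 means the entry itself was rejected; the body now
// names the offending stream, so it can be logged or reported as-is.
func handlePushError(err error) {
	if err == nil {
		return
	}
	if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
		switch int(resp.Code) {
		case http.StatusTooManyRequests:
			// Retrying immediately will not help; back off or drop per client policy.
			fmt.Printf("throttled by Loki: %s\n", string(resp.Body))
		case http.StatusBadRequest:
			// Validation failure (old/future timestamp, long line, bad labels).
			fmt.Printf("rejected by Loki: %s\n", string(resp.Body))
		default:
			fmt.Printf("push failed: %s\n", string(resp.Body))
		}
		return
	}
	fmt.Printf("push failed: %v\n", err)
}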