From f3f286463a7cfa9469b9d50fc118bebfce38ee43 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 3 Sep 2020 17:34:28 -0400 Subject: [PATCH] [coordinator] Validate and do not send invalid metrics to aggregator for aggregation (#2593) --- .../services/m3coordinator/ingest/write.go | 9 ++ .../m3coordinator/ingest/write_test.go | 94 +++++++++++++++++++ src/query/models/tags.go | 11 +++ src/query/models/tags_test.go | 29 ++++-- 4 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 12ff0476a8..9889c23d89 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -229,6 +229,10 @@ func (d *downsamplerAndWriter) writeToDownsampler( unit xtime.Unit, overrides WriteOptions, ) (bool, error) { + if err := tags.Validate(); err != nil { + return false, err + } + appender, err := d.downsampler.NewMetricsAppender() if err != nil { return false, err @@ -433,6 +437,11 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( appender.NextMetric() value := iter.Current() + if err := value.Tags.Validate(); err != nil { + multiErr = multiErr.Add(err) + continue + } + for _, tag := range value.Tags.Tags { appender.AddTag(tag.Name, tag.Value) } diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go index 0af2afe6ac..43281876c1 100644 --- a/src/cmd/services/m3coordinator/ingest/write_test.go +++ b/src/cmd/services/m3coordinator/ingest/write_test.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/query/storage/m3" testm3 "github.com/m3db/m3/src/query/test/m3" "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" @@ -80,6 +81,20 @@ var ( }, }, ) + testBadTags = models.NewTags(3, nil).AddTags([]models.Tag{ + { + Name: []byte("standard_tag"), + Value: []byte("standard_tag_value"), + }, + { + Name: []byte("duplicate_tag"), + Value: []byte("duplicate_tag_value0"), + }, + { + Name: []byte("duplicate_tag"), + Value: []byte("duplicate_tag_value1"), + }, + }) testDatapoints1 = []ts.Datapoint{ { @@ -214,6 +229,28 @@ func TestDownsampleAndWrite(t *testing.T) { require.NoError(t, err) } +func TestDownsampleAndWriteWithBadTags(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + downAndWrite, _, _ := newTestDownsamplerAndWriter(t, ctrl, + testDownsamplerAndWriterOptions{}) + + err := downAndWrite.Write( + context.Background(), testBadTags, testDatapoints1, xtime.Second, testAnnotation1, defaultOverride) + require.Error(t, err) + + // Make sure we get a validation error for downsample code path + // and for the raw unaggregate write code path. + multiErr, ok := err.(xerrors.MultiError) + require.True(t, ok) + require.Equal(t, 2, multiErr.NumErrors()) + // Make sure all are invalid params errors. + for _, err := range multiErr.Errors() { + require.True(t, xerrors.IsInvalidParams(err)) + } +} + func TestDownsampleAndWriteWithDownsampleOverridesAndNoMappingRules(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -457,6 +494,63 @@ func TestDownsampleAndWriteBatch(t *testing.T) { require.NoError(t, err) } +func TestDownsampleAndWriteBatchBadTags(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + downAndWrite, downsampler, session := newTestDownsamplerAndWriter(t, ctrl, + testDownsamplerAndWriterOptions{}) + + var ( + mockSamplesAppender = downsample.NewMockSamplesAppender(ctrl) + mockMetricsAppender = downsample.NewMockMetricsAppender(ctrl) + ) + + entries := []testIterEntry{ + {tags: testBadTags, datapoints: testDatapoints1, attributes: testAttributesGauge, annotation: testAnnotation1}, + {tags: testTags2, datapoints: testDatapoints2, attributes: testAttributesGauge, annotation: testAnnotation2}, + } + + // Only expect to write non-bad tags. + mockMetricsAppender. + EXPECT(). + SamplesAppender(zeroDownsamplerAppenderOpts). + Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1) + for _, tag := range testTags2.Tags { + mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value) + } + for _, dp := range testDatapoints2 { + mockSamplesAppender.EXPECT().AppendGaugeTimedSample(dp.Timestamp, dp.Value) + } + downsampler.EXPECT().NewMetricsAppender().Return(mockMetricsAppender, nil) + + mockMetricsAppender.EXPECT().NextMetric().Times(2) + mockMetricsAppender.EXPECT().Finalize() + + // Only expect to write non-bad tags. + for _, entry := range testEntries[1:] { + for _, dp := range entry.datapoints { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), entry.annotation, + ) + } + } + + iter := newTestIter(entries) + err := downAndWrite.WriteBatch(context.Background(), iter, WriteOptions{}) + require.Error(t, err) + + // Make sure we get a validation error for downsample code path + // and for the raw unaggregate write code path. + multiErr, ok := err.(xerrors.MultiError) + require.True(t, ok) + require.Equal(t, 2, multiErr.NumErrors()) + // Make sure all are invalid params errors. + for _, err := range multiErr.Errors() { + require.True(t, xerrors.IsInvalidParams(err)) + } +} + func TestDownsampleAndWriteBatchDifferentTypes(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/query/models/tags.go b/src/query/models/tags.go index 17595e9396..08abde0087 100644 --- a/src/query/models/tags.go +++ b/src/query/models/tags.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/m3db/m3/src/metrics/generated/proto/metricpb" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/cespare/xxhash/v2" ) @@ -258,6 +259,16 @@ func (t Tags) Normalize() Tags { // Validate will validate there are tag values, and the // tags are ordered and there are no duplicates. func (t Tags) Validate() error { + // Wrap call to validate to make sure a validation error + // is always an invalid parameters error so we return bad request + // instead of internal server error at higher in the stack. + if err := t.validate(); err != nil { + return xerrors.NewInvalidParamsError(err) + } + return nil +} + +func (t Tags) validate() error { n := t.Len() if n == 0 { return errNoTags diff --git a/src/query/models/tags_test.go b/src/query/models/tags_test.go index 0669ae44fe..310eae7191 100644 --- a/src/query/models/tags_test.go +++ b/src/query/models/tags_test.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/util/writer" + xerrors "github.com/m3db/m3/src/x/errors" xtest "github.com/m3db/m3/src/x/test" "github.com/cespare/xxhash/v2" @@ -425,13 +426,17 @@ func TestWriteTagLengthMeta(t *testing.T) { func TestTagsValidateEmptyNameQuoted(t *testing.T) { tags := NewTags(0, NewTagOptions().SetIDSchemeType(TypeQuoted)) tags = tags.AddTag(Tag{Name: []byte(""), Value: []byte("bar")}) - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) } func TestTagsValidateEmptyValueQuoted(t *testing.T) { tags := NewTags(0, NewTagOptions().SetIDSchemeType(TypeQuoted)) tags = tags.AddTag(Tag{Name: []byte("foo"), Value: []byte("")}) - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) } func TestTagsValidateOutOfOrderQuoted(t *testing.T) { @@ -446,7 +451,9 @@ func TestTagsValidateOutOfOrderQuoted(t *testing.T) { Value: []byte("baz"), }, } - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) // Test fixes after normalize. tags.Normalize() @@ -467,13 +474,17 @@ func TestTagsValidateDuplicateQuoted(t *testing.T) { Name: []byte("foo"), Value: []byte("qux"), }) - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) } func TestTagsValidateEmptyNameGraphite(t *testing.T) { tags := NewTags(0, NewTagOptions().SetIDSchemeType(TypeGraphite)) tags = tags.AddTag(Tag{Name: nil, Value: []byte("bar")}) - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) } func TestTagsValidateOutOfOrderGraphite(t *testing.T) { @@ -488,7 +499,9 @@ func TestTagsValidateOutOfOrderGraphite(t *testing.T) { Value: []byte("bar"), }, } - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) // Test fixes after normalize. tags.Normalize() @@ -509,7 +522,9 @@ func TestTagsValidateDuplicateGraphite(t *testing.T) { Name: graphite.TagName(1), Value: []byte("baz"), }) - require.Error(t, tags.Validate()) + err := tags.Validate() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) } func buildTags(b *testing.B, count, length int, opts TagOptions, escape bool) Tags {