Skip to content

Commit

Permalink
[coordinator] Validate and do not send invalid metrics to aggregator …
Browse files Browse the repository at this point in the history
…for aggregation (#2593)
  • Loading branch information
robskillington authored Sep 3, 2020
1 parent 225a0c4 commit f3f2864
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 7 deletions.
9 changes: 9 additions & 0 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func (d *downsamplerAndWriter) writeToDownsampler(
unit xtime.Unit,
overrides WriteOptions,
) (bool, error) {
if err := tags.Validate(); err != nil {
return false, err
}

appender, err := d.downsampler.NewMetricsAppender()
if err != nil {
return false, err
Expand Down Expand Up @@ -433,6 +437,11 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
appender.NextMetric()

value := iter.Current()
if err := value.Tags.Validate(); err != nil {
multiErr = multiErr.Add(err)
continue
}

for _, tag := range value.Tags.Tags {
appender.AddTag(tag.Name, tag.Value)
}
Expand Down
94 changes: 94 additions & 0 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/query/storage/m3"
testm3 "github.com/m3db/m3/src/query/test/m3"
"github.com/m3db/m3/src/query/ts"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xsync "github.com/m3db/m3/src/x/sync"
Expand Down Expand Up @@ -80,6 +81,20 @@ var (
},
},
)
testBadTags = models.NewTags(3, nil).AddTags([]models.Tag{
{
Name: []byte("standard_tag"),
Value: []byte("standard_tag_value"),
},
{
Name: []byte("duplicate_tag"),
Value: []byte("duplicate_tag_value0"),
},
{
Name: []byte("duplicate_tag"),
Value: []byte("duplicate_tag_value1"),
},
})

testDatapoints1 = []ts.Datapoint{
{
Expand Down Expand Up @@ -214,6 +229,28 @@ func TestDownsampleAndWrite(t *testing.T) {
require.NoError(t, err)
}

func TestDownsampleAndWriteWithBadTags(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

downAndWrite, _, _ := newTestDownsamplerAndWriter(t, ctrl,
testDownsamplerAndWriterOptions{})

err := downAndWrite.Write(
context.Background(), testBadTags, testDatapoints1, xtime.Second, testAnnotation1, defaultOverride)
require.Error(t, err)

// Make sure we get a validation error for downsample code path
// and for the raw unaggregate write code path.
multiErr, ok := err.(xerrors.MultiError)
require.True(t, ok)
require.Equal(t, 2, multiErr.NumErrors())
// Make sure all are invalid params errors.
for _, err := range multiErr.Errors() {
require.True(t, xerrors.IsInvalidParams(err))
}
}

func TestDownsampleAndWriteWithDownsampleOverridesAndNoMappingRules(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -457,6 +494,63 @@ func TestDownsampleAndWriteBatch(t *testing.T) {
require.NoError(t, err)
}

func TestDownsampleAndWriteBatchBadTags(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

downAndWrite, downsampler, session := newTestDownsamplerAndWriter(t, ctrl,
testDownsamplerAndWriterOptions{})

var (
mockSamplesAppender = downsample.NewMockSamplesAppender(ctrl)
mockMetricsAppender = downsample.NewMockMetricsAppender(ctrl)
)

entries := []testIterEntry{
{tags: testBadTags, datapoints: testDatapoints1, attributes: testAttributesGauge, annotation: testAnnotation1},
{tags: testTags2, datapoints: testDatapoints2, attributes: testAttributesGauge, annotation: testAnnotation2},
}

// Only expect to write non-bad tags.
mockMetricsAppender.
EXPECT().
SamplesAppender(zeroDownsamplerAppenderOpts).
Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1)
for _, tag := range testTags2.Tags {
mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value)
}
for _, dp := range testDatapoints2 {
mockSamplesAppender.EXPECT().AppendGaugeTimedSample(dp.Timestamp, dp.Value)
}
downsampler.EXPECT().NewMetricsAppender().Return(mockMetricsAppender, nil)

mockMetricsAppender.EXPECT().NextMetric().Times(2)
mockMetricsAppender.EXPECT().Finalize()

// Only expect to write non-bad tags.
for _, entry := range testEntries[1:] {
for _, dp := range entry.datapoints {
session.EXPECT().WriteTagged(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), entry.annotation,
)
}
}

iter := newTestIter(entries)
err := downAndWrite.WriteBatch(context.Background(), iter, WriteOptions{})
require.Error(t, err)

// Make sure we get a validation error for downsample code path
// and for the raw unaggregate write code path.
multiErr, ok := err.(xerrors.MultiError)
require.True(t, ok)
require.Equal(t, 2, multiErr.NumErrors())
// Make sure all are invalid params errors.
for _, err := range multiErr.Errors() {
require.True(t, xerrors.IsInvalidParams(err))
}
}

func TestDownsampleAndWriteBatchDifferentTypes(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
11 changes: 11 additions & 0 deletions src/query/models/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"

"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
xerrors "github.com/m3db/m3/src/x/errors"

"github.com/cespare/xxhash/v2"
)
Expand Down Expand Up @@ -258,6 +259,16 @@ func (t Tags) Normalize() Tags {
// Validate will validate there are tag values, and the
// tags are ordered and there are no duplicates.
func (t Tags) Validate() error {
// Wrap call to validate to make sure a validation error
// is always an invalid parameters error so we return bad request
// instead of internal server error at higher in the stack.
if err := t.validate(); err != nil {
return xerrors.NewInvalidParamsError(err)
}
return nil
}

func (t Tags) validate() error {
n := t.Len()
if n == 0 {
return errNoTags
Expand Down
29 changes: 22 additions & 7 deletions src/query/models/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/util/writer"
xerrors "github.com/m3db/m3/src/x/errors"
xtest "github.com/m3db/m3/src/x/test"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -425,13 +426,17 @@ func TestWriteTagLengthMeta(t *testing.T) {
func TestTagsValidateEmptyNameQuoted(t *testing.T) {
tags := NewTags(0, NewTagOptions().SetIDSchemeType(TypeQuoted))
tags = tags.AddTag(Tag{Name: []byte(""), Value: []byte("bar")})
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
}

func TestTagsValidateEmptyValueQuoted(t *testing.T) {
tags := NewTags(0, NewTagOptions().SetIDSchemeType(TypeQuoted))
tags = tags.AddTag(Tag{Name: []byte("foo"), Value: []byte("")})
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
}

func TestTagsValidateOutOfOrderQuoted(t *testing.T) {
Expand All @@ -446,7 +451,9 @@ func TestTagsValidateOutOfOrderQuoted(t *testing.T) {
Value: []byte("baz"),
},
}
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))

// Test fixes after normalize.
tags.Normalize()
Expand All @@ -467,13 +474,17 @@ func TestTagsValidateDuplicateQuoted(t *testing.T) {
Name: []byte("foo"),
Value: []byte("qux"),
})
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
}

func TestTagsValidateEmptyNameGraphite(t *testing.T) {
tags := NewTags(0, NewTagOptions().SetIDSchemeType(TypeGraphite))
tags = tags.AddTag(Tag{Name: nil, Value: []byte("bar")})
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
}

func TestTagsValidateOutOfOrderGraphite(t *testing.T) {
Expand All @@ -488,7 +499,9 @@ func TestTagsValidateOutOfOrderGraphite(t *testing.T) {
Value: []byte("bar"),
},
}
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))

// Test fixes after normalize.
tags.Normalize()
Expand All @@ -509,7 +522,9 @@ func TestTagsValidateDuplicateGraphite(t *testing.T) {
Name: graphite.TagName(1),
Value: []byte("baz"),
})
require.Error(t, tags.Validate())
err := tags.Validate()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
}

func buildTags(b *testing.B, count, length int, opts TagOptions, escape bool) Tags {
Expand Down

0 comments on commit f3f2864

Please sign in to comment.