From ffe52d3d9c2b6d758e94a6cb60f75756c5bbb541 Mon Sep 17 00:00:00 2001 From: Shreyas Srivatsan Date: Mon, 15 Jun 2020 09:14:47 -0700 Subject: [PATCH] Address comments --- .../services/m3coordinator/ingest/write.go | 33 ++++++++++++------- .../m3coordinator/ingest/write_test.go | 26 +++++++++------ src/query/api/experimental/annotated/iter.go | 32 ++++++++++++------ .../api/experimental/annotated/iter_test.go | 10 +++--- src/query/api/v1/handler/influxdb/write.go | 22 ++++++++++--- .../api/v1/handler/influxdb/write_test.go | 14 ++++---- .../api/v1/handler/prometheus/remote/write.go | 31 +++++++++++------ 7 files changed, 110 insertions(+), 58 deletions(-) diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 33dffabf4f..24ab402c90 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -45,15 +45,24 @@ var ( } ) +// IterValue is the value returned by the iterator. +type IterValue struct { + Tags models.Tags + Datapoints ts.Datapoints + Attributes ts.SeriesAttributes + Unit xtime.Unit + Metadata ts.Metadata + Annotation []byte +} + // DownsampleAndWriteIter is an interface that can be implemented to use // the WriteBatch method. type DownsampleAndWriteIter interface { Next() bool - Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) + Current() IterValue Reset() error Error() error SetCurrentMetadata(ts.Metadata) - CurrentMetadata() ts.Metadata } // DownsamplerAndWriter is the interface for the downsamplerAndWriter which @@ -377,8 +386,8 @@ func (d *downsamplerAndWriter) WriteBatch( } for iter.Next() { - tags, datapoints, _, unit, annotation := iter.Current() - if metadata := iter.CurrentMetadata(); metadata.DropUnaggregated { + value := iter.Current() + if value.Metadata.DropUnaggregated { d.metrics.dropped.Inc(1) continue } @@ -387,10 +396,10 @@ func (d *downsamplerAndWriter) WriteBatch( wg.Add(1) d.workerPool.Go(func() { err := d.writeWithOptions(ctx, storage.WriteQueryOptions{ - Tags: tags, - Datapoints: datapoints, - Unit: unit, - Annotation: annotation, + Tags: value.Tags, + Datapoints: value.Datapoints, + Unit: value.Unit, + Annotation: value.Annotation, Attributes: storageAttributesFromPolicy(p), }) if err != nil { @@ -424,8 +433,8 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( for iter.Next() { appender.Reset() - tags, datapoints, info, _, _ := iter.Current() - for _, tag := range tags.Tags { + value := iter.Current() + for _, tag := range value.Tags.Tags { appender.AddTag(tag.Name, tag.Value) } @@ -449,8 +458,8 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( iter.SetCurrentMetadata(ts.Metadata{DropUnaggregated: true}) } - for _, dp := range datapoints { - switch info.Type { + for _, dp := range value.Datapoints { + switch value.Attributes.Type { case ts.MetricTypeGauge: err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) case ts.MetricTypeCounter: diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go index e6d768dac2..8b046797cc 100644 --- a/src/cmd/services/m3coordinator/ingest/write_test.go +++ b/src/cmd/services/m3coordinator/ingest/write_test.go @@ -164,13 +164,26 @@ func (i *testIter) Next() bool { return i.idx < len(i.entries) } -func (i *testIter) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) { +func (i *testIter) Current() IterValue { if len(i.entries) == 0 || i.idx < 0 || i.idx >= len(i.entries) { - return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil + return IterValue{ + Tags: models.EmptyTags(), + Attributes: ts.DefaultSeriesAttributes(), + } } curr := i.entries[i.idx] - return curr.tags, curr.datapoints, curr.attributes, xtime.Second, curr.annotation + value := IterValue{ + Tags: curr.tags, + Datapoints: curr.datapoints, + Attributes: curr.attributes, + Unit: xtime.Second, + Annotation: curr.annotation, + } + if i.idx < len(i.metadatas) { + value.Metadata = i.metadatas[i.idx] + } + return value } func (i *testIter) Reset() error { @@ -186,13 +199,6 @@ func (i *testIter) SetCurrentMetadata(metadata ts.Metadata) { i.metadatas[i.idx] = metadata } -func (i *testIter) CurrentMetadata() ts.Metadata { - if len(i.metadatas) == 0 { - return ts.Metadata{} - } - return i.metadatas[i.idx] -} - func TestDownsampleAndWrite(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/query/api/experimental/annotated/iter.go b/src/query/api/experimental/annotated/iter.go index 80fe7d1baa..2d408df81e 100644 --- a/src/query/api/experimental/annotated/iter.go +++ b/src/query/api/experimental/annotated/iter.go @@ -29,6 +29,12 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) +var defaultValue = ingest.IterValue{ + Tags: models.EmptyTags(), + Attributes: ts.DefaultSeriesAttributes(), + Metadata: ts.Metadata{}, +} + type datapoint struct { ts.Datapoint @@ -81,12 +87,22 @@ func (i *iter) Next() bool { return i.idx < len(i.tags) } -func (i *iter) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) { +func (i *iter) Current() ingest.IterValue { if len(i.tags) == 0 || i.idx < 0 || i.idx >= len(i.tags) { - return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil + return defaultValue } curr := i.datapoints[i.idx] - return i.tags[i.idx], ts.Datapoints{curr.Datapoint}, ts.DefaultSeriesAttributes(), xtime.Millisecond, curr.annotation + value := ingest.IterValue{ + Tags: i.tags[i.idx], + Datapoints: ts.Datapoints{curr.Datapoint}, + Attributes: ts.DefaultSeriesAttributes(), + Unit: xtime.Millisecond, + Annotation: curr.annotation, + } + if i.idx < len(i.metadatas) { + value.Metadata = i.metadatas[i.idx] + } + return value } func (i *iter) Reset() error { @@ -102,12 +118,8 @@ func (i *iter) SetCurrentMetadata(metadata ts.Metadata) { if len(i.metadatas) == 0 { i.metadatas = make([]ts.Metadata, len(i.tags)) } - i.metadatas[i.idx] = metadata -} - -func (i *iter) CurrentMetadata() ts.Metadata { - if len(i.metadatas) == 0 { - return ts.Metadata{} + if i.idx < 0 || i.idx >= len(i.metadatas) { + return } - return i.metadatas[i.idx] + i.metadatas[i.idx] = metadata } diff --git a/src/query/api/experimental/annotated/iter_test.go b/src/query/api/experimental/annotated/iter_test.go index 369785fab1..69b7e4bdf9 100644 --- a/src/query/api/experimental/annotated/iter_test.go +++ b/src/query/api/experimental/annotated/iter_test.go @@ -155,11 +155,11 @@ func TestIter(t *testing.T) { func testOutput(t *testing.T, iter *iter, want iterOutput) { require.True(t, iter.Next()) - tags, datapoints, _, unit, annotation := iter.Current() - assert.Equal(t, want.tags, tags) - assert.Equal(t, want.datapoints, datapoints) - assert.Equal(t, want.unit, unit) - assert.Equal(t, want.annotation, annotation) + value := iter.Current() + assert.Equal(t, want.tags, value.Tags) + assert.Equal(t, want.datapoints, value.Datapoints) + assert.Equal(t, want.unit, value.Unit) + assert.Equal(t, want.annotation, value.Annotation) } type iterOutput struct { diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index d56bf0ea49..e69178cfb5 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -51,6 +51,12 @@ const ( InfluxWriteHTTPMethod = http.MethodPost ) +var defaultValue = ingest.IterValue{ + Tags: models.EmptyTags(), + Attributes: ts.DefaultSeriesAttributes(), + Metadata: ts.Metadata{}, +} + type ingestWriteHandler struct { handlerOpts options.HandlerOptions tagOpts models.TagOptions @@ -219,7 +225,7 @@ func determineTimeUnit(t time.Time) xtime.Unit { return xtime.Nanosecond } -func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) { +func (ii *ingestIterator) Current() ingest.IterValue { if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) { point := ii.points[ii.pointIndex] field := ii.fields[ii.nextFieldIndex-1] @@ -227,10 +233,18 @@ func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, ts.SeriesAttrib t := point.Time() - return tags, []ts.Datapoint{ts.Datapoint{Timestamp: t, - Value: field.value}}, ts.DefaultSeriesAttributes(), determineTimeUnit(t), nil + value := ingest.IterValue{ + Tags: tags, + Datapoints: []ts.Datapoint{ts.Datapoint{Timestamp: t, Value: field.value}}, + Attributes: ts.DefaultSeriesAttributes(), + Unit: determineTimeUnit(t), + } + if ii.pointIndex < len(ii.metadatas) { + value.Metadata = ii.metadatas[ii.pointIndex] + } + return value } - return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil + return defaultValue } func (ii *ingestIterator) Reset() error { diff --git a/src/query/api/v1/handler/influxdb/write_test.go b/src/query/api/v1/handler/influxdb/write_test.go index e67862d810..49f9ad5fef 100644 --- a/src/query/api/v1/handler/influxdb/write_test.go +++ b/src/query/api/v1/handler/influxdb/write_test.go @@ -35,10 +35,10 @@ import ( // they are easiest for human to handle func (self *ingestIterator) pop(t *testing.T) string { if self.Next() { - tags, dp, _, _, _ := self.Current() - assert.Equal(t, 1, len(dp)) + value := self.Current() + assert.Equal(t, 1, len(value.Datapoints)) - return fmt.Sprintf("%s %v %s", tags.String(), dp[0].Value, dp[0].Timestamp) + return fmt.Sprintf("%s %v %s", value.Tags.String(), value.Datapoints[0].Value, value.Datapoints[0].Timestamp) } return "" } @@ -108,14 +108,14 @@ func TestIngestIteratorIssue2125(t *testing.T) { require.NoError(t, iter.Error()) assert.True(t, iter.Next()) - t1, _, _, _, _ := iter.Current() + value1 := iter.Current() assert.True(t, iter.Next()) - t2, _, _, _, _ := iter.Current() + value2 := iter.Current() require.NoError(t, iter.Error()) - assert.Equal(t, t1.String(), "__name__: measure_k1, lab: foo") - assert.Equal(t, t2.String(), "__name__: measure_k2, lab: foo") + assert.Equal(t, value1.Tags.String(), "__name__: measure_k1, lab: foo") + assert.Equal(t, value2.Tags.String(), "__name__: measure_k2, lab: foo") } func TestDetermineTimeUnit(t *testing.T) { diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index d4df6d50fd..ed1fba43aa 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -86,6 +86,12 @@ var ( Forever: &defaultForwardingRetryForever, Jitter: &defaultForwardingRetryJitter, } + + defaultValue = ingest.IterValue{ + Tags: models.EmptyTags(), + Attributes: ts.DefaultSeriesAttributes(), + Metadata: ts.Metadata{}, + } ) // PromWriteHandler represents a handler for prometheus write endpoint. @@ -585,12 +591,21 @@ func (i *promTSIter) Next() bool { return i.idx < len(i.tags) } -func (i *promTSIter) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) { +func (i *promTSIter) Current() ingest.IterValue { if len(i.tags) == 0 || i.idx < 0 || i.idx >= len(i.tags) { - return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil + return defaultValue } - return i.tags[i.idx], i.datapoints[i.idx], i.attributes[i.idx], xtime.Millisecond, nil + value := ingest.IterValue{ + Tags: i.tags[i.idx], + Datapoints: i.datapoints[i.idx], + Attributes: ts.DefaultSeriesAttributes(), + Unit: xtime.Millisecond, + } + if i.idx < len(i.metadatas) { + value.Metadata = i.metadatas[i.idx] + } + return value } func (i *promTSIter) Reset() error { @@ -606,12 +621,8 @@ func (i *promTSIter) SetCurrentMetadata(metadata ts.Metadata) { if len(i.metadatas) == 0 { i.metadatas = make([]ts.Metadata, len(i.tags)) } - i.metadatas[i.idx] = metadata -} - -func (i *promTSIter) CurrentMetadata() ts.Metadata { - if len(i.metadatas) == 0 { - return ts.Metadata{} + if i.idx < 0 || i.idx >= len(i.metadatas) { + return } - return i.metadatas[i.idx] + i.metadatas[i.idx] = metadata }