Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan committed Jun 15, 2020
1 parent 82f1d79 commit ffe52d3
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 58 deletions.
33 changes: 21 additions & 12 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,24 @@ var (
}
)

// IterValue is the value returned by the iterator.
type IterValue struct {
Tags models.Tags
Datapoints ts.Datapoints
Attributes ts.SeriesAttributes
Unit xtime.Unit
Metadata ts.Metadata
Annotation []byte
}

// DownsampleAndWriteIter is an interface that can be implemented to use
// the WriteBatch method.
type DownsampleAndWriteIter interface {
Next() bool
Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte)
Current() IterValue
Reset() error
Error() error
SetCurrentMetadata(ts.Metadata)
CurrentMetadata() ts.Metadata
}

// DownsamplerAndWriter is the interface for the downsamplerAndWriter which
Expand Down Expand Up @@ -377,8 +386,8 @@ func (d *downsamplerAndWriter) WriteBatch(
}

for iter.Next() {
tags, datapoints, _, unit, annotation := iter.Current()
if metadata := iter.CurrentMetadata(); metadata.DropUnaggregated {
value := iter.Current()
if value.Metadata.DropUnaggregated {
d.metrics.dropped.Inc(1)
continue
}
Expand All @@ -387,10 +396,10 @@ func (d *downsamplerAndWriter) WriteBatch(
wg.Add(1)
d.workerPool.Go(func() {
err := d.writeWithOptions(ctx, storage.WriteQueryOptions{
Tags: tags,
Datapoints: datapoints,
Unit: unit,
Annotation: annotation,
Tags: value.Tags,
Datapoints: value.Datapoints,
Unit: value.Unit,
Annotation: value.Annotation,
Attributes: storageAttributesFromPolicy(p),
})
if err != nil {
Expand Down Expand Up @@ -424,8 +433,8 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(

for iter.Next() {
appender.Reset()
tags, datapoints, info, _, _ := iter.Current()
for _, tag := range tags.Tags {
value := iter.Current()
for _, tag := range value.Tags.Tags {
appender.AddTag(tag.Name, tag.Value)
}

Expand All @@ -449,8 +458,8 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
iter.SetCurrentMetadata(ts.Metadata{DropUnaggregated: true})
}

for _, dp := range datapoints {
switch info.Type {
for _, dp := range value.Datapoints {
switch value.Attributes.Type {
case ts.MetricTypeGauge:
err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value)
case ts.MetricTypeCounter:
Expand Down
26 changes: 16 additions & 10 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,26 @@ func (i *testIter) Next() bool {
return i.idx < len(i.entries)
}

func (i *testIter) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) {
func (i *testIter) Current() IterValue {
if len(i.entries) == 0 || i.idx < 0 || i.idx >= len(i.entries) {
return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil
return IterValue{
Tags: models.EmptyTags(),
Attributes: ts.DefaultSeriesAttributes(),
}
}

curr := i.entries[i.idx]
return curr.tags, curr.datapoints, curr.attributes, xtime.Second, curr.annotation
value := IterValue{
Tags: curr.tags,
Datapoints: curr.datapoints,
Attributes: curr.attributes,
Unit: xtime.Second,
Annotation: curr.annotation,
}
if i.idx < len(i.metadatas) {
value.Metadata = i.metadatas[i.idx]
}
return value
}

func (i *testIter) Reset() error {
Expand All @@ -186,13 +199,6 @@ func (i *testIter) SetCurrentMetadata(metadata ts.Metadata) {
i.metadatas[i.idx] = metadata
}

func (i *testIter) CurrentMetadata() ts.Metadata {
if len(i.metadatas) == 0 {
return ts.Metadata{}
}
return i.metadatas[i.idx]
}

func TestDownsampleAndWrite(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
32 changes: 22 additions & 10 deletions src/query/api/experimental/annotated/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

var defaultValue = ingest.IterValue{
Tags: models.EmptyTags(),
Attributes: ts.DefaultSeriesAttributes(),
Metadata: ts.Metadata{},
}

type datapoint struct {
ts.Datapoint

Expand Down Expand Up @@ -81,12 +87,22 @@ func (i *iter) Next() bool {
return i.idx < len(i.tags)
}

func (i *iter) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) {
func (i *iter) Current() ingest.IterValue {
if len(i.tags) == 0 || i.idx < 0 || i.idx >= len(i.tags) {
return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil
return defaultValue
}
curr := i.datapoints[i.idx]
return i.tags[i.idx], ts.Datapoints{curr.Datapoint}, ts.DefaultSeriesAttributes(), xtime.Millisecond, curr.annotation
value := ingest.IterValue{
Tags: i.tags[i.idx],
Datapoints: ts.Datapoints{curr.Datapoint},
Attributes: ts.DefaultSeriesAttributes(),
Unit: xtime.Millisecond,
Annotation: curr.annotation,
}
if i.idx < len(i.metadatas) {
value.Metadata = i.metadatas[i.idx]
}
return value
}

func (i *iter) Reset() error {
Expand All @@ -102,12 +118,8 @@ func (i *iter) SetCurrentMetadata(metadata ts.Metadata) {
if len(i.metadatas) == 0 {
i.metadatas = make([]ts.Metadata, len(i.tags))
}
i.metadatas[i.idx] = metadata
}

func (i *iter) CurrentMetadata() ts.Metadata {
if len(i.metadatas) == 0 {
return ts.Metadata{}
if i.idx < 0 || i.idx >= len(i.metadatas) {
return
}
return i.metadatas[i.idx]
i.metadatas[i.idx] = metadata
}
10 changes: 5 additions & 5 deletions src/query/api/experimental/annotated/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ func TestIter(t *testing.T) {
func testOutput(t *testing.T, iter *iter, want iterOutput) {
require.True(t, iter.Next())

tags, datapoints, _, unit, annotation := iter.Current()
assert.Equal(t, want.tags, tags)
assert.Equal(t, want.datapoints, datapoints)
assert.Equal(t, want.unit, unit)
assert.Equal(t, want.annotation, annotation)
value := iter.Current()
assert.Equal(t, want.tags, value.Tags)
assert.Equal(t, want.datapoints, value.Datapoints)
assert.Equal(t, want.unit, value.Unit)
assert.Equal(t, want.annotation, value.Annotation)
}

type iterOutput struct {
Expand Down
22 changes: 18 additions & 4 deletions src/query/api/v1/handler/influxdb/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ const (
InfluxWriteHTTPMethod = http.MethodPost
)

var defaultValue = ingest.IterValue{
Tags: models.EmptyTags(),
Attributes: ts.DefaultSeriesAttributes(),
Metadata: ts.Metadata{},
}

type ingestWriteHandler struct {
handlerOpts options.HandlerOptions
tagOpts models.TagOptions
Expand Down Expand Up @@ -219,18 +225,26 @@ func determineTimeUnit(t time.Time) xtime.Unit {
return xtime.Nanosecond
}

func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) {
func (ii *ingestIterator) Current() ingest.IterValue {
if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) {
point := ii.points[ii.pointIndex]
field := ii.fields[ii.nextFieldIndex-1]
tags := copyTagsWithNewName(ii.tags, field.name)

t := point.Time()

return tags, []ts.Datapoint{ts.Datapoint{Timestamp: t,
Value: field.value}}, ts.DefaultSeriesAttributes(), determineTimeUnit(t), nil
value := ingest.IterValue{
Tags: tags,
Datapoints: []ts.Datapoint{ts.Datapoint{Timestamp: t, Value: field.value}},
Attributes: ts.DefaultSeriesAttributes(),
Unit: determineTimeUnit(t),
}
if ii.pointIndex < len(ii.metadatas) {
value.Metadata = ii.metadatas[ii.pointIndex]
}
return value
}
return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil
return defaultValue
}

func (ii *ingestIterator) Reset() error {
Expand Down
14 changes: 7 additions & 7 deletions src/query/api/v1/handler/influxdb/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
// they are easiest for human to handle
func (self *ingestIterator) pop(t *testing.T) string {
if self.Next() {
tags, dp, _, _, _ := self.Current()
assert.Equal(t, 1, len(dp))
value := self.Current()
assert.Equal(t, 1, len(value.Datapoints))

return fmt.Sprintf("%s %v %s", tags.String(), dp[0].Value, dp[0].Timestamp)
return fmt.Sprintf("%s %v %s", value.Tags.String(), value.Datapoints[0].Value, value.Datapoints[0].Timestamp)
}
return ""
}
Expand Down Expand Up @@ -108,14 +108,14 @@ func TestIngestIteratorIssue2125(t *testing.T) {
require.NoError(t, iter.Error())

assert.True(t, iter.Next())
t1, _, _, _, _ := iter.Current()
value1 := iter.Current()

assert.True(t, iter.Next())
t2, _, _, _, _ := iter.Current()
value2 := iter.Current()
require.NoError(t, iter.Error())

assert.Equal(t, t1.String(), "__name__: measure_k1, lab: foo")
assert.Equal(t, t2.String(), "__name__: measure_k2, lab: foo")
assert.Equal(t, value1.Tags.String(), "__name__: measure_k1, lab: foo")
assert.Equal(t, value2.Tags.String(), "__name__: measure_k2, lab: foo")
}

func TestDetermineTimeUnit(t *testing.T) {
Expand Down
31 changes: 21 additions & 10 deletions src/query/api/v1/handler/prometheus/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ var (
Forever: &defaultForwardingRetryForever,
Jitter: &defaultForwardingRetryJitter,
}

defaultValue = ingest.IterValue{
Tags: models.EmptyTags(),
Attributes: ts.DefaultSeriesAttributes(),
Metadata: ts.Metadata{},
}
)

// PromWriteHandler represents a handler for prometheus write endpoint.
Expand Down Expand Up @@ -585,12 +591,21 @@ func (i *promTSIter) Next() bool {
return i.idx < len(i.tags)
}

func (i *promTSIter) Current() (models.Tags, ts.Datapoints, ts.SeriesAttributes, xtime.Unit, []byte) {
func (i *promTSIter) Current() ingest.IterValue {
if len(i.tags) == 0 || i.idx < 0 || i.idx >= len(i.tags) {
return models.EmptyTags(), nil, ts.DefaultSeriesAttributes(), 0, nil
return defaultValue
}

return i.tags[i.idx], i.datapoints[i.idx], i.attributes[i.idx], xtime.Millisecond, nil
value := ingest.IterValue{
Tags: i.tags[i.idx],
Datapoints: i.datapoints[i.idx],
Attributes: ts.DefaultSeriesAttributes(),
Unit: xtime.Millisecond,
}
if i.idx < len(i.metadatas) {
value.Metadata = i.metadatas[i.idx]
}
return value
}

func (i *promTSIter) Reset() error {
Expand All @@ -606,12 +621,8 @@ func (i *promTSIter) SetCurrentMetadata(metadata ts.Metadata) {
if len(i.metadatas) == 0 {
i.metadatas = make([]ts.Metadata, len(i.tags))
}
i.metadatas[i.idx] = metadata
}

func (i *promTSIter) CurrentMetadata() ts.Metadata {
if len(i.metadatas) == 0 {
return ts.Metadata{}
if i.idx < 0 || i.idx >= len(i.metadatas) {
return
}
return i.metadatas[i.idx]
i.metadatas[i.idx] = metadata
}

0 comments on commit ffe52d3

Please sign in to comment.