Skip to content

Commit

Permalink
feat(storage): add mean aggregate array cursor and tests (#19019)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher M. Wolff authored Jul 23, 2020
1 parent d3c86e8 commit 8b106bc
Show file tree
Hide file tree
Showing 13 changed files with 1,221 additions and 214 deletions.
144 changes: 144 additions & 0 deletions cmd/influxd/launcher/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,149 @@ from(bucket: v.bucket)
,result,table,_value,_field,_measurement,k
,,0,67,f,m0,k0
`,
},
{
name: "bare mean",
data: []string{
"m0,k=k0,kk=kk0 f=5 0",
"m0,k=k0,kk=kk0 f=6 5000000000",
"m0,k=k0,kk=kk0 f=7 10000000000",
"m0,k=k0,kk=kk0 f=9 15000000000",
},
query: `
from(bucket: v.bucket)
|> range(start: 0)
|> mean()
|> keep(columns: ["_value"])
`,
op: "readWindow(mean)",
want: `
#datatype,string,long,double
#group,false,false,false
#default,_result,,
,result,table,_value
,,0,6.75
`,

},
{
name: "window mean",
data: []string{
"m0,k=k0 f=1i 5000000000",
"m0,k=k0 f=2i 6000000000",
"m0,k=k0 f=3i 7000000000",
"m0,k=k0 f=4i 8000000000",
"m0,k=k0 f=5i 9000000000",
"m0,k=k0 f=6i 10000000000",
"m0,k=k0 f=7i 11000000000",
"m0,k=k0 f=8i 12000000000",
"m0,k=k0 f=9i 13000000000",
"m0,k=k0 f=10i 14000000000",
"m0,k=k0 f=11i 15000000000",
"m0,k=k0 f=12i 16000000000",
"m0,k=k0 f=13i 17000000000",
"m0,k=k0 f=14i 18000000000",
"m0,k=k0 f=16i 19000000000",
"m0,k=k0 f=17i 20000000000",
},
query: `
from(bucket: v.bucket)
|> range(start: 1970-01-01T00:00:05Z, stop: 1970-01-01T00:00:20Z)
|> aggregateWindow(fn: mean, every: 5s)
|> keep(columns: ["_time", "_value"])
`,
op: "readWindow(mean)",
want: `
#datatype,string,long,dateTime:RFC3339,double
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,0,1970-01-01T00:00:10Z,3
,,0,1970-01-01T00:00:15Z,8
,,0,1970-01-01T00:00:20Z,13.2
`,

},
{
name: "window mean offset",
data: []string{
"m0,k=k0 f=1i 5000000000",
"m0,k=k0 f=2i 6000000000",
"m0,k=k0 f=3i 7000000000",
"m0,k=k0 f=4i 8000000000",
"m0,k=k0 f=5i 9000000000",
"m0,k=k0 f=6i 10000000000",
"m0,k=k0 f=7i 11000000000",
"m0,k=k0 f=8i 12000000000",
"m0,k=k0 f=9i 13000000000",
"m0,k=k0 f=10i 14000000000",
"m0,k=k0 f=11i 15000000000",
"m0,k=k0 f=12i 16000000000",
"m0,k=k0 f=13i 17000000000",
"m0,k=k0 f=14i 18000000000",
"m0,k=k0 f=16i 19000000000",
"m0,k=k0 f=17i 20000000000",
},
query: `
from(bucket: v.bucket)
|> range(start: 1970-01-01T00:00:05Z, stop: 1970-01-01T00:00:20Z)
|> window(every: 5s, offset: 1s)
|> mean()
`,
op: "readWindow(mean)",
want: `
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,string,string,double
#group,false,false,true,true,true,true,true,false
#default,_result,,,,,,,
,result,table,_start,_stop,_field,_measurement,k,_value
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:06Z,f,m0,k0,1
,,1,1970-01-01T00:00:06Z,1970-01-01T00:00:11Z,f,m0,k0,4
,,2,1970-01-01T00:00:11Z,1970-01-01T00:00:16Z,f,m0,k0,9
,,3,1970-01-01T00:00:16Z,1970-01-01T00:00:20Z,f,m0,k0,13.75
`,

},
{
name: "window mean offset with duplicate and unwindow",
data: []string{
"m0,k=k0 f=1i 5000000000",
"m0,k=k0 f=2i 6000000000",
"m0,k=k0 f=3i 7000000000",
"m0,k=k0 f=4i 8000000000",
"m0,k=k0 f=5i 9000000000",
"m0,k=k0 f=6i 10000000000",
"m0,k=k0 f=7i 11000000000",
"m0,k=k0 f=8i 12000000000",
"m0,k=k0 f=9i 13000000000",
"m0,k=k0 f=10i 14000000000",
"m0,k=k0 f=11i 15000000000",
"m0,k=k0 f=12i 16000000000",
"m0,k=k0 f=13i 17000000000",
"m0,k=k0 f=14i 18000000000",
"m0,k=k0 f=16i 19000000000",
"m0,k=k0 f=17i 20000000000",
},
query: `
from(bucket: v.bucket)
|> range(start: 1970-01-01T00:00:05Z, stop: 1970-01-01T00:00:20Z)
|> window(every: 5s, offset: 1s)
|> mean()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
|> keep(columns: ["_time", "_value"])
`,
op: "readWindow(mean)",
want: `
#datatype,string,long,dateTime:RFC3339,double
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,0,1970-01-01T00:00:06Z,1
,,0,1970-01-01T00:00:11Z,4
,,0,1970-01-01T00:00:16Z,9
,,0,1970-01-01T00:00:20Z,13.75
`,

},
{
name: "group first",
Expand Down Expand Up @@ -1826,6 +1969,7 @@ from(bucket: v.bucket)
l := launcher.RunTestLauncherOrFail(t, ctx, mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownWindowAggregateCount(): true,
feature.PushDownWindowAggregateSum(): true,
feature.PushDownWindowAggregateMean(): true,
}))

l.SetupOrFail(t)
Expand Down
6 changes: 6 additions & 0 deletions query/stdlib/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ var FluxEndToEndFeatureFlags = PerTestFeatureFlagMap{
"bare_sum_push": {
"pushDownWindowAggregateSum": "true",
},
"bare_mean_push": {
"pushDownWindowAggregateMean": "true",
},
"window_mean_push": {
"pushDownWindowAggregateMean": "true",
},
"merge_filters": {
"mergeFilterRule": "true",
},
Expand Down
1 change: 1 addition & 0 deletions storage/flux/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ const (
LastKind = "last"
MinKind = "min"
MaxKind = "max"
MeanKind = "mean"
)

// isSelector returns true if given a procedure kind that represents a selector operator.
Expand Down
198 changes: 198 additions & 0 deletions storage/flux/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/execute/table/static"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/values"
Expand Down Expand Up @@ -1189,6 +1191,202 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing
}
}

// TestStorageReader_ReadWindowAggregate_Mean verifies the storage reader's
// mean window-aggregate pushdown (storageflux.MeanKind) in three modes:
// a single unwindowed (infinite) window, fixed 10s windows, and 10s windows
// shifted by a 2s offset.
func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
// Generate one series (m0, t0=a0, field f0) with integer values cycling
// 1,2,3,4 at 5-second intervals.
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Tags: []*gen.TagValuesSpec{
{
TagKey: "t0",
Values: func() gen.CountableSequence {
// A single tag value: "a0".
return gen.NewCounterByteSequence("a%s", 0, 1)
},
},
},
}
spec := gen.Spec{
OrgID: org,
BucketID: bucket,
Measurements: []gen.MeasurementSpec{
{
Name: "m0",
TagsSpec: tagsSpec,
FieldValuesSpec: &gen.FieldValuesSpec{
Name: "f0",
TimeSequenceSpec: gen.TimeSequenceSpec{
// Count is effectively unbounded; the time range below
// limits how many points are actually generated.
Count: math.MaxInt32,
Delta: 5 * time.Second,
},
DataType: models.Integer,
Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
return gen.NewTimeIntegerValuesSequence(
spec.Count,
gen.NewTimestampSequenceFromSpec(spec),
gen.NewIntegerArrayValuesSequence([]int64{1, 2, 3, 4}),
)
},
},
},
},
}
// One minute of data at 5s spacing => 12 points cycling 1,2,3,4.
tr := gen.TimeRange{
Start: mustParseTime("2019-11-25T00:00:00Z"),
End: mustParseTime("2019-11-25T00:01:00Z"),
}
return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
})
defer reader.Close()

// WindowEvery == math.MaxInt64 means a single window spanning the full
// bounds: the mean of three full 1,2,3,4 cycles is 2.5.
t.Run("unwindowed mean", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
WindowEvery: math.MaxInt64,
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
if err != nil {
t.Fatal(err)
}

want := static.Table{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.StringKey("t0", "a0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:01:00Z"),
static.Floats("_value", 2.5),
}
if diff := table.Diff(want, ti); diff != "" {
t.Fatalf("table iterators do not match; -want/+got:\n%s", diff)
}
})

// 10s windows hold two points each, alternating the pairs (1,2) and (3,4),
// so the window means alternate 1.5 and 3.5.
t.Run("windowed mean", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
WindowEvery: int64(10 * time.Second),
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
if err != nil {
t.Fatal(err)
}

want := static.TableGroup{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.StringKey("t0", "a0"),
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
static.Floats("_value", 1.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:10Z"),
static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
static.Floats("_value", 3.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:20Z"),
static.TimeKey("_stop", "2019-11-25T00:00:30Z"),
static.Floats("_value", 1.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:30Z"),
static.TimeKey("_stop", "2019-11-25T00:00:40Z"),
static.Floats("_value", 3.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:40Z"),
static.TimeKey("_stop", "2019-11-25T00:00:50Z"),
static.Floats("_value", 1.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:50Z"),
static.TimeKey("_stop", "2019-11-25T00:01:00Z"),
static.Floats("_value", 3.5),
},
}
if diff := table.Diff(want, ti); diff != "" {
t.Fatalf("table iterators do not match; -want/+got:\n%s", diff)
}
})

// With a 2s offset the windows are truncated at the bounds: the first
// window [00:00, 00:02) holds only the t=0 point (value 1), each interior
// window holds two points with mean 2.5, and the last window [00:52, 01:00)
// holds only the t=55s point (value 4).
t.Run("windowed mean with offset", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
WindowEvery: int64(10 * time.Second),
Offset: int64(2 * time.Second),
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
if err != nil {
t.Fatal(err)
}

want := static.TableGroup{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.StringKey("t0", "a0"),
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:00:02Z"),
static.Floats("_value", 1.0),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:02Z"),
static.TimeKey("_stop", "2019-11-25T00:00:12Z"),
static.Floats("_value", 2.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:12Z"),
static.TimeKey("_stop", "2019-11-25T00:00:22Z"),
static.Floats("_value", 2.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:22Z"),
static.TimeKey("_stop", "2019-11-25T00:00:32Z"),
static.Floats("_value", 2.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:32Z"),
static.TimeKey("_stop", "2019-11-25T00:00:42Z"),
static.Floats("_value", 2.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:42Z"),
static.TimeKey("_stop", "2019-11-25T00:00:52Z"),
static.Floats("_value", 2.5),
},
static.Table{
static.TimeKey("_start", "2019-11-25T00:00:52Z"),
static.TimeKey("_stop", "2019-11-25T00:01:00Z"),
static.Floats("_value", 4),
},
}
if diff := table.Diff(want, ti); diff != "" {
t.Fatalf("table iterators do not match; -want/+got:\n%s", diff)
}
})
}

func TestStorageReader_ReadWindowFirst(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Expand Down
Loading

0 comments on commit 8b106bc

Please sign in to comment.