From 0d3d76a18280cab4e88dd7f829eeedd5a647579a Mon Sep 17 00:00:00 2001 From: Matt Tracy Date: Thu, 5 Apr 2018 13:47:18 -0700 Subject: [PATCH] ts: Initial TimeSeries TestModel. Creates a new "testmodel" package under "ts" which contains an in-memory test model designed to be much more conceptually simple than the optimized query engine actually used by time series. The new test model stores time series in sorted arrays as full-fidelity nanosecond data points. It then uses a highly decomposed set of functions to process that data; these functions have been designed to be individually tested. The "Query" method shows an example of how simple this system is when compared to the actual time series system, which uses iterators and breaks up queries into multiple chunks. This model has not yet been hooked up to time series tests; it will soon be used to replace the existing test model, which is *not* easier to understand than the actual time series database. --- pkg/ts/testmodel/data.go | 199 ++++++++++++++++ pkg/ts/testmodel/data_test.go | 430 ++++++++++++++++++++++++++++++++++ pkg/ts/testmodel/db.go | 147 ++++++++++++ pkg/ts/testmodel/db_test.go | 200 ++++++++++++++++ pkg/ts/testmodel/functions.go | 99 ++++++++ 5 files changed, 1075 insertions(+) create mode 100644 pkg/ts/testmodel/data.go create mode 100644 pkg/ts/testmodel/data_test.go create mode 100644 pkg/ts/testmodel/db.go create mode 100644 pkg/ts/testmodel/db_test.go create mode 100644 pkg/ts/testmodel/functions.go diff --git a/pkg/ts/testmodel/data.go b/pkg/ts/testmodel/data.go new file mode 100644 index 000000000000..2992d0eb7432 --- /dev/null +++ b/pkg/ts/testmodel/data.go @@ -0,0 +1,199 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "math" + "sort" +) + +// DataPoint represents a single point in a time series. It is a timestamp/value +// pair. +type DataPoint struct { + timestamp int64 + value float64 +} + +// dp is a shorthand function for constructing a DataPoint, used for convenience +// in tests. +func dp(timestamp int64, value float64) DataPoint { + return DataPoint{ + timestamp: timestamp, + value: value, + } +} + +// DataSeries represents a series of data points ordered by timestamp. +type DataSeries []DataPoint + +func (data DataSeries) Len() int { return len(data) } +func (data DataSeries) Swap(i, j int) { data[i], data[j] = data[j], data[i] } +func (data DataSeries) Less(i, j int) bool { return data[i].timestamp < data[j].timestamp } + +func normalizeTime(time, resolution int64) int64 { + return time - time%resolution +} + +// timeSlice returns the set of the dataPoints from the supplied series with +// timestamps that fall in the interval [start, end) (not inclusive of end +// timestamp). +func (data DataSeries) timeSlice(start, end int64) DataSeries { + startIdx := sort.Search(len(data), func(i int) bool { + return data[i].timestamp >= start + }) + endIdx := sort.Search(len(data), func(i int) bool { + return end <= data[i].timestamp + }) + + result := data[startIdx:endIdx] + if len(result) == 0 { + return nil + } + return result +} + +// groupByResolution aggregates data points in the given series into time +// buckets based on the provided resolution. +func (data DataSeries) groupByResolution(resolution int64, aggFunc aggFunc) DataSeries { + if len(data) == 0 { + return nil + } + + start := normalizeTime(data[0].timestamp, resolution) + end := normalizeTime(data[len(data)-1].timestamp, resolution) + result := make(DataSeries, 0, (end-start)/resolution+1) + + for len(data) > 0 { + bucketTime := normalizeTime(data[0].timestamp, resolution) + // Grab the index of the first data point which does not belong to the same + // bucket as the start data point. + bucketEndIdx := sort.Search(len(data), func(idx int) bool { + return normalizeTime(data[idx].timestamp, resolution) > bucketTime + }) + // Compute the next point as an aggregate of all underlying points which + // go in the same bucket. + result = append(result, dp(bucketTime, aggFunc(data[:bucketEndIdx]))) + data = data[bucketEndIdx:] + } + + return result +} + +// fillForResolution is used to fill in gaps in the provided data based on the +// provided resolution and fill function; any gaps longer than the resolution +// size will be eligible for fill. This is intended to be called on data sets +// that have been generated using groupByResolution, and may have unexpected +// results otherwise. +func (data DataSeries) fillForResolution(resolution int64, fillFunc fillFunc) DataSeries { + if len(data) < 2 { + return data + } + + result := make(DataSeries, 0, len(data)) + result = append(result, data[0]) + for i := 1; i < len(data); i++ { + if data[i].timestamp-data[i-1].timestamp > resolution { + result = append(result, fillFunc(data[:i], data[i:], resolution)...) + } + result = append(result, data[i]) + } + + return result +} + +// rateOfChange returns the rate of change (over the supplied period) for each +// point in the supplied series, which is defined as: +// (value - valuePrev) / ((time - timePrev) / period) +// The returned series will be shorter than the original series by one, since +// the rate of change for the first datapoint cannot be computed in this +// fashion. +func (data DataSeries) rateOfChange(period int64) DataSeries { + if len(data) < 2 { + return nil + } + + result := make(DataSeries, len(data)-1) + for i := 1; i < len(data); i++ { + result[i-1] = dp( + data[i].timestamp, + (data[i].value-data[i-1].value)/float64((data[i].timestamp-data[i-1].timestamp)/period), + ) + } + return result +} + +// nonNegative replaces any values less than zero with a zero. +func (data DataSeries) nonNegative() DataSeries { + result := make(DataSeries, len(data)) + for i := 0; i < len(data); i++ { + if data[i].value >= 0 { + result[i] = data[i] + } else { + result[i] = dp(data[i].timestamp, 0) + } + } + return result +} + +// groupSeriesByTimestamp returns a single DataSeries by aggregating DataPoints +// with matching timestamps from the supplied set of series. +func groupSeriesByTimestamp(datas []DataSeries, aggFunc aggFunc) DataSeries { + if len(datas) == 0 { + return nil + } + + results := make(DataSeries, 0) + dataPointsToAggregate := make(DataSeries, 0, len(datas)) + for { + // Filter empty data series. + origDatas := datas + datas = datas[:0] + for _, data := range origDatas { + if len(data) > 0 { + datas = append(datas, data) + } + } + if len(datas) == 0 { + break + } + + // Create a slice of datapoints which share the earliest timestamp of any + // datapoint across all collections. If the data series are all perfectly + // aligned (same length and timestamps), then this will just be he first + // data point in each series. + earliestTime := int64(math.MaxInt64) + for _, data := range datas { + if data[0].timestamp < earliestTime { + // New earliest timestamp found, discard any points which were + // previously in the collection. + dataPointsToAggregate = dataPointsToAggregate[:0] + earliestTime = data[0].timestamp + } + if data[0].timestamp == earliestTime { + // Data point matches earliest timestamp, add it to current datapoint + // collection. + dataPointsToAggregate = append(dataPointsToAggregate, data[0]) + } + } + results = append(results, dp(earliestTime, aggFunc(dataPointsToAggregate))) + for i := range datas { + if datas[i][0].timestamp == earliestTime { + datas[i] = datas[i][1:] + } + } + } + + return results +} diff --git a/pkg/ts/testmodel/data_test.go b/pkg/ts/testmodel/data_test.go new file mode 100644 index 000000000000..d02c56a3915d --- /dev/null +++ b/pkg/ts/testmodel/data_test.go @@ -0,0 +1,430 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "reflect" + "testing" +) + +func TestDataSeriesTimeSlice(t *testing.T) { + testData := DataSeries{ + dp(1, 5), + dp(2, 5), + dp(8, 10), + dp(12, 10), + dp(17, 5), + dp(34, 50), + dp(35, 75), + dp(40, 10), + dp(49, 10), + dp(100, 500), + dp(109, 50), + dp(115, 99), + } + + for _, tc := range []struct { + start int64 + end int64 + expected DataSeries + }{ + { + start: 0, + end: 120, + expected: testData, + }, + { + start: 0, + end: 0, + expected: nil, + }, + { + start: 1000, + end: 1001, + expected: nil, + }, + { + start: 1, + end: 2, + expected: testData[:1], + }, + { + start: 15, + end: 49, + expected: DataSeries{ + dp(17, 5), + dp(34, 50), + dp(35, 75), + dp(40, 10), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.timeSlice(tc.start, tc.end) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("time slice got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesGroupByResolution(t *testing.T) { + // Each test uses the same input data. The input data includes several scenarios + // for missing/present data, such as gaps of different sizes and irregular + // recording patterns + testData := DataSeries{ + dp(1, 5), + dp(2, 5), + dp(8, 10), + dp(12, 10), + dp(17, 5), + dp(34, 50), + dp(35, 75), + dp(40, 10), + dp(41, 10), + dp(42, 10), + dp(43, 10), + dp(44, 10), + dp(45, 10), + dp(46, 10), + dp(47, 10), + dp(48, 10), + dp(49, 10), + dp(100, 500), + dp(109, 50), + dp(115, 99), + } + + for _, tc := range []struct { + resolution int64 + aggFunc aggFunc + expected DataSeries + }{ + // Group by 10 second resolution, aggregate add. + { + resolution: 10, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(0, 20), + dp(10, 15), + dp(30, 125), + dp(40, 100), + dp(100, 550), + dp(110, 99), + }, + }, + // Group by 10 second resolution, aggregate last. + { + resolution: 10, + aggFunc: aggFuncLast, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 50), + dp(110, 99), + }, + }, + // Group by 20 second resolution, aggregate last. + { + resolution: 20, + aggFunc: aggFuncLast, + expected: DataSeries{ + dp(0, 5), + dp(20, 75), + dp(40, 10), + dp(100, 99), + }, + }, + // Group by 100 second resolution, aggregate add. + { + resolution: 100, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(0, 260), + dp(100, 649), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.groupByResolution(tc.resolution, tc.aggFunc) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("group by resolution got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesFillByResolution(t *testing.T) { + testData := DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 70), + dp(110, 99), + } + for _, tc := range []struct { + resolution int64 + fillFunc fillFunc + expected DataSeries + }{ + // fill function that does nothing + { + resolution: 10, + fillFunc: func(_ DataSeries, _ DataSeries, _ int64) DataSeries { + return nil + }, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 70), + dp(110, 99), + }, + }, + // fill function that returns a constant value + { + resolution: 10, + fillFunc: func(before DataSeries, _ DataSeries, res int64) DataSeries { + return DataSeries{ + dp(before[len(before)-1].timestamp+res, 777), + } + }, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(20, 777), + dp(30, 75), + dp(40, 10), + dp(50, 777), + dp(100, 70), + dp(110, 99), + }, + }, + // interpolation fill function + { + resolution: 10, + fillFunc: fillFuncLinearInterpolate, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(20, 40), + dp(30, 75), + dp(40, 10), + dp(50, 20), + dp(60, 30), + dp(70, 40), + dp(80, 50), + dp(90, 60), + dp(100, 70), + dp(110, 99), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.fillForResolution(tc.resolution, tc.fillFunc) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("fill by resolution got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesRateOfChange(t *testing.T) { + testData := DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 70), + dp(110, 99), + } + for _, tc := range []struct { + period int64 + expected DataSeries + }{ + { + period: 10, + expected: DataSeries{ + dp(10, -5), + dp(30, 35), + dp(40, -65), + dp(100, 10), + dp(110, 29), + }, + }, + { + period: 1, + expected: DataSeries{ + dp(10, -0.5), + dp(30, 3.5), + dp(40, -6.5), + dp(100, 1), + dp(110, 2.9), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.rateOfChange(tc.period) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("rate of change got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesNonNegative(t *testing.T) { + for _, tc := range []struct { + input DataSeries + expected DataSeries + }{ + { + input: DataSeries{ + dp(10, -5), + dp(30, 35), + dp(40, -65), + dp(100, 10), + dp(110, 29), + }, + expected: DataSeries{ + dp(10, 0), + dp(30, 35), + dp(40, 0), + dp(100, 10), + dp(110, 29), + }, + }, + { + input: DataSeries{ + dp(10, -0.5), + }, + expected: DataSeries{ + dp(10, 0), + }, + }, + { + input: DataSeries{ + dp(10, 0.001), + }, + expected: DataSeries{ + dp(10, 0.001), + }, + }, + { + input: DataSeries{}, + expected: DataSeries{}, + }, + } { + t.Run("", func(t *testing.T) { + results := tc.input.nonNegative() + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("rate of change got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesGroupByTimestamp(t *testing.T) { + testData1 := DataSeries{ + dp(10, 1), + dp(20, 1), + dp(30, 1), + dp(140, 1), + dp(777, 1), + } + testData2 := DataSeries{ + dp(10, 2), + dp(20, 2), + dp(30, 2), + dp(140, 2), + dp(777, 2), + } + testDataStaggered := DataSeries{ + dp(10, 5), + dp(25, 5), + dp(30, 5), + dp(141, 5), + dp(888, 5), + } + + for _, tc := range []struct { + inputs []DataSeries + aggFunc aggFunc + expected DataSeries + }{ + { + inputs: nil, + aggFunc: aggFuncSum, + expected: nil, + }, + { + inputs: []DataSeries{testData1}, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(10, 1), + dp(20, 1), + dp(30, 1), + dp(140, 1), + dp(777, 1), + }, + }, + { + inputs: []DataSeries{testData1, testData2}, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(10, 3), + dp(20, 3), + dp(30, 3), + dp(140, 3), + dp(777, 3), + }, + }, + { + inputs: []DataSeries{testData1, testData2}, + aggFunc: aggFuncAvg, + expected: DataSeries{ + dp(10, 1.5), + dp(20, 1.5), + dp(30, 1.5), + dp(140, 1.5), + dp(777, 1.5), + }, + }, + { + inputs: []DataSeries{testData1, testData2, testDataStaggered}, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(10, 8), + dp(20, 3), + dp(25, 5), + dp(30, 8), + dp(140, 3), + dp(141, 5), + dp(777, 3), + dp(888, 5), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := groupSeriesByTimestamp(tc.inputs, tc.aggFunc) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("rate of change got %v, wanted %v", a, e) + } + }) + } +} diff --git a/pkg/ts/testmodel/db.go b/pkg/ts/testmodel/db.go new file mode 100644 index 000000000000..529c4c1c9530 --- /dev/null +++ b/pkg/ts/testmodel/db.go @@ -0,0 +1,147 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "fmt" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" +) + +// ModelDB is a purely in-memory model of CockroachDB's time series database, +// where time series can be stored and queried. +type ModelDB struct { + data map[string]DataSeries + metricNameToDataSources map[string]map[string]struct{} +} + +// NewModelDB instantiates a new ModelDB instance. +func NewModelDB() *ModelDB { + return &ModelDB{ + data: make(map[string]DataSeries), + metricNameToDataSources: make(map[string]map[string]struct{}), + } +} + +// Record stores the given data series for the supplied metric and data source, +// merging it with any previously recorded data for the same series. +func (mdb *ModelDB) Record(metricName, dataSource string, data DataSeries) { + dataSources, ok := mdb.metricNameToDataSources[metricName] + if !ok { + dataSources = make(map[string]struct{}) + mdb.metricNameToDataSources[metricName] = dataSources + } + dataSources[dataSource] = struct{}{} + + seriesName := seriesName(metricName, dataSource) + mdb.data[seriesName] = append(mdb.data[seriesName], data...) + sort.Sort(mdb.data[seriesName]) +} + +// Query retrieves aggregated data from the model database in the same way that +// data is currently queried from CockroachDB's time series database. Each query +// has a named metric, an optional set of sources, and a number of specified +// aggregation options: +// +// + A downsampler function, which is used to group series by resolution +// + An aggregation function, which is used to group multiples series by +// timestamp +// + A derivative option, which transforms the returned series into a rate of +// change. +// +// Each query has a sample duration (determines the length of the group-by-time +// interval), a start and end time, and an 'interpolation limit' which is a +// maximum gap size above which missing data is not filled. When fills are +// performed, linear interpolation is always used. +func (mdb *ModelDB) Query( + name string, + sources []string, + downsample, agg *tspb.TimeSeriesQueryAggregator, + derivative *tspb.TimeSeriesQueryDerivative, + sampleDuration, start, end, interpolationLimit int64, +) DataSeries { + // If explicit sources were not specified, use every source currently + // available for this particular metric. + if len(sources) == 0 { + sourceMap, ok := mdb.metricNameToDataSources[name] + if !ok { + return nil + } + sources = make([]string, 0, len(sourceMap)) + for k := range sourceMap { + sources = append(sources, k) + } + } + + queryData := make([]DataSeries, 0, len(sources)) + for _, source := range sources { + queryData = append(queryData, mdb.getSeriesData(name, source)) + } + + // Process data according to query parameters. + adjustedStart := normalizeTime(start, sampleDuration) - interpolationLimit + adjustedEnd := normalizeTime(end, sampleDuration) + interpolationLimit + for i := range queryData { + data := queryData[i] + + // Slice to relevant period. + data = data.timeSlice(adjustedStart, adjustedEnd) + + // Group by resolution according to the provided sampleDuration. + data = data.groupByResolution(sampleDuration, getAggFunction(*downsample)) + + // Fill in missing data points using linear interpolation. + data = data.fillForResolution( + sampleDuration, + func(before DataSeries, after DataSeries, res int64) DataSeries { + // Do not fill if this gap exceeds the interpolation limit. + start := before[len(before)-1] + end := after[0] + if end.timestamp-start.timestamp > interpolationLimit { + return nil + } + + return fillFuncLinearInterpolate(before, after, res) + }, + ) + + // Convert series to its rate-of-change if specified. + if *derivative != tspb.TimeSeriesQueryDerivative_NONE { + data = data.rateOfChange(time.Second.Nanoseconds()) + if *derivative == tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE { + data = data.nonNegative() + } + } + + queryData[i] = data + } + + return groupSeriesByTimestamp(queryData, getAggFunction(*agg)).timeSlice(start, end) +} + +func (mdb *ModelDB) getSeriesData(metricName, dataSource string) []DataPoint { + seriesName := seriesName(metricName, dataSource) + data, ok := mdb.data[seriesName] + if !ok { + return nil + } + return data +} + +func seriesName(metricName, dataSource string) string { + return fmt.Sprintf("%s$$%s", metricName, dataSource) +} diff --git a/pkg/ts/testmodel/db_test.go b/pkg/ts/testmodel/db_test.go new file mode 100644 index 000000000000..79d6822547eb --- /dev/null +++ b/pkg/ts/testmodel/db_test.go @@ -0,0 +1,200 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" +) + +func TestModelDBQuery(t *testing.T) { + db := NewModelDB() + db.Record("testmetric", "source1", DataSeries{ + dp(0, 0.0), + dp(100, 100.0), + dp(200, 200.0), + dp(400, 400.0), + }) + db.Record("testmetric", "source2", DataSeries{ + dp(0, 0.0), + dp(103, 50.0), + dp(199, 150.0), + dp(205, 400.0), + dp(301, 600.0), + dp(425, 800.0), + }) + db.Record("othermetric", "source1", DataSeries{ + dp(150, 10000), + dp(250, 10000), + dp(600, 5000), + }) + + for _, tc := range []struct { + seriesName string + sources []string + downsampler tspb.TimeSeriesQueryAggregator + aggregator tspb.TimeSeriesQueryAggregator + derivative tspb.TimeSeriesQueryDerivative + sampleDuration int64 + start int64 + end int64 + interpolationLimit int64 + expected DataSeries + }{ + // Basic Query + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 200.0), + dp(200, 600.0), + dp(300, 900.0), + dp(400, 1200.0), + }, + }, + // Different downsampler + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_MAX, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 250.0), + dp(200, 600.0), + dp(300, 900.0), + dp(400, 1200.0), + }, + }, + // Different aggregator + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_MAX, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 100.0), + dp(200, 400.0), + dp(300, 600.0), + dp(400, 800.0), + }, + }, + // Single-source Query + { + seriesName: "testmetric", + sources: []string{"source2"}, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 100.0), + dp(200, 400.0), + dp(300, 600.0), + dp(400, 800.0), + }, + }, + // Limited time. + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 200, + end: 400, + interpolationLimit: 10000, + expected: DataSeries{ + dp(200, 600.0), + dp(300, 900.0), + }, + }, + // No interpolation. + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 0, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 200.0), + dp(200, 600.0), + dp(300, 600.0), + dp(400, 1200.0), + }, + }, + // No data. + { + seriesName: "wrongmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: nil, + }, + } { + t.Run("", func(t *testing.T) { + result := db.Query( + tc.seriesName, + tc.sources, + tc.downsampler.Enum(), + tc.aggregator.Enum(), + tc.derivative.Enum(), + tc.sampleDuration, + tc.start, + tc.end, + tc.interpolationLimit, + ) + if a, e := result, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("query got result %v, wanted %v", a, e) + } + }) + } +} diff --git a/pkg/ts/testmodel/functions.go b/pkg/ts/testmodel/functions.go new file mode 100644 index 000000000000..69a5e7541427 --- /dev/null +++ b/pkg/ts/testmodel/functions.go @@ -0,0 +1,99 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "fmt" + "math" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" +) + +type aggFunc func(DataSeries) float64 +type fillFunc func(DataSeries, DataSeries, int64) DataSeries + +func aggFuncSum(data DataSeries) float64 { + total := 0.0 + for _, dp := range data { + total += dp.value + } + return total +} + +func aggFuncLast(data DataSeries) float64 { + return data[len(data)-1].value +} + +func aggFuncAvg(data DataSeries) float64 { + if len(data) == 0 { + return 0.0 + } + return aggFuncSum(data) / float64(len(data)) +} + +func aggFuncMax(data DataSeries) float64 { + max := -math.MaxFloat64 + for _, dp := range data { + if dp.value > max { + max = dp.value + } + } + return max +} + +func aggFuncMin(data DataSeries) float64 { + min := math.MaxFloat64 + for _, dp := range data { + if dp.value < min { + min = dp.value + } + } + return min +} + +// getAggFunction is a convenience method used to process an aggregator option +// from our time series query protobuffer format. +func getAggFunction(agg tspb.TimeSeriesQueryAggregator) aggFunc { + switch agg { + case tspb.TimeSeriesQueryAggregator_AVG: + return aggFuncAvg + case tspb.TimeSeriesQueryAggregator_SUM: + return aggFuncSum + case tspb.TimeSeriesQueryAggregator_MAX: + return aggFuncMax + case tspb.TimeSeriesQueryAggregator_MIN: + return aggFuncMin + } + + // The model should not be called with an invalid aggregator option. + panic(fmt.Sprintf("unknown aggregator option specified: %v", agg)) +} + +func fillFuncLinearInterpolate(before DataSeries, after DataSeries, resolution int64) DataSeries { + start := before[len(before)-1] + end := after[0] + + // compute interpolation step + step := (end.value - start.value) / float64(end.timestamp-start.timestamp) + + result := make(DataSeries, (end.timestamp-start.timestamp)/resolution-1) + for i := range result { + result[i] = dp( + start.timestamp+(resolution*int64(i+1)), + start.value+(step*float64(i+1)*float64(resolution)), + ) + } + return result +}