From 61827c46de1933fb85ae9d186304189b75acb315 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Mon, 10 Jun 2019 14:33:36 +0000 Subject: [PATCH] move conversion and chunk encoding to importer pkg also moves all the related tests --- cmd/mt-whisper-importer-reader/main.go | 75 +------ mdata/importer/chunk_encoder.go | 49 ++++ mdata/importer/chunk_encoder_test.go | 79 +++++++ .../importer}/conversion.go | 93 ++++---- .../importer}/conversion_test.go | 209 ++++++------------ 5 files changed, 260 insertions(+), 245 deletions(-) create mode 100644 mdata/importer/chunk_encoder.go create mode 100644 mdata/importer/chunk_encoder_test.go rename {cmd/mt-whisper-importer-reader => mdata/importer}/conversion.go (82%) rename {cmd/mt-whisper-importer-reader => mdata/importer}/conversion_test.go (85%) diff --git a/cmd/mt-whisper-importer-reader/main.go b/cmd/mt-whisper-importer-reader/main.go index 217cf70512..d8cb5d08f2 100644 --- a/cmd/mt-whisper-importer-reader/main.go +++ b/cmd/mt-whisper-importer-reader/main.go @@ -12,7 +12,6 @@ import ( "os" "path/filepath" "regexp" - "sort" "strings" "sync" "sync/atomic" @@ -20,7 +19,6 @@ import ( "github.com/grafana/metrictank/conf" "github.com/grafana/metrictank/logger" - "github.com/grafana/metrictank/mdata/chunk" "github.com/grafana/metrictank/mdata/importer" "github.com/kisielk/whisper-go/whisper" "github.com/raintank/schema" @@ -73,15 +71,15 @@ var ( "", "A regex pattern to be applied to all metric names, only matching ones will be imported", ) - importUpTo = flag.Uint( - "import-up-to", + importUntil = flag.Uint( + "import-until", math.MaxUint32, - "Only import up to the specified timestamp", + "Only import up to, but not including, the specified timestamp", ) - importAfter = flag.Uint( - "import-after", + importFrom = flag.Uint( + "import-from", 0, - "Only import after the specified timestamp", + "Only import starting from the specified timestamp", ) positionFile = flag.String( "position-file", @@ -241,20 +239,6 @@ func getMetricName(file string) string { return *namePrefix + strings.Replace(strings.TrimSuffix(file, ".wsp"), "/", ".", -1) } -// pointSorter sorts points by timestamp -type pointSorter []whisper.Point - -func (a pointSorter) Len() int { return len(a) } -func (a pointSorter) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a pointSorter) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp } - -// the whisper archives are organized like a ringbuffer. 
since we need to -// insert the points into the chunks in order we first need to sort them -func sortPoints(points pointSorter) pointSorter { - sort.Sort(points) - return points -} - func convertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Method, error) { switch whisperMethod { case whisper.AggregationAverage: @@ -305,9 +289,9 @@ func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest, res.MetricData.SetId() _, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint)) - conversion := newConversion(w.Header.Archives, points, method) + conversion := importer.NewConversion(w.Header.Archives, points, method, uint32(*importFrom), uint32(*importUntil)) for retIdx, retention := range selectedSchema.Retentions { - convertedPoints := conversion.getPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints)) + convertedPoints := conversion.GetPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints)) for m, p := range convertedPoints { if len(p) == 0 { continue @@ -318,7 +302,7 @@ func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest, archive = schema.NewArchive(m, retention.ChunkSpan) } - encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan) + encodedChunks := importer.EncodeChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan, *writeUnfinishedChunks) for _, chunk := range encodedChunks { res.ChunkWriteRequests = append(res.ChunkWriteRequests, importer.NewChunkWriteRequest( archive, @@ -338,47 +322,6 @@ func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest, return res, nil } -func encodedChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32) []*chunk.Chunk { - var point whisper.Point - var t0, prevT0 uint32 - var c *chunk.Chunk - var encodedChunks []*chunk.Chunk - - for _, point = range points { - // this shouldn't happen, but if it would we better catch it here because Metrictank wouldn't handle it well: - // https://github.com/grafana/metrictank/blob/f1868cccfb92fc82cd853914af958f6d187c5f74/mdata/aggmetric.go#L378 - if point.Timestamp == 0 { - continue - } - - t0 = point.Timestamp - (point.Timestamp % chunkSpan) - if prevT0 == 0 { - c = chunk.New(t0) - prevT0 = t0 - } else if prevT0 != t0 { - c.Finish() - encodedChunks = append(encodedChunks, c) - - c = chunk.New(t0) - prevT0 = t0 - } - - err := c.Push(point.Timestamp, point.Value) - if err != nil { - panic(fmt.Sprintf("ERROR: Failed to push value into chunk at t0 %d: %q", t0, err)) - } - } - - // if the last written point was also the last one of the current chunk, - // or if writeUnfinishedChunks is on, we close the chunk and push it - if point.Timestamp == t0+chunkSpan-intervalIn || *writeUnfinishedChunks { - c.Finish() - encodedChunks = append(encodedChunks, c) - } - - return encodedChunks -} - // scan a directory and feed the list of whisper files relative to base into the given channel func getFileListIntoChan(pos *posTracker, fileChan chan string) { filepath.Walk( diff --git a/mdata/importer/chunk_encoder.go b/mdata/importer/chunk_encoder.go new file mode 100644 index 0000000000..94f42b9928 --- /dev/null +++ b/mdata/importer/chunk_encoder.go @@ -0,0 +1,49 @@ +package importer + +import ( + "fmt" + + "github.com/grafana/metrictank/mdata/chunk" + "github.com/kisielk/whisper-go/whisper" +) + +func EncodeChunksFromPoints(points []whisper.Point, intervalIn, 
chunkSpan uint32, writeUnfinishedChunks bool) []*chunk.Chunk { + var point whisper.Point + var t0, prevT0 uint32 + var c *chunk.Chunk + var encodedChunks []*chunk.Chunk + + for _, point = range points { + // this shouldn't happen, but if it would we better catch it here because Metrictank wouldn't handle it well: + // https://github.com/grafana/metrictank/blob/f1868cccfb92fc82cd853914af958f6d187c5f74/mdata/aggmetric.go#L378 + if point.Timestamp == 0 { + continue + } + + t0 = point.Timestamp - (point.Timestamp % chunkSpan) + if prevT0 == 0 { + c = chunk.New(t0) + prevT0 = t0 + } else if prevT0 != t0 { + c.Finish() + encodedChunks = append(encodedChunks, c) + + c = chunk.New(t0) + prevT0 = t0 + } + + err := c.Push(point.Timestamp, point.Value) + if err != nil { + panic(fmt.Sprintf("ERROR: Failed to push value into chunk at t0 %d: %q", t0, err)) + } + } + + // if the last written point was also the last one of the current chunk, + // or if writeUnfinishedChunks is on, we close the chunk and push it + if point.Timestamp == t0+chunkSpan-intervalIn || writeUnfinishedChunks { + c.Finish() + encodedChunks = append(encodedChunks, c) + } + + return encodedChunks +} diff --git a/mdata/importer/chunk_encoder_test.go b/mdata/importer/chunk_encoder_test.go new file mode 100644 index 0000000000..1b18250e1c --- /dev/null +++ b/mdata/importer/chunk_encoder_test.go @@ -0,0 +1,79 @@ +package importer + +import ( + "testing" + + "github.com/grafana/metrictank/mdata/chunk" +) + +func TestEncodedChunksFromPointsWithUnfinished(t *testing.T) { + points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 }) + expectedCount := 8640 // count including unfinished chunks + + chunks := EncodeChunksFromPoints(points, 10, 21600, true) + + if len(chunks) != 5 { + t.Fatalf("Expected to get 5 chunks, but got %d", len(chunks)) + } + + i := 0 + for _, c := range chunks { + iterGen, err := chunk.NewIterGen(c.Series.T0, 10, c.Encode(21600)) + if err != nil { + t.Fatalf("Error getting iterator: %s", err) + } + + iter, err := iterGen.Get() + if err != nil { + t.Fatalf("Error getting iterator: %s", err) + } + + for iter.Next() { + ts, val := iter.Values() + if points[i].Timestamp != ts || points[i].Value != val { + t.Fatalf("Unexpected value at index %d:\nExpected: %d:%f\nGot: %d:%f\n", i, ts, val, points[i].Timestamp, points[i].Value) + } + i++ + } + } + if i != expectedCount { + t.Fatalf("Unexpected number of datapoints in chunks:\nExpected: %d\nGot: %d\n", expectedCount, i) + } +} + +func TestEncodedChunksFromPointsWithoutUnfinished(t *testing.T) { + // the actual data in these points doesn't matter, we just want to be sure + // that the chunks resulting from these points include the same data + points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 }) + expectedCount := 8640 - (2520 % 2160) // count minus what would end up in an unfinished chunk + + chunks := EncodeChunksFromPoints(points, 10, 21600, false) + + if len(chunks) != 4 { + t.Fatalf("Expected to get 4 chunks, but got %d", len(chunks)) + } + + i := 0 + for _, c := range chunks { + iterGen, err := chunk.NewIterGen(c.Series.T0, 10, c.Encode(21600)) + if err != nil { + t.Fatalf("Error getting iterator: %s", err) + } + + iter, err := iterGen.Get() + if err != nil { + t.Fatalf("Error getting iterator: %s", err) + } + + for iter.Next() { + ts, val := iter.Values() + if points[i].Timestamp != ts || points[i].Value != val { + t.Fatalf("Unexpected value at index %d:\nExpected: %d:%f\nGot: %d:%f\n", 
i, ts, val, points[i].Timestamp, points[i].Value) + } + i++ + } + } + if i != expectedCount { + t.Fatalf("Unexpected number of datapoints in chunks:\nExpected: %d\nGot: %d\n", expectedCount, i) + } +} diff --git a/cmd/mt-whisper-importer-reader/conversion.go b/mdata/importer/conversion.go similarity index 82% rename from cmd/mt-whisper-importer-reader/conversion.go rename to mdata/importer/conversion.go index 18b2fd0969..707f67a457 100644 --- a/cmd/mt-whisper-importer-reader/conversion.go +++ b/mdata/importer/conversion.go @@ -1,49 +1,28 @@ -package main +package importer import ( "github.com/grafana/metrictank/mdata" "github.com/kisielk/whisper-go/whisper" "github.com/raintank/schema" + "sort" ) type conversion struct { archives []whisper.ArchiveInfo points map[int][]whisper.Point method schema.Method + from uint32 + until uint32 } const fakeAvg schema.Method = 255 -func newConversion(arch []whisper.ArchiveInfo, points map[int][]whisper.Point, method schema.Method) *conversion { - return &conversion{archives: arch, points: points, method: method} -} - -func (c *conversion) findSmallestLargestArchive(spp, nop uint32) (int, int) { - // find smallest archive that still contains enough data to satisfy requested range - largestArchiveIdx := len(c.archives) - 1 - for i := largestArchiveIdx; i >= 0; i-- { - arch := c.archives[i] - if arch.Points*arch.SecondsPerPoint < nop*spp { - break - } - largestArchiveIdx = i - } - - // find largest archive that still has a higher or equal resolution than requested - smallestArchiveIdx := 0 - for i := 0; i < len(c.archives); i++ { - arch := c.archives[i] - if arch.SecondsPerPoint > spp { - break - } - smallestArchiveIdx = i - } - - return smallestArchiveIdx, largestArchiveIdx +func NewConversion(arch []whisper.ArchiveInfo, points map[int][]whisper.Point, method schema.Method, from, until uint32) *conversion { + return &conversion{archives: arch, points: points, method: method, from: from, until: until} } // generates points according to specified parameters by finding and using the best archives as input -func (c *conversion) getPoints(retIdx int, spp, nop uint32) map[schema.Method][]whisper.Point { +func (c *conversion) GetPoints(retIdx int, spp, nop uint32) map[schema.Method][]whisper.Point { res := make(map[schema.Method][]whisper.Point) if len(c.points) == 0 { @@ -78,7 +57,7 @@ func (c *conversion) getPoints(retIdx int, spp, nop uint32) map[schema.Method][] rawFactor := float64(spp) / float64(rawRes) if retIdx == 0 || c.method != schema.Avg { for _, p := range in { - if p.Timestamp > uint32(*importUpTo) || p.Timestamp < uint32(*importAfter) { + if p.Timestamp > c.until || p.Timestamp < c.from { continue } adjustedPoints[c.method][p.Timestamp] = p.Value @@ -88,7 +67,7 @@ func (c *conversion) getPoints(retIdx int, spp, nop uint32) map[schema.Method][] } } else { for _, p := range in { - if p.Timestamp > uint32(*importUpTo) || p.Timestamp < uint32(*importAfter) { + if p.Timestamp > c.until || p.Timestamp < c.from { continue } adjustedPoints[schema.Sum][p.Timestamp] = p.Value * rawFactor @@ -96,13 +75,13 @@ func (c *conversion) getPoints(retIdx int, spp, nop uint32) map[schema.Method][] } } } else if arch.SecondsPerPoint > spp { - for m, points := range incResolution(in, method, arch.SecondsPerPoint, spp, rawRes) { + for m, points := range c.incResolution(in, method, arch.SecondsPerPoint, spp, rawRes) { for _, p := range points { adjustedPoints[m][p.Timestamp] = p.Value } } } else { - for m, points := range decResolution(in, method, 
arch.SecondsPerPoint, spp, rawRes) { + for m, points := range c.decResolution(in, method, arch.SecondsPerPoint, spp, rawRes) { for _, p := range points { adjustedPoints[m][p.Timestamp] = p.Value } @@ -113,7 +92,7 @@ func (c *conversion) getPoints(retIdx int, spp, nop uint32) map[schema.Method][] // merge the results that are keyed by timestamp into a slice of points for m, p := range adjustedPoints { for t, v := range p { - if t <= uint32(*importUpTo) && t >= uint32(*importAfter) { + if t <= c.until && t >= c.from { res[m] = append(res[m], whisper.Point{Timestamp: t, Value: v}) } } @@ -130,11 +109,35 @@ func (c *conversion) getPoints(retIdx int, spp, nop uint32) map[schema.Method][] return res } +func (c *conversion) findSmallestLargestArchive(spp, nop uint32) (int, int) { + // find smallest archive that still contains enough data to satisfy requested range + largestArchiveIdx := len(c.archives) - 1 + for i := largestArchiveIdx; i >= 0; i-- { + arch := c.archives[i] + if arch.Points*arch.SecondsPerPoint < nop*spp { + break + } + largestArchiveIdx = i + } + + // find largest archive that still has a higher or equal resolution than requested + smallestArchiveIdx := 0 + for i := 0; i < len(c.archives); i++ { + arch := c.archives[i] + if arch.SecondsPerPoint > spp { + break + } + smallestArchiveIdx = i + } + + return smallestArchiveIdx, largestArchiveIdx +} + // increase resolution of given points according to defined specs by generating // additional datapoints to bridge the gaps between the given points. depending // on what aggregation method is specified, those datapoints may be generated in // slightly different ways. -func incResolution(points []whisper.Point, method schema.Method, inRes, outRes, rawRes uint32) map[schema.Method][]whisper.Point { +func (c *conversion) incResolution(points []whisper.Point, method schema.Method, inRes, outRes, rawRes uint32) map[schema.Method][]whisper.Point { out := make(map[schema.Method][]whisper.Point) resFactor := float64(outRes) / float64(rawRes) for _, inPoint := range points { @@ -151,7 +154,7 @@ func incResolution(points []whisper.Point, method schema.Method, inRes, outRes, // generate datapoints based on inPoint in reverse order var outPoints []whisper.Point for ts := rangeEnd; ts > inPoint.Timestamp-inRes; ts = ts - outRes { - if ts > uint32(*importUpTo) || ts < uint32(*importAfter) { + if ts > c.until || ts < c.from { continue } outPoints = append(outPoints, whisper.Point{Timestamp: ts}) @@ -183,7 +186,7 @@ func incResolution(points []whisper.Point, method schema.Method, inRes, outRes, // decreases the resolution of given points by using the aggregation method specified // in the second argument. emulates the way metrictank aggregates data when it generates // rollups of the raw data. 
-func decResolution(points []whisper.Point, method schema.Method, inRes, outRes, rawRes uint32) map[schema.Method][]whisper.Point { +func (c *conversion) decResolution(points []whisper.Point, method schema.Method, inRes, outRes, rawRes uint32) map[schema.Method][]whisper.Point { out := make(map[schema.Method][]whisper.Point) agg := mdata.NewAggregation() currentBoundary := uint32(0) @@ -241,10 +244,10 @@ func decResolution(points []whisper.Point, method schema.Method, inRes, outRes, continue } boundary := mdata.AggBoundary(inPoint.Timestamp, outRes) - if boundary > uint32(*importUpTo) { + if boundary > c.until { break } - if boundary < uint32(*importAfter) { + if boundary < c.from { continue } @@ -262,3 +265,17 @@ func decResolution(points []whisper.Point, method schema.Method, inRes, outRes, return out } + +// pointSorter sorts points by timestamp +type pointSorter []whisper.Point + +func (a pointSorter) Len() int { return len(a) } +func (a pointSorter) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a pointSorter) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp } + +// the whisper archives are organized like a ringbuffer. since we need to +// insert the points into the chunks in order we first need to sort them +func sortPoints(points pointSorter) pointSorter { + sort.Sort(points) + return points +} diff --git a/cmd/mt-whisper-importer-reader/conversion_test.go b/mdata/importer/conversion_test.go similarity index 85% rename from cmd/mt-whisper-importer-reader/conversion_test.go rename to mdata/importer/conversion_test.go index cae00d72cc..ca874107d5 100644 --- a/cmd/mt-whisper-importer-reader/conversion_test.go +++ b/mdata/importer/conversion_test.go @@ -1,18 +1,17 @@ -package main +package importer import ( "math" "testing" - "github.com/grafana/metrictank/mdata/chunk" - "github.com/kisielk/whisper-go/whisper" "github.com/raintank/schema" ) -func testIncResolution(t *testing.T, inData []whisper.Point, expectedResult map[schema.Method][]whisper.Point, method schema.Method, inRes, outRes, rawRes uint32) { +func testIncResolution(t *testing.T, inData []whisper.Point, expectedResult map[schema.Method][]whisper.Point, method schema.Method, inRes, outRes, rawRes, from, until uint32) { t.Helper() - outData := incResolution(inData, method, inRes, outRes, rawRes) + c := conversion{from: from, until: until} + outData := c.incResolution(inData, method, inRes, outRes, rawRes) if len(expectedResult) != len(outData) { t.Fatalf("Generated data is not as expected:\nExpected:\n%+v\nGot:\n%+v\n", expectedResult, outData) @@ -24,7 +23,7 @@ func testIncResolution(t *testing.T, inData []whisper.Point, expectedResult map[ if p, ok = outData[m]; !ok { t.Fatalf("testIncResolution.\nExpected:\n%+v\nGot:\n%+v\n", expectedResult, outData) } - if len(p) != len(outData[m]) { + if len(p) != len(ep) { t.Fatalf("testIncResolution.\nExpected:\n%+v\nGot:\n%+v\n", expectedResult, outData) } for i := range p { @@ -49,9 +48,7 @@ func TestIncResolutionUpToTime(t *testing.T) { {5, 5}, }, } - *importUpTo = uint(5) - testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1) - *importUpTo = math.MaxUint32 + testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1, 0, 5) } func TestIncResolutionFakeAvgNonFactorResolutions(t *testing.T) { @@ -102,7 +99,7 @@ func TestIncResolutionFakeAvgNonFactorResolutions(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, fakeAvg, 10, 3, 1) + testIncResolution(t, inData, expectedResult, fakeAvg, 10, 3, 1, 0, math.MaxUint32) } 
func TestIncFakeAvgResolutionWithGaps(t *testing.T) { @@ -135,7 +132,7 @@ func TestIncFakeAvgResolutionWithGaps(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1) + testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1, 0, math.MaxUint32) } func TestIncFakeAvgResolutionOutOfOrder(t *testing.T) { @@ -164,7 +161,7 @@ func TestIncFakeAvgResolutionOutOfOrder(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1) + testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1, 0, math.MaxUint32) } func TestIncFakeAvgResolutionStrangeRawRes(t *testing.T) { @@ -201,7 +198,7 @@ func TestIncFakeAvgResolutionStrangeRawRes(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, fakeAvg, 30, 10, 3) + testIncResolution(t, inData, expectedResult, fakeAvg, 30, 10, 3, 0, math.MaxUint32) } func TestIncResolutionSimpleMax(t *testing.T) { @@ -218,7 +215,7 @@ func TestIncResolutionSimpleMax(t *testing.T) { {20, 11}, }, } - testIncResolution(t, inData, expectedResult, schema.Max, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Max, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionSimpleLast(t *testing.T) { @@ -235,7 +232,7 @@ func TestIncResolutionSimpleLast(t *testing.T) { {20, 11}, }, } - testIncResolution(t, inData, expectedResult, schema.Lst, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Lst, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionSimpleMin(t *testing.T) { @@ -252,7 +249,7 @@ func TestIncResolutionSimpleMin(t *testing.T) { {20, 11}, }, } - testIncResolution(t, inData, expectedResult, schema.Min, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Min, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionSimpleAvg(t *testing.T) { @@ -269,7 +266,7 @@ func TestIncResolutionSimpleAvg(t *testing.T) { {20, 11}, }, } - testIncResolution(t, inData, expectedResult, schema.Avg, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Avg, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionSimpleFakeAvg(t *testing.T) { @@ -292,7 +289,7 @@ func TestIncResolutionSimpleFakeAvg(t *testing.T) { {20, 5}, }, } - testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1) + testIncResolution(t, inData, expectedResult, fakeAvg, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionSimpleSum(t *testing.T) { @@ -315,7 +312,7 @@ func TestIncResolutionSimpleSum(t *testing.T) { {20, 5}, }, } - testIncResolution(t, inData, expectedResult, schema.Sum, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Sum, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionNonFactorResolutions(t *testing.T) { @@ -348,7 +345,7 @@ func TestIncResolutionNonFactorResolutions(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, schema.Max, 10, 3, 1) + testIncResolution(t, inData, expectedResult, schema.Max, 10, 3, 1, 0, math.MaxUint32) } func TestIncResolutionWithGaps(t *testing.T) { @@ -373,7 +370,7 @@ func TestIncResolutionWithGaps(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, schema.Max, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Max, 10, 5, 1, 0, math.MaxUint32) } func TestIncResolutionOutOfOrder(t *testing.T) { @@ -394,12 +391,13 @@ func TestIncResolutionOutOfOrder(t *testing.T) { }, } - testIncResolution(t, inData, expectedResult, schema.Max, 10, 5, 1) + testIncResolution(t, inData, expectedResult, schema.Max, 10, 5, 1, 0, math.MaxUint32) } -func testDecResolution(t *testing.T, inData []whisper.Point, 
expectedResult map[schema.Method][]whisper.Point, method schema.Method, inRes, outRes, rawRes uint32) { +func testDecResolution(t *testing.T, inData []whisper.Point, expectedResult map[schema.Method][]whisper.Point, method schema.Method, inRes, outRes, rawRes, from, until uint32) { t.Helper() - outData := decResolution(inData, method, inRes, outRes, rawRes) + c := conversion{from: from, until: until} + outData := c.decResolution(inData, method, inRes, outRes, rawRes) if len(expectedResult) != len(outData) { t.Fatalf("Generated data has different length (%d) than expected (%d):\n%+v\n%+v", len(expectedResult), len(outData), outData, expectedResult) @@ -440,7 +438,7 @@ func TestDecResolutionSimpleAvg(t *testing.T) { {60, 14}, }, } - testDecResolution(t, getSimpleInData(), expectedResult, schema.Avg, 10, 30, 1) + testDecResolution(t, getSimpleInData(), expectedResult, schema.Avg, 10, 30, 1, 0, math.MaxUint32) } func TestDecResolutionSimpleFakeAvg(t *testing.T) { @@ -454,7 +452,7 @@ func TestDecResolutionSimpleFakeAvg(t *testing.T) { {60, 30}, }, } - testDecResolution(t, getSimpleInData(), expectedResult, fakeAvg, 10, 30, 1) + testDecResolution(t, getSimpleInData(), expectedResult, fakeAvg, 10, 30, 1, 0, math.MaxUint32) } func TestDecResolutionSimpleSum(t *testing.T) { @@ -468,7 +466,7 @@ func TestDecResolutionSimpleSum(t *testing.T) { {60, 30}, }, } - testDecResolution(t, getSimpleInData(), expectedResult, schema.Sum, 10, 30, 1) + testDecResolution(t, getSimpleInData(), expectedResult, schema.Sum, 10, 30, 1, 0, math.MaxUint32) } func TestDecResolutionSimpleLast(t *testing.T) { @@ -478,7 +476,7 @@ func TestDecResolutionSimpleLast(t *testing.T) { {60, 15}, }, } - testDecResolution(t, getSimpleInData(), expectedResult, schema.Lst, 10, 30, 1) + testDecResolution(t, getSimpleInData(), expectedResult, schema.Lst, 10, 30, 1, 0, math.MaxUint32) } func TestDecResolutionSimpleMax(t *testing.T) { @@ -488,7 +486,7 @@ func TestDecResolutionSimpleMax(t *testing.T) { {60, 15}, }, } - testDecResolution(t, getSimpleInData(), expectedResult, schema.Max, 10, 30, 1) + testDecResolution(t, getSimpleInData(), expectedResult, schema.Max, 10, 30, 1, 0, math.MaxUint32) } func TestDecResolutionSimpleMin(t *testing.T) { @@ -498,7 +496,7 @@ func TestDecResolutionSimpleMin(t *testing.T) { {60, 13}, }, } - testDecResolution(t, getSimpleInData(), expectedResult, schema.Min, 10, 30, 1) + testDecResolution(t, getSimpleInData(), expectedResult, schema.Min, 10, 30, 1, 0, math.MaxUint32) } func TestDecResolutionUpToTime(t *testing.T) { @@ -519,9 +517,7 @@ func TestDecResolutionUpToTime(t *testing.T) { {30, 6}, }, } - *importUpTo = uint(40) - testDecResolution(t, inData, expectedResult, schema.Sum, 10, 30, 5) - *importUpTo = math.MaxUint32 + testDecResolution(t, inData, expectedResult, schema.Sum, 10, 30, 5, 0, 40) } func TestDecResolutionAvg(t *testing.T) { @@ -540,7 +536,7 @@ func TestDecResolutionAvg(t *testing.T) { {60, 14}, }, } - testDecResolution(t, inData, expectedResult, schema.Avg, 10, 30, 1) + testDecResolution(t, inData, expectedResult, schema.Avg, 10, 30, 1, 0, math.MaxUint32) } func TestDecNonFactorResolutions(t *testing.T) { @@ -561,7 +557,7 @@ func TestDecNonFactorResolutions(t *testing.T) { {60, 14.5}, }, } - testDecResolution(t, inData, expectedResult, schema.Avg, 10, 15, 1) + testDecResolution(t, inData, expectedResult, schema.Avg, 10, 15, 1, 0, math.MaxUint32) } func getGapData() []whisper.Point { @@ -586,7 +582,7 @@ func TestDecResolutionWithGapsAvg(t *testing.T) { }, } - testDecResolution(t, getGapData(), 
expectedResult, schema.Avg, 10, 20, 1) + testDecResolution(t, getGapData(), expectedResult, schema.Avg, 10, 20, 1, 0, math.MaxUint32) } func TestDecResolutionWithGapsFakeAvg(t *testing.T) { @@ -603,7 +599,7 @@ func TestDecResolutionWithGapsFakeAvg(t *testing.T) { }, } - testDecResolution(t, getGapData(), expectedResult, fakeAvg, 10, 20, 1) + testDecResolution(t, getGapData(), expectedResult, fakeAvg, 10, 20, 1, 0, math.MaxUint32) } func TestDecResolutionWithGapsSum(t *testing.T) { @@ -620,7 +616,7 @@ func TestDecResolutionWithGapsSum(t *testing.T) { }, } - testDecResolution(t, getGapData(), expectedResult, schema.Sum, 10, 20, 1) + testDecResolution(t, getGapData(), expectedResult, schema.Sum, 10, 20, 1, 0, math.MaxUint32) } func TestDecResolutionOutOfOrder(t *testing.T) { @@ -639,7 +635,7 @@ func TestDecResolutionOutOfOrder(t *testing.T) { {60, 15}, }, } - testDecResolution(t, inData, expectedResult, schema.Avg, 10, 30, 1) + testDecResolution(t, inData, expectedResult, schema.Avg, 10, 30, 1, 0, math.MaxUint32) } func TestDecFakeAvgNonFactorResolutions(t *testing.T) { @@ -666,7 +662,7 @@ func TestDecFakeAvgNonFactorResolutions(t *testing.T) { {60, 20}, }, } - testDecResolution(t, inData, expectedResult, schema.Sum, 10, 15, 1) + testDecResolution(t, inData, expectedResult, schema.Sum, 10, 15, 1, 0, math.MaxUint32) } func TestDecResolutionFakeAvgOutOfOrder(t *testing.T) { @@ -689,7 +685,7 @@ func TestDecResolutionFakeAvgOutOfOrder(t *testing.T) { {60, 30}, }, } - testDecResolution(t, inData, expectedResult, schema.Sum, 10, 30, 1) + testDecResolution(t, inData, expectedResult, schema.Sum, 10, 30, 1, 0, math.MaxUint32) } func generatePoints(ts, interval uint32, value float64, offset, count int, inc func(float64) float64) []whisper.Point { @@ -705,80 +701,6 @@ func generatePoints(ts, interval uint32, value float64, offset, count int, inc f return res } -func TestEncodedChunksFromPointsWithoutUnfinished(t *testing.T) { - // the actual data in these points doesn't matter, we just want to be sure - // that the chunks resulting from these points include the same data - points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 }) - expectedCount := 8640 - (2520 % 2160) // count minus what would end up in an unfinished chunk - - *writeUnfinishedChunks = false - chunks := encodedChunksFromPoints(points, 10, 21600) - - if len(chunks) != 4 { - t.Fatalf("Expected to get 4 chunks, but got %d", len(chunks)) - } - - i := 0 - for _, c := range chunks { - iterGen, err := chunk.NewIterGen(c.Series.T0, 10, c.Encode(21600)) - if err != nil { - t.Fatalf("Error getting iterator: %s", err) - } - - iter, err := iterGen.Get() - if err != nil { - t.Fatalf("Error getting iterator: %s", err) - } - - for iter.Next() { - ts, val := iter.Values() - if points[i].Timestamp != ts || points[i].Value != val { - t.Fatalf("Unexpected value at index %d:\nExpected: %d:%f\nGot: %d:%f\n", i, ts, val, points[i].Timestamp, points[i].Value) - } - i++ - } - } - if i != expectedCount { - t.Fatalf("Unexpected number of datapoints in chunks:\nExpected: %d\nGot: %d\n", expectedCount, i) - } -} - -func TestEncodedChunksFromPointsWithUnfinished(t *testing.T) { - points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 }) - expectedCount := 8640 // count including unfinished chunks - - *writeUnfinishedChunks = true - chunks := encodedChunksFromPoints(points, 10, 21600) - - if len(chunks) != 5 { - t.Fatalf("Expected to get 5 chunks, but got %d", len(chunks)) - } - - i := 0 - for _, c := 
range chunks { - iterGen, err := chunk.NewIterGen(c.Series.T0, 10, c.Encode(21600)) - if err != nil { - t.Fatalf("Error getting iterator: %s", err) - } - - iter, err := iterGen.Get() - if err != nil { - t.Fatalf("Error getting iterator: %s", err) - } - - for iter.Next() { - ts, val := iter.Values() - if points[i].Timestamp != ts || points[i].Value != val { - t.Fatalf("Unexpected value at index %d:\nExpected: %d:%f\nGot: %d:%f\n", i, ts, val, points[i].Timestamp, points[i].Value) - } - i++ - } - } - if i != expectedCount { - t.Fatalf("Unexpected number of datapoints in chunks:\nExpected: %d\nGot: %d\n", expectedCount, i) - } -} - func verifyPointMaps(t *testing.T, points map[schema.Method][]whisper.Point, expected map[schema.Method][]whisper.Point) { t.Helper() for meth, ep := range expected { @@ -818,6 +740,7 @@ func TestPointsConversionSum1(t *testing.T) { }, }, method: schema.Sum, + until: math.MaxUint32, } expectedPoints1 := map[schema.Method][]whisper.Point{ @@ -847,9 +770,9 @@ func TestPointsConversionSum1(t *testing.T) { }, } - points1 := c.getPoints(0, 1, 8) - points2 := c.getPoints(0, 2, 4) - points3 := c.getPoints(0, 4, 2) + points1 := c.GetPoints(0, 1, 8) + points2 := c.GetPoints(0, 2, 4) + points3 := c.GetPoints(0, 4, 2) verifyPointMaps(t, points1, expectedPoints1) verifyPointMaps(t, points2, expectedPoints2) @@ -878,6 +801,7 @@ func TestPointsConversionLast1(t *testing.T) { }, }, method: schema.Lst, + until: math.MaxUint32, } expectedPoints1 := map[schema.Method][]whisper.Point{ @@ -907,9 +831,9 @@ func TestPointsConversionLast1(t *testing.T) { }, } - points1 := c.getPoints(0, 1, 8) - points2 := c.getPoints(0, 2, 4) - points3 := c.getPoints(0, 4, 2) + points1 := c.GetPoints(0, 1, 8) + points2 := c.GetPoints(0, 2, 4) + points3 := c.GetPoints(0, 4, 2) verifyPointMaps(t, points1, expectedPoints1) verifyPointMaps(t, points2, expectedPoints2) @@ -956,6 +880,7 @@ func TestPointsConversionSum2(t *testing.T) { }, }, method: schema.Sum, + until: math.MaxUint32, } expectedPoints1 := map[schema.Method][]whisper.Point{ @@ -1029,9 +954,9 @@ func TestPointsConversionSum2(t *testing.T) { }, } - points1 := c.getPoints(0, 1, 32) - points2 := c.getPoints(1, 2, 16) - points3 := c.getPoints(2, 4, 8) + points1 := c.GetPoints(0, 1, 32) + points2 := c.GetPoints(1, 2, 16) + points3 := c.GetPoints(2, 4, 8) verifyPointMaps(t, points1, expectedPoints1) verifyPointMaps(t, points2, expectedPoints2) @@ -1060,6 +985,7 @@ func TestPointsConversionAvg1(t *testing.T) { }, }, method: schema.Avg, + until: math.MaxUint32, } expectedPoints1_0 := map[schema.Method][]whisper.Point{ @@ -1177,19 +1103,19 @@ func TestPointsConversionAvg1(t *testing.T) { }, } - points1_0 := c.getPoints(0, 1, 8) - points2_0 := c.getPoints(0, 2, 4) - points3_0 := c.getPoints(0, 4, 2) + points1_0 := c.GetPoints(0, 1, 8) + points2_0 := c.GetPoints(0, 2, 4) + points3_0 := c.GetPoints(0, 4, 2) - points1_1 := c.getPoints(1, 1, 8) - points2_1 := c.getPoints(1, 2, 4) - points3_1 := c.getPoints(1, 4, 100) + points1_1 := c.GetPoints(1, 1, 8) + points2_1 := c.GetPoints(1, 2, 4) + points3_1 := c.GetPoints(1, 4, 100) - *importUpTo = uint(1503407723) - points1_2 := c.getPoints(1, 1, 8) - points2_2 := c.getPoints(1, 2, 100) - points3_2 := c.getPoints(1, 4, 8) - *importUpTo = math.MaxUint32 + c.until = 1503407723 + points1_2 := c.GetPoints(1, 1, 8) + points2_2 := c.GetPoints(1, 2, 100) + points3_2 := c.GetPoints(1, 4, 8) + c.until = math.MaxUint32 verifyPointMaps(t, points1_0, expectedPoints1_0) verifyPointMaps(t, points2_0, expectedPoints2_0) @@ -1229,6 
+1155,7 @@ func TestPointsConversionAvg2(t *testing.T) { }, }, method: schema.Avg, + until: math.MaxUint32, } expectedPoints1_0 := map[schema.Method][]whisper.Point{ @@ -1385,13 +1312,13 @@ func TestPointsConversionAvg2(t *testing.T) { }, } - points1_0 := c.getPoints(0, 1, 27) - points2_0 := c.getPoints(0, 3, 100) - points3_0 := c.getPoints(0, 9, 100) + points1_0 := c.GetPoints(0, 1, 27) + points2_0 := c.GetPoints(0, 3, 100) + points3_0 := c.GetPoints(0, 9, 100) - points1_1 := c.getPoints(1, 1, 27) - points2_1 := c.getPoints(1, 3, 9) - points3_1 := c.getPoints(1, 9, 3) + points1_1 := c.GetPoints(1, 1, 27) + points2_1 := c.GetPoints(1, 3, 9) + points3_1 := c.GetPoints(1, 9, 3) verifyPointMaps(t, points1_0, expectedPoints1_0) verifyPointMaps(t, points2_0, expectedPoints2_0)
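
A minimal, hypothetical usage sketch (not part of the patch) of the relocated APIs, mirroring the updated getMetric() call sequence: importer.NewConversion, then GetPoints per retention, then importer.EncodeChunksFromPoints. The archive layout, retention parameters and sample points below are invented for illustration; only the signatures visible in this diff are assumed.

package main

import (
	"fmt"
	"math"

	"github.com/grafana/metrictank/mdata/importer"
	"github.com/kisielk/whisper-go/whisper"
	"github.com/raintank/schema"
)

func main() {
	// one raw whisper archive: 10s per point, 8640 points (one day)
	archives := []whisper.ArchiveInfo{{SecondsPerPoint: 10, Points: 8640}}

	// points keyed by archive index, as they would come out of the whisper file
	points := map[int][]whisper.Point{
		0: {
			{Timestamp: 10, Value: 1},
			{Timestamp: 20, Value: 2},
			{Timestamp: 30, Value: 3},
		},
	}

	// from=0 and until=MaxUint32 import everything; these parameters replace
	// the package-level *importAfter / *importUpTo flag lookups
	conv := importer.NewConversion(archives, points, schema.Sum, 0, math.MaxUint32)

	// retention index 0 at 10s resolution and 8640 points; GetPoints returns
	// one point slice per aggregation method
	for method, pts := range conv.GetPoints(0, 10, 8640) {
		// writeUnfinishedChunks=true also emits the trailing partial chunk,
		// matching the flag that encodedChunksFromPoints used to read directly
		chunks := importer.EncodeChunksFromPoints(pts, 10, 21600, true)
		fmt.Printf("method %v: %d points -> %d chunks\n", method, len(pts), len(chunks))
	}
}

Passing from/until into NewConversion and writeUnfinishedChunks into EncodeChunksFromPoints, instead of reading the mt-whisper-importer-reader flags, is what lets the new mdata/importer tests set these values per call rather than mutating globals.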