Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
move conversion and chunk encoding to importer pkg
Browse files Browse the repository at this point in the history
also move all the related tests
  • Loading branch information
replay committed Jun 10, 2019
1 parent 8514329 commit 61827c4
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 245 deletions.
75 changes: 9 additions & 66 deletions cmd/mt-whisper-importer-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ import (
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/mdata/importer"
"github.com/kisielk/whisper-go/whisper"
"github.com/raintank/schema"
Expand Down Expand Up @@ -73,15 +71,15 @@ var (
"",
"A regex pattern to be applied to all metric names, only matching ones will be imported",
)
importUpTo = flag.Uint(
"import-up-to",
importUntil = flag.Uint(
"import-until",
math.MaxUint32,
"Only import up to the specified timestamp",
"Only import up to, but not including, the specified timestamp",
)
importAfter = flag.Uint(
"import-after",
importFrom = flag.Uint(
"import-from",
0,
"Only import after the specified timestamp",
"Only import starting from the specified timestamp",
)
positionFile = flag.String(
"position-file",
Expand Down Expand Up @@ -241,20 +239,6 @@ func getMetricName(file string) string {
return *namePrefix + strings.Replace(strings.TrimSuffix(file, ".wsp"), "/", ".", -1)
}

// pointSorter implements sort.Interface to order whisper points by
// ascending timestamp.
type pointSorter []whisper.Point

func (ps pointSorter) Len() int           { return len(ps) }
func (ps pointSorter) Swap(i, j int)      { ps[i], ps[j] = ps[j], ps[i] }
func (ps pointSorter) Less(i, j int) bool { return ps[i].Timestamp < ps[j].Timestamp }

// sortPoints returns the given points ordered chronologically.
// The whisper archives are organized like a ring buffer, so the points
// must be sorted by timestamp before they can be inserted into chunks
// in order. The slice is sorted in place and returned for convenience.
func sortPoints(points pointSorter) pointSorter {
	sort.Sort(points)
	return points
}

func convertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Method, error) {
switch whisperMethod {
case whisper.AggregationAverage:
Expand Down Expand Up @@ -305,9 +289,9 @@ func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest,
res.MetricData.SetId()

_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
conversion := newConversion(w.Header.Archives, points, method)
conversion := importer.NewConversion(w.Header.Archives, points, method, uint32(*importFrom), uint32(*importUntil))
for retIdx, retention := range selectedSchema.Retentions {
convertedPoints := conversion.getPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints))
convertedPoints := conversion.GetPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints))
for m, p := range convertedPoints {
if len(p) == 0 {
continue
Expand All @@ -318,7 +302,7 @@ func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest,
archive = schema.NewArchive(m, retention.ChunkSpan)
}

encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
encodedChunks := importer.EncodeChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan, *writeUnfinishedChunks)
for _, chunk := range encodedChunks {
res.ChunkWriteRequests = append(res.ChunkWriteRequests, importer.NewChunkWriteRequest(
archive,
Expand All @@ -338,47 +322,6 @@ func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest,
return res, nil
}

// encodedChunksFromPoints groups the given points into chunks of
// chunkSpan seconds (aligned on chunkSpan boundaries) and encodes them.
// intervalIn is the interval between points; it is used to decide
// whether the last chunk is complete. The trailing chunk is only
// included if it is complete or if the writeUnfinishedChunks flag is
// set. Points with timestamp 0 are skipped because Metrictank would
// not handle them well.
func encodedChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32) []*chunk.Chunk {
	var point whisper.Point
	var t0, prevT0 uint32
	var c *chunk.Chunk
	var encodedChunks []*chunk.Chunk

	for _, point = range points {
		// this shouldn't happen, but if it would we better catch it here because Metrictank wouldn't handle it well:
		// https://github.com/grafana/metrictank/blob/f1868cccfb92fc82cd853914af958f6d187c5f74/mdata/aggmetric.go#L378
		if point.Timestamp == 0 {
			continue
		}

		t0 = point.Timestamp - (point.Timestamp % chunkSpan)
		if prevT0 == 0 {
			c = chunk.New(t0)
			prevT0 = t0
		} else if prevT0 != t0 {
			c.Finish()
			encodedChunks = append(encodedChunks, c)

			c = chunk.New(t0)
			prevT0 = t0
		}

		err := c.Push(point.Timestamp, point.Value)
		if err != nil {
			panic(fmt.Sprintf("ERROR: Failed to push value into chunk at t0 %d: %q", t0, err))
		}
	}

	// if no valid point was ever pushed there is no open chunk to
	// finish; without this guard c.Finish() below would panic on a nil
	// pointer when points is empty (or contains only zero timestamps)
	// and writeUnfinishedChunks is enabled
	if c == nil {
		return encodedChunks
	}

	// if the last written point was also the last one of the current chunk,
	// or if writeUnfinishedChunks is on, we close the chunk and push it
	if point.Timestamp == t0+chunkSpan-intervalIn || *writeUnfinishedChunks {
		c.Finish()
		encodedChunks = append(encodedChunks, c)
	}

	return encodedChunks
}

// scan a directory and feed the list of whisper files relative to base into the given channel
func getFileListIntoChan(pos *posTracker, fileChan chan string) {
filepath.Walk(
Expand Down
49 changes: 49 additions & 0 deletions mdata/importer/chunk_encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package importer

import (
"fmt"

"github.com/grafana/metrictank/mdata/chunk"
"github.com/kisielk/whisper-go/whisper"
)

// EncodeChunksFromPoints groups the given points into chunks of
// chunkSpan seconds (aligned on chunkSpan boundaries) and encodes them.
// intervalIn is the interval between points; it is used to decide
// whether the last chunk is complete. The trailing chunk is only
// included if it is complete or if writeUnfinishedChunks is true.
// Points with timestamp 0 are skipped because Metrictank would not
// handle them well.
func EncodeChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32, writeUnfinishedChunks bool) []*chunk.Chunk {
	var point whisper.Point
	var t0, prevT0 uint32
	var c *chunk.Chunk
	var encodedChunks []*chunk.Chunk

	for _, point = range points {
		// this shouldn't happen, but if it would we better catch it here because Metrictank wouldn't handle it well:
		// https://github.com/grafana/metrictank/blob/f1868cccfb92fc82cd853914af958f6d187c5f74/mdata/aggmetric.go#L378
		if point.Timestamp == 0 {
			continue
		}

		t0 = point.Timestamp - (point.Timestamp % chunkSpan)
		if prevT0 == 0 {
			c = chunk.New(t0)
			prevT0 = t0
		} else if prevT0 != t0 {
			c.Finish()
			encodedChunks = append(encodedChunks, c)

			c = chunk.New(t0)
			prevT0 = t0
		}

		err := c.Push(point.Timestamp, point.Value)
		if err != nil {
			panic(fmt.Sprintf("ERROR: Failed to push value into chunk at t0 %d: %q", t0, err))
		}
	}

	// if no valid point was ever pushed there is no open chunk to
	// finish; without this guard c.Finish() below would panic on a nil
	// pointer when points is empty (or contains only zero timestamps)
	// and writeUnfinishedChunks is true
	if c == nil {
		return encodedChunks
	}

	// if the last written point was also the last one of the current chunk,
	// or if writeUnfinishedChunks is on, we close the chunk and push it
	if point.Timestamp == t0+chunkSpan-intervalIn || writeUnfinishedChunks {
		c.Finish()
		encodedChunks = append(encodedChunks, c)
	}

	return encodedChunks
}
79 changes: 79 additions & 0 deletions mdata/importer/chunk_encoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package importer

import (
"testing"

"github.com/grafana/metrictank/mdata/chunk"
)

// TestEncodedChunksFromPointsWithUnfinished verifies that encoding with
// writeUnfinishedChunks=true also returns the trailing incomplete chunk
// and that every generated datapoint round-trips through the chunks.
func TestEncodedChunksFromPointsWithUnfinished(t *testing.T) {
	points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 })
	expectedCount := 8640 // count including unfinished chunks

	chunks := EncodeChunksFromPoints(points, 10, 21600, true)

	if len(chunks) != 5 {
		t.Fatalf("Expected to get 5 chunks, but got %d", len(chunks))
	}

	i := 0
	for _, c := range chunks {
		iterGen, err := chunk.NewIterGen(c.Series.T0, 10, c.Encode(21600))
		if err != nil {
			t.Fatalf("Error getting iterator: %s", err)
		}

		iter, err := iterGen.Get()
		if err != nil {
			t.Fatalf("Error getting iterator: %s", err)
		}

		for iter.Next() {
			ts, val := iter.Values()
			if points[i].Timestamp != ts || points[i].Value != val {
				// expected values come from the source points; the
				// decoded iterator values are what we actually got
				t.Fatalf("Unexpected value at index %d:\nExpected: %d:%f\nGot: %d:%f\n", i, points[i].Timestamp, points[i].Value, ts, val)
			}
			i++
		}
	}
	if i != expectedCount {
		t.Fatalf("Unexpected number of datapoints in chunks:\nExpected: %d\nGot: %d\n", expectedCount, i)
	}
}

// TestEncodedChunksFromPointsWithoutUnfinished verifies that encoding
// with writeUnfinishedChunks=false drops the trailing incomplete chunk
// and that all datapoints in the returned chunks round-trip correctly.
func TestEncodedChunksFromPointsWithoutUnfinished(t *testing.T) {
	// the actual data in these points doesn't matter; we just want to be
	// sure that the chunks resulting from these points include the same data
	points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 })
	expectedCount := 8640 - (2520 % 2160) // count minus what would end up in an unfinished chunk

	chunks := EncodeChunksFromPoints(points, 10, 21600, false)

	if len(chunks) != 4 {
		t.Fatalf("Expected to get 4 chunks, but got %d", len(chunks))
	}

	i := 0
	for _, c := range chunks {
		iterGen, err := chunk.NewIterGen(c.Series.T0, 10, c.Encode(21600))
		if err != nil {
			t.Fatalf("Error getting iterator: %s", err)
		}

		iter, err := iterGen.Get()
		if err != nil {
			t.Fatalf("Error getting iterator: %s", err)
		}

		for iter.Next() {
			ts, val := iter.Values()
			if points[i].Timestamp != ts || points[i].Value != val {
				// expected values come from the source points; the
				// decoded iterator values are what we actually got
				t.Fatalf("Unexpected value at index %d:\nExpected: %d:%f\nGot: %d:%f\n", i, points[i].Timestamp, points[i].Value, ts, val)
			}
			i++
		}
	}
	if i != expectedCount {
		t.Fatalf("Unexpected number of datapoints in chunks:\nExpected: %d\nGot: %d\n", expectedCount, i)
	}
}
Loading

0 comments on commit 61827c4

Please sign in to comment.